Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,4 @@ public class ExecutionPipeline
failureOf(t)
}
}

/**
* Wraps [t] in a [ResponseOutcome.Failure]. [InterruptedException] preserves the interrupt
* flag on the current thread per the SDK's cancellation contract.
*/
private fun failureOf(t: Throwable): ResponseOutcome.Failure {
if (t is InterruptedException) {
Thread.currentThread().interrupt()
}
return ResponseOutcome.Failure(t)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,17 @@ public sealed class ResponseOutcome {
is Failure -> onFailure(error)
}
}

/**
* Wraps [t] in a [ResponseOutcome.Failure]. When [t] is an [InterruptedException] the interrupt
* flag is restored on the current thread before wrapping, honouring the SDK's cancellation
* contract so a thread blocked on the surfaced outcome still observes the cancellation. Shared by
* [ExecutionPipeline], [ResponsePipeline], and [org.dexpace.sdk.core.pipeline.step.retry.RetryStep]
* so the interrupt-aware wrapper has exactly one definition.
*/
internal fun failureOf(t: Throwable): ResponseOutcome.Failure {
if (t is InterruptedException) {
Thread.currentThread().interrupt()
}
return ResponseOutcome.Failure(t)
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public class ResponsePipeline
ResponseOutcome.Success(step.execute(inResponse, context))
} catch (t: Throwable) {
closeQuietly(inResponse, t)
handleStepThrowable(t)
failureOf(t)
}
}
is ResponseOutcome.Failure -> return inbound
Expand All @@ -137,20 +137,9 @@ public class ResponsePipeline
if (outcome is ResponseOutcome.Success) {
closeQuietly(outcome.response, t)
}
handleStepThrowable(t)
failureOf(t)
}

/**
* Converts a step-raised throwable into a [ResponseOutcome.Failure]. [InterruptedException]
* preserves the interrupt flag on the current thread per the SDK's cancellation contract.
*/
private fun handleStepThrowable(t: Throwable): ResponseOutcome.Failure {
if (t is InterruptedException) {
Thread.currentThread().interrupt()
}
return ResponseOutcome.Failure(t)
}

/**
* Closes [response], swallowing any close error so it never masks [primary]. A failure to
* close is attached to [primary] as a suppressed throwable for diagnostics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,10 @@ public object RetryAfterParser {
parseNumericSeconds(retryAfter)?.let { return it }
parseHttpDate(retryAfter, now)?.let { return it }
}
headers.get(HEADER_RETRY_AFTER_MS)?.trim()?.let { value ->
if (value.isNotEmpty()) parseMillis(value)?.let { return it }
}
headers.get(HEADER_X_MS_RETRY_AFTER_MS)?.trim()?.let { value ->
if (value.isNotEmpty()) parseMillis(value)?.let { return it }
for (header in arrayOf(HEADER_RETRY_AFTER_MS, HEADER_X_MS_RETRY_AFTER_MS)) {
headers.get(header)?.trim()?.let { value ->
if (value.isNotEmpty()) parseMillis(value)?.let { return it }
}
}
val rateLimitReset = headers.get(HEADER_X_RATELIMIT_RESET)?.trim()
if (!rateLimitReset.isNullOrEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,23 +128,33 @@ public class RetrySettings
this.attemptHeaderName = settings.attemptHeaderName
}

/**
* Validates a [Duration] setting: it must be non-negative and small enough that the
* backoff math can convert it to nanoseconds without overflowing. Shared by the
* [totalTimeout], [initialDelay], and [maxDelay] setters; [name] names the offending
* field in the rejection message.
*/
private fun requireRepresentable(
name: String,
value: Duration,
) {
require(!value.isNegative) { "$name must be non-negative" }
require(value <= MAX_NANO_REPRESENTABLE_DELAY) {
"$name must be representable in nanoseconds (≤ ~292 years); got $value"
}
}

/** Sets [RetrySettings.totalTimeout]. Must be non-negative. */
public fun totalTimeout(totalTimeout: Duration): RetrySettingsBuilder =
apply {
require(!totalTimeout.isNegative) { "totalTimeout must be non-negative" }
require(totalTimeout <= MAX_NANO_REPRESENTABLE_DELAY) {
"totalTimeout must be representable in nanoseconds (≤ ~292 years); got $totalTimeout"
}
requireRepresentable("totalTimeout", totalTimeout)
this.totalTimeout = totalTimeout
}

/** Sets [RetrySettings.initialDelay]. Must be non-negative. */
public fun initialDelay(initialDelay: Duration): RetrySettingsBuilder =
apply {
require(!initialDelay.isNegative) { "initialDelay must be non-negative" }
require(initialDelay <= MAX_NANO_REPRESENTABLE_DELAY) {
"initialDelay must be representable in nanoseconds (≤ ~292 years); got $initialDelay"
}
requireRepresentable("initialDelay", initialDelay)
this.initialDelay = initialDelay
}

Expand All @@ -158,10 +168,7 @@ public class RetrySettings
/** Sets [RetrySettings.maxDelay]. Must be non-negative. */
public fun maxDelay(maxDelay: Duration): RetrySettingsBuilder =
apply {
require(!maxDelay.isNegative) { "maxDelay must be non-negative" }
require(maxDelay <= MAX_NANO_REPRESENTABLE_DELAY) {
"maxDelay must be representable in nanoseconds (≤ ~292 years); got $maxDelay"
}
requireRepresentable("maxDelay", maxDelay)
this.maxDelay = maxDelay
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.dexpace.sdk.core.http.response.exception.HttpException
import org.dexpace.sdk.core.http.response.exception.NetworkException
import org.dexpace.sdk.core.http.response.exception.Retryable
import org.dexpace.sdk.core.pipeline.ResponseOutcome
import org.dexpace.sdk.core.pipeline.failureOf
import org.dexpace.sdk.core.pipeline.step.ResponseRecoveryStep
import java.io.InterruptedIOException
import java.time.Clock
Expand Down Expand Up @@ -289,11 +290,8 @@ public class RetryStep
private fun executeOnce(attemptOrdinal: Int): ResponseOutcome =
try {
ResponseOutcome.Success(httpClient.execute(stampAttempt(request, attemptOrdinal)))
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
ResponseOutcome.Failure(e)
} catch (t: Throwable) {
ResponseOutcome.Failure(t)
failureOf(t)
}

/**
Expand Down Expand Up @@ -362,10 +360,7 @@ public class RetryStep
* process-wide scheduler is a companion `by lazy` (SYNCHRONIZED), so it is initialised
* at most once across the whole VM — no per-instance guard is involved.
*/
private fun resolveScheduler(): ScheduledExecutorService {
settings.scheduler?.let { return it }
return DEFAULT_SCHEDULER
}
private fun resolveScheduler(): ScheduledExecutorService = settings.scheduler ?: DEFAULT_SCHEDULER

/**
* Returns true when [error] is an SDK-classified retryable condition. Classification
Expand Down
Loading