diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep.kt index 4534e4e..ca6c1d2 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep.kt @@ -101,23 +101,17 @@ public abstract class AsyncAuthStep : AsyncHttpStep { return Futures.failed(t) } - return challengeFuture.handle { retryRequest, hookError -> - HookOutcome(retryRequest, hookError) - }.thenCompose { outcome -> - val hookError = outcome.error + return challengeFuture.handle> { retryRequest, hookError -> if (hookError != null) { response.close() - return@thenCompose Futures.failed(Futures.unwrap(hookError)) + return@handle Futures.failed(Futures.unwrap(hookError)) } - val retryRequest = outcome.request ?: return@thenCompose CompletableFuture.completedFuture(response) + if (retryRequest == null) return@handle CompletableFuture.completedFuture(response) response.close() next.copy().processAsync(retryRequest) - } + }.thenCompose { it } } - /** Carrier so the challenge future's outcome (value or error) survives [CompletableFuture.handle]. */ - private class HookOutcome(val request: Request?, val error: Throwable?) - /** * Returns a future of [request] with the credential's auth header attached. Subclasses * implement the concrete async stamping (e.g. fetch-or-refresh a bearer token off-thread, diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt index 369130a..8102279 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt @@ -7,7 +7,6 @@ package org.dexpace.sdk.core.http.pipeline.steps -import org.dexpace.sdk.core.http.auth.AuthChallengeParser import org.dexpace.sdk.core.http.auth.BearerToken import org.dexpace.sdk.core.http.auth.BearerTokenProvider import org.dexpace.sdk.core.http.common.HttpHeaderName @@ -255,15 +254,6 @@ public open class AsyncBearerTokenAuthStep return nonNull } - /** - * Returns `true` when [response]'s `WWW-Authenticate` header advertises a `Bearer` - * challenge. - */ - private fun offersBearerChallenge(response: Response): Boolean { - val header = response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) ?: return false - return AuthChallengeParser.parse(header).any { it.scheme == BEARER_SCHEME } - } - /** * Clears [cachedToken] iff it is still the token whose stamped header is [rejectedHeader]. * Guarded by the same [lock] as the fetch path so the read-compare-clear is atomic against @@ -287,6 +277,5 @@ public open class AsyncBearerTokenAuthStep private companion object { private const val DEFAULT_REFRESH_MARGIN_SECONDS = 30L - private const val BEARER_SCHEME = "bearer" } } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/BearerTokenAuthStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/BearerTokenAuthStep.kt index ff1e8b8..2443fac 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/BearerTokenAuthStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/BearerTokenAuthStep.kt @@ -139,17 +139,6 @@ public open class BearerTokenAuthStep return authorizeRequest(request) } - /** - * Returns `true` when [response]'s `WWW-Authenticate` header advertises a `Bearer` - * challenge. A header with only non-bearer challenges (or one that does not parse) returns - * `false`. [AuthStep] guarantees the header is present before this hook runs; the explicit - * null-guard keeps the method correct if called from elsewhere. - */ - private fun offersBearerChallenge(response: Response): Boolean { - val header = response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) ?: return false - return AuthChallengeParser.parse(header).any { it.scheme == BEARER_SCHEME } - } - /** * Clears [cachedToken] iff it is still the token whose stamped header is [rejectedHeader]. * Guarded by the same [lock] as the refresh path so the read-compare-clear is atomic @@ -224,9 +213,21 @@ public open class BearerTokenAuthStep // Default refresh margin: refresh the bearer token 30 seconds before its expiry // so an in-flight request never carries a near-expired credential. private const val DEFAULT_REFRESH_MARGIN_SECONDS = 30L - - // Lower-cased `Bearer` scheme name; AuthChallengeParser normalises schemes to lower - // case, so the eviction gate compares against this constant. - private const val BEARER_SCHEME = "bearer" } } + +/** + * Returns `true` when [response]'s `WWW-Authenticate` header advertises a `Bearer` challenge. A + * header with only non-bearer challenges (or one that does not parse) returns `false`. The + * [AuthStep] / [AsyncAuthStep] pillar guarantees the header is present before the challenge hook + * runs; the explicit null-guard keeps the function correct if called from elsewhere. Shared by + * [BearerTokenAuthStep] and [AsyncBearerTokenAuthStep]. + */ +internal fun offersBearerChallenge(response: Response): Boolean { + val header = response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) ?: return false + return AuthChallengeParser.parse(header).any { it.scheme == BEARER_SCHEME } +} + +// Lower-cased `Bearer` scheme name; AuthChallengeParser normalises schemes to lower case, so the +// eviction gate compares against this constant. +internal const val BEARER_SCHEME: String = "bearer" diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt index 99d0213..d83d7cb 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt @@ -109,7 +109,7 @@ public class DefaultAsyncInstrumentationStep // .handle do NOT get MDC by default. We capture the MDC snapshot INSIDE the // use{} block (after makeCurrentWithLoggingContext has pushed trace.id / span.id) // and restore it in each branch of the .handle callback. - var mdc: MdcSnapshot = MdcSnapshot.capture() // will be overwritten inside the scope + lateinit var mdc: MdcSnapshot val downstream: CompletableFuture = span.makeCurrentWithLoggingContext().use { // Capture after the scope has pushed trace.id / span.id so the snapshot carries them. diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt index c903d2e..c9c7f02 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt @@ -9,7 +9,6 @@ package org.dexpace.sdk.core.http.pipeline.steps import org.dexpace.sdk.core.http.common.HttpHeaderName import org.dexpace.sdk.core.http.pipeline.AsyncPipelineNext -import org.dexpace.sdk.core.http.request.Method import org.dexpace.sdk.core.http.request.Request import org.dexpace.sdk.core.http.response.Response import org.dexpace.sdk.core.instrumentation.ClientLogger @@ -55,7 +54,7 @@ import kotlin.concurrent.withLock * ## Re-sendability gating * * Identical to [DefaultRetryStep]: - * - **No body** — retried only when the method is idempotent ([IDEMPOTENT_METHODS]); a bare + * - **No body** — retried only when the method is idempotent (see [RetryPolicySupport]); a bare * non-idempotent `POST` is not retried even though there is nothing to re-send. * - **Has a body** — retried only when [org.dexpace.sdk.core.http.request.RequestBody.isReplayable]. * @@ -103,30 +102,20 @@ public open class DefaultAsyncRetryStep private val clock: Clock = Clock.SYSTEM, internal val logger: ClientLogger = ClientLogger(DefaultAsyncRetryStep::class), ) : AsyncRetryStep() { - /** Effective options. `maxRetries < 0` is clamped to [DefaultRetryStep.DEFAULT_MAX_RETRIES]. */ - private val options: HttpRetryOptions = clampOptions(options) - /** - * The [options]' exponential parameters as a [RetrySettings] view so the shared - * [BackoffCalculator] computes this stack's schedule — built once, exactly as - * [DefaultRetryStep.backoffSettings]. `totalTimeout = ZERO` disables the deadline cap. - * Building it eagerly validates the delay magnitudes at construction. + * The stateless retry policy shared with [DefaultRetryStep]: the clamped [HttpRetryOptions] + * (`maxRetries < 0` → [DefaultRetryStep.DEFAULT_MAX_RETRIES]), the [RetrySettings] backoff + * view, and the re-sendability / predicate / delay helpers both stacks share. Built once; + * immutable after construction, so it is safe to share across this step's concurrent calls. */ - private val backoffSettings: RetrySettings = - RetrySettings.builder() - .initialDelay(this.options.baseDelay) - .maxDelay(this.options.maxDelay) - .delayMultiplier(RetrySettings.DEFAULT_DELAY_MULTIPLIER) - .jitter(RetrySettings.DEFAULT_JITTER) - .totalTimeout(Duration.ZERO) - .build() + private val support = RetryPolicySupport(options, logger) override fun processAsync( request: Request, next: AsyncPipelineNext, ): CompletableFuture { val result = CompletableFuture() - val driver = RetryDriver(next, isRetrySafe(request), result) + val driver = RetryDriver(next, support.isRetrySafe(request), result) driver.drive() return result } @@ -234,7 +223,7 @@ public open class DefaultAsyncRetryStep try { val retry = retrySafe && - tryCount < options.maxRetries && + tryCount < support.options.maxRetries && shouldRetryResponse(response) if (!retry) { // Not retrying: hand the still-open response to the caller, who then @@ -288,14 +277,14 @@ public open class DefaultAsyncRetryStep // pre-classification interrupt carve-out. if (exception is InterruptedIOException || exception is InterruptedException) { Thread.currentThread().interrupt() - failTerminally(asInterruptedIo(exception)) + failTerminally(support.asInterruptedIo(exception)) return } val delay: Duration = try { val retry = retrySafe && - tryCount < options.maxRetries && + tryCount < support.options.maxRetries && shouldRetryException(exception) if (!retry) { failTerminally(exception) @@ -371,12 +360,12 @@ public open class DefaultAsyncRetryStep private fun shouldRetryResponse(response: Response): Boolean { val condition = HttpRetryCondition(response, null, tryCount, (suppressed ?: emptyList())) - return invokeShouldRetry(options.shouldRetryCondition, condition) + return support.invokeShouldRetry(support.options.shouldRetryCondition, condition) } private fun shouldRetryException(exception: Exception): Boolean { val condition = HttpRetryCondition(null, exception, tryCount, (suppressed ?: emptyList())) - return invokeShouldRetry(options.shouldRetryException, condition) + return support.invokeShouldRetry(support.options.shouldRetryException, condition) } // --------------- Logging --------------- @@ -405,53 +394,6 @@ public open class DefaultAsyncRetryStep } } - // --------------- Shared helpers (stateless across calls) --------------- - - private fun isRetrySafe(request: Request): Boolean { - val body = request.body ?: return request.method in IDEMPOTENT_METHODS - return body.isReplayable() - } - - /** - * Normalises an interrupt-signalling exception to [InterruptedIOException]: an - * [InterruptedIOException] is returned as-is; a bare [InterruptedException] is wrapped with - * the original attached as its cause. Mirrors [DefaultRetryStep]'s helper of the same name. - */ - private fun asInterruptedIo(exception: Exception): InterruptedIOException = - when (exception) { - is InterruptedIOException -> exception - else -> InterruptedIOException("retry interrupted").apply { initCause(exception) } - } - - private fun invokeShouldRetry( - predicate: HttpRetryConditionPredicate, - condition: HttpRetryCondition, - ): Boolean = - try { - predicate.shouldRetry(condition) - } catch (t: Throwable) { - @Suppress("InstanceOfCheckForException") - if (t is Error) throw t - throw IllegalStateException("shouldRetry predicate threw", t) - } - - private fun invokeDelayFromCondition(condition: HttpRetryCondition): Duration? = - try { - options.delayFromCondition.delayFor(condition) - } catch (t: Throwable) { - @Suppress("InstanceOfCheckForException") - if (t is Error) throw t - logger.atWarning() - .event("http.retry.delay_override_failed") - .field("error.type", t::class.java.simpleName ?: "Throwable") - .cause(t) - .log() - null - } - - private fun backoffOrFixed(tryCount: Int): Duration = - options.fixedDelay ?: BackoffCalculator.computeDelay(tryCount + 1, backoffSettings) - // --------------- Delay computation (subclass extension points) --------------- /** @@ -467,9 +409,9 @@ public open class DefaultAsyncRetryStep * by the loop's close-on-throw guard). */ protected open fun computeResponseDelay(condition: HttpRetryCondition): Duration { - invokeDelayFromCondition(condition)?.let { return it } + support.invokeDelayFromCondition(condition)?.let { return it } condition.response?.let { retryAfterFromHeaders(it) }?.let { return it } - return backoffOrFixed(condition.tryCount) + return support.backoffOrFixed(condition.tryCount) } /** @@ -478,8 +420,8 @@ public open class DefaultAsyncRetryStep * with the same invariants. Mirrors [DefaultRetryStep.computeExceptionDelay]. */ protected open fun computeExceptionDelay(condition: HttpRetryCondition): Duration { - invokeDelayFromCondition(condition)?.let { return it } - return backoffOrFixed(condition.tryCount) + support.invokeDelayFromCondition(condition)?.let { return it } + return support.backoffOrFixed(condition.tryCount) } /** @@ -490,7 +432,7 @@ public open class DefaultAsyncRetryStep */ protected open fun retryAfterFromHeaders(response: Response): Duration? { val now = clock.now() - for (name in options.retryAfterHeaders) { + for (name in support.options.retryAfterHeaders) { val raw = response.headers.get(name) ?: continue RetryAfterParser.parseHeaderValue(name, raw, now)?.let { return it } } @@ -514,32 +456,8 @@ public open class DefaultAsyncRetryStep } } - private fun clampOptions(opts: HttpRetryOptions): HttpRetryOptions { - if (opts.maxRetries >= 0) return opts - logger.atVerbose() - .event("http.retry.maxRetries_clamped") - .field("http.retry.max_retries.requested", opts.maxRetries.toLong()) - .field("http.retry.max_retries.applied", DefaultRetryStep.DEFAULT_MAX_RETRIES.toLong()) - .log() - return HttpRetryOptions( - maxRetries = DefaultRetryStep.DEFAULT_MAX_RETRIES, - baseDelay = opts.baseDelay, - maxDelay = opts.maxDelay, - fixedDelay = opts.fixedDelay, - retryAfterHeaders = opts.retryAfterHeaders, - shouldRetryCondition = opts.shouldRetryCondition, - shouldRetryException = opts.shouldRetryException, - delayFromCondition = opts.delayFromCondition, - ) - } - public companion object { // Nanoseconds in one millisecond — converts monotonic deltas to ms for log events. private const val NANOS_PER_MILLI = 1_000_000L - - // Methods safe to re-send regardless of body replayability (idempotent per RFC 9110). - // Mirrors DefaultRetryStep.IDEMPOTENT_METHODS / RetrySettings.DEFAULT_RETRYABLE_METHODS. - private val IDEMPOTENT_METHODS: Set = - setOf(Method.GET, Method.HEAD, Method.OPTIONS, Method.PUT, Method.DELETE) } } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultRetryStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultRetryStep.kt index 8df0f34..e62e2ad 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultRetryStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultRetryStep.kt @@ -9,7 +9,6 @@ package org.dexpace.sdk.core.http.pipeline.steps import org.dexpace.sdk.core.http.common.HttpHeaderName import org.dexpace.sdk.core.http.pipeline.PipelineNext -import org.dexpace.sdk.core.http.request.Method import org.dexpace.sdk.core.http.request.Request import org.dexpace.sdk.core.http.response.Response import org.dexpace.sdk.core.instrumentation.ClientLogger @@ -47,7 +46,7 @@ import java.time.Duration * ## Body replayability * * Eligibility is gated on re-sendability, keyed off whether the request carries a body: - * - **No body** — retried only when the method is idempotent ([IDEMPOTENT_METHODS]). Body-less + * - **No body** — retried only when the method is idempotent (see [RetryPolicySupport]). Body-less * retry safety keys off method idempotency, NOT off the absence of a body, so a body-less * non-idempotent request — e.g. a bare `POST` to a trigger / activate-style endpoint — is NOT * retried even though there is no payload to re-send: replaying it could duplicate the side @@ -145,37 +144,15 @@ public open class DefaultRetryStep internal val logger: ClientLogger = ClientLogger(DefaultRetryStep::class), ) : RetryStep() { /** - * Effective options. `maxRetries < 0` is clamped to [DEFAULT_MAX_RETRIES] at - * construction; a verbose log records the clamp so callers can spot config bugs. + * The stateless retry policy shared with [DefaultAsyncRetryStep]: the clamped + * [HttpRetryOptions] (`maxRetries < 0` → [DEFAULT_MAX_RETRIES], logged at construction), the + * [RetrySettings] backoff view, and the re-sendability / predicate / delay helpers both + * stacks share. Built once; immutable after construction. Building the backoff view also + * validates the delay magnitudes eagerly — a pathological `baseDelay`/`maxDelay` surfaces as + * an [IllegalArgumentException] here, at step construction, rather than at delay-computation + * time. */ - private val options: HttpRetryOptions = clampOptions(options) - - /** - * The [options]' exponential parameters expressed as a [RetrySettings] view so the shared - * [BackoffCalculator] can compute this stack's schedule. Built once per step instance: - * - `initialDelay` / `maxDelay` come from the options. - * - `delayMultiplier` (2.0) and `jitter` (0.2) are the canonical shared constants — the - * options object does not expose its own multiplier/jitter, so the SDK defaults apply. - * If [HttpRetryOptions] ever gains configurable multiplier/jitter knobs, this view must - * read them from the options instead of the constants, or the new knobs are silently - * ignored on this stack. - * - `totalTimeout = ZERO` disables the deadline cap: the stage-based step has no budget. - * The `fixedDelay` path never consults this view; it short-circuits in [backoffOrFixed]. - * - * Building this view also validates the delay magnitudes eagerly: [RetrySettings.builder] - * rejects a negative `baseDelay`/`maxDelay` and one larger than the calculator's - * ~292-year nanosecond ceiling. [HttpRetryOptions] performs no such range check, so a - * pathological delay surfaces as an [IllegalArgumentException] here, at step construction, - * rather than later at delay-computation time. - */ - private val backoffSettings: RetrySettings = - RetrySettings.builder() - .initialDelay(this.options.baseDelay) - .maxDelay(this.options.maxDelay) - .delayMultiplier(RetrySettings.DEFAULT_DELAY_MULTIPLIER) - .jitter(RetrySettings.DEFAULT_JITTER) - .totalTimeout(Duration.ZERO) - .build() + private val support = RetryPolicySupport(options, logger) /** * Sends [request] through the downstream pipeline with automatic retry on retryable failures. @@ -209,7 +186,7 @@ public open class DefaultRetryStep // safely re-sent: the second writeTo would trip the body's consume-once guard. When // that holds, the loop runs exactly one attempt and never retries — mirroring the // pipeline-primitives RetryStep.canRetry() invariant. - val retrySafe = isRetrySafe(request) + val retrySafe = support.isRetrySafe(request) // The retry loop has distinct continue / return paths per attempt outcome // (success-retryable / success-final / exception-retryable / exception-final). @@ -222,7 +199,7 @@ public open class DefaultRetryStep val response = attemptResult.getOrThrow() val shouldRetry = retrySafe && - tryCount < options.maxRetries && + tryCount < support.options.maxRetries && decideRetryResponse(response, tryCount, suppressed, retrySequenceStartNanos) if (shouldRetry) { tryCount++ @@ -246,11 +223,11 @@ public open class DefaultRetryStep // the @Throws(IOException) contract holds, with prior failures attached. if (exception is InterruptedIOException || exception is InterruptedException) { Thread.currentThread().interrupt() - val ioe = asInterruptedIo(exception) + val ioe = support.asInterruptedIo(exception) suppressed?.forEach(ioe::addSuppressed) throw ioe } - if (retrySafe && tryCount < options.maxRetries) { + if (retrySafe && tryCount < support.options.maxRetries) { val accumulator = suppressed ?: ArrayList().also { suppressed = it } if (decideRetryException(exception, tryCount, accumulator, retrySequenceStartNanos)) { tryCount++ @@ -280,34 +257,6 @@ public open class DefaultRetryStep } } - /** - * Returns `true` when [request] may be re-sent. A body-less request is retry-safe only - * when its method is idempotent ([IDEMPOTENT_METHODS]) — the gate keys off method - * idempotency, not off the absence of a body, so a body-less non-idempotent request (a - * bare `POST`) is NOT retry-safe even though there is no payload to re-send. A body-bearing - * request is retry-safe only when its body is replayable — a non-replayable body cannot be - * re-sent (the second `RequestBody.writeTo` trips the body's consume-once guard and - * surfaces as a confusing wrapped [IllegalStateException]). Making a re-sent body-bearing - * request idempotent is the caller's responsibility. Mirrors - * `pipeline.step.retry.RetryStep.canRetry`. - */ - private fun isRetrySafe(request: Request): Boolean { - val body = request.body ?: return request.method in IDEMPOTENT_METHODS - return body.isReplayable() - } - - /** - * Normalises an interrupt-signalling exception to [InterruptedIOException]. An - * [InterruptedIOException] is returned as-is; a bare [InterruptedException] is wrapped - * (with the original attached as cause) so the loop can satisfy its `@Throws(IOException)` - * contract while preserving the cancellation signal. - */ - private fun asInterruptedIo(exception: Exception): InterruptedIOException = - when (exception) { - is InterruptedIOException -> exception - else -> InterruptedIOException("retry interrupted").apply { initCause(exception) } - } - /** * Executes a single pipeline attempt and returns the result (success or exception) * wrapped in a [Result]. Errors ([Error] subclasses) are NOT caught here — they @@ -343,7 +292,7 @@ public open class DefaultRetryStep // closing on the throw path is safe even though the happy path closes again. val delay: Duration = try { - if (!invokeShouldRetryResponse(condition)) return false + if (!support.invokeShouldRetry(support.options.shouldRetryCondition, condition)) return false computeResponseDelay(condition) } catch (t: Throwable) { closeQuietly(response) @@ -370,7 +319,7 @@ public open class DefaultRetryStep retrySequenceStartNanos: Long, ): Boolean { val condition = HttpRetryCondition(null, exception, tryCount, accumulator) - if (!invokeShouldRetryException(condition)) return false + if (!support.invokeShouldRetry(support.options.shouldRetryException, condition)) return false val delay = computeExceptionDelay(condition) logRetry(tryCount, delay, statusCode = -1, cause = exception, retrySequenceStartNanos) // Append the current exception BEFORE sleeping. If the sleep is @@ -398,33 +347,6 @@ public open class DefaultRetryStep } } - // --------------- Should-retry hooks --------------- - - /** - * Invokes [HttpRetryOptions.shouldRetryCondition] and wraps any thrown exception as - * [IllegalStateException] — predicates must not destabilise the retry loop. - */ - private fun invokeShouldRetryResponse(condition: HttpRetryCondition): Boolean = - invokeShouldRetry(options.shouldRetryCondition, condition) - - private fun invokeShouldRetryException(condition: HttpRetryCondition): Boolean = - invokeShouldRetry(options.shouldRetryException, condition) - - private fun invokeShouldRetry( - predicate: HttpRetryConditionPredicate, - condition: HttpRetryCondition, - ): Boolean = - try { - predicate.shouldRetry(condition) - } catch (t: Throwable) { - // Error subclasses still rethrown; an OOM in the predicate must not be wrapped. - // Splitting Error from RuntimeException via `is` is the canonical JVM idiom for - // retry classification — there is no other way to distinguish JVM Errors here. - @Suppress("InstanceOfCheckForException") - if (t is Error) throw t - throw IllegalStateException("shouldRetry predicate threw", t) - } - // --------------- Delay computation --------------- /** @@ -434,9 +356,9 @@ public open class DefaultRetryStep * 3. [HttpRetryOptions.fixedDelay] or exponential backoff. */ protected open fun computeResponseDelay(condition: HttpRetryCondition): Duration { - invokeDelayFromCondition(condition)?.let { return it } + support.invokeDelayFromCondition(condition)?.let { return it } condition.response?.let { retryAfterFromHeaders(it) }?.let { return it } - return backoffOrFixed(condition.tryCount) + return support.backoffOrFixed(condition.tryCount) } /** @@ -444,41 +366,10 @@ public open class DefaultRetryStep * consult, so the resolution order skips step (2) of [computeResponseDelay]. */ protected open fun computeExceptionDelay(condition: HttpRetryCondition): Duration { - invokeDelayFromCondition(condition)?.let { return it } - return backoffOrFixed(condition.tryCount) + support.invokeDelayFromCondition(condition)?.let { return it } + return support.backoffOrFixed(condition.tryCount) } - private fun invokeDelayFromCondition(condition: HttpRetryCondition): Duration? = - try { - options.delayFromCondition.delayFor(condition) - } catch (t: Throwable) { - // `is Error` is the canonical JVM idiom for splitting catastrophic Errors from - // user-recoverable Exceptions; detekt's preference for class hierarchy doesn't - // apply when classifying for retry/rethrow. - @Suppress("InstanceOfCheckForException") - if (t is Error) throw t - // Don't fail the whole pipeline if the user override misbehaves — fall back to - // the default delay calculation. Log loud enough that the bug is observable. - logger.atWarning() - .event("http.retry.delay_override_failed") - .field("error.type", t::class.java.simpleName ?: "Throwable") - .cause(t) - .log() - null - } - - /** - * Returns [HttpRetryOptions.fixedDelay] if set, otherwise the exponential-backoff delay - * for [tryCount]. The backoff is computed by the shared [BackoffCalculator] from - * [backoffSettings] so this stack and the recovery-aware `RetryStep` share one formula. - * - * [tryCount] is 0-indexed here (`0` = the delay before the first retry), whereas - * [BackoffCalculator.computeDelay] is 1-indexed (`1` = first retry); the `+ 1` bridges - * the two so both produce `baseDelay`, `2·baseDelay`, `4·baseDelay`, … capped at `maxDelay`. - */ - private fun backoffOrFixed(tryCount: Int): Duration = - options.fixedDelay ?: BackoffCalculator.computeDelay(tryCount + 1, backoffSettings) - // --------------- Retry-After parsing --------------- /** @@ -495,7 +386,7 @@ public open class DefaultRetryStep */ protected open fun retryAfterFromHeaders(response: Response): Duration? { val now = clock.now() - for (name in options.retryAfterHeaders) { + for (name in support.options.retryAfterHeaders) { val raw = response.headers.get(name) ?: continue RetryAfterParser.parseHeaderValue(name, raw, now)?.let { return it } } @@ -552,29 +443,6 @@ public open class DefaultRetryStep event.log() } - // --------------- Options clamping --------------- - - private fun clampOptions(opts: HttpRetryOptions): HttpRetryOptions { - if (opts.maxRetries >= 0) return opts - logger.atVerbose() - .event("http.retry.maxRetries_clamped") - .field("http.retry.max_retries.requested", opts.maxRetries.toLong()) - .field("http.retry.max_retries.applied", DEFAULT_MAX_RETRIES.toLong()) - .log() - // HttpRetryOptions isn't a data class, so an explicit copy via the constructor is - // the cheapest correct fix — re-using every other field as-is. - return HttpRetryOptions( - maxRetries = DEFAULT_MAX_RETRIES, - baseDelay = opts.baseDelay, - maxDelay = opts.maxDelay, - fixedDelay = opts.fixedDelay, - retryAfterHeaders = opts.retryAfterHeaders, - shouldRetryCondition = opts.shouldRetryCondition, - shouldRetryException = opts.shouldRetryException, - delayFromCondition = opts.delayFromCondition, - ) - } - public companion object { /** * Default retry count applied when the caller passes a negative @@ -589,11 +457,5 @@ public open class DefaultRetryStep // Nanoseconds in one millisecond — used to convert monotonic-clock deltas to ms // for retry log events. private const val NANOS_PER_MILLI = 1_000_000L - - // Methods safe to re-send regardless of body replayability (idempotent per RFC 9110). - // A request using one of these may always be retried; others require a replayable - // body. Mirrors RetrySettings.DEFAULT_RETRYABLE_METHODS. - private val IDEMPOTENT_METHODS: Set = - setOf(Method.GET, Method.HEAD, Method.OPTIONS, Method.PUT, Method.DELETE) } } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions.kt index 8248ac2..eaabe01 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions.kt @@ -80,6 +80,23 @@ public class HttpRetryOptions HttpRetryConditionPredicate(::defaultShouldRetryException), public val delayFromCondition: HttpRetryDelayProvider = HttpRetryDelayProvider { null }, ) { + /** + * Returns a copy of these options with [maxRetries] replaced and every other field + * preserved. Used by the retry steps to apply the negative-`maxRetries` clamp without + * hand-rebuilding all eight fields at each call site. + */ + internal fun withMaxRetries(maxRetries: Int): HttpRetryOptions = + HttpRetryOptions( + maxRetries = maxRetries, + baseDelay = baseDelay, + maxDelay = maxDelay, + fixedDelay = fixedDelay, + retryAfterHeaders = retryAfterHeaders, + shouldRetryCondition = shouldRetryCondition, + shouldRetryException = shouldRetryException, + delayFromCondition = delayFromCondition, + ) + public companion object { // The default retry count is the canonical SDK budget, kept in one place on // DefaultRetryStep (initial send + DEFAULT_MAX_RETRIES == RetrySettings.DEFAULT_MAX_ATTEMPTS). diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/RetryPolicySupport.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/RetryPolicySupport.kt new file mode 100644 index 0000000..dd1e2b1 --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/RetryPolicySupport.kt @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.pipeline.steps + +import org.dexpace.sdk.core.http.request.Method +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.instrumentation.ClientLogger +import org.dexpace.sdk.core.pipeline.step.retry.BackoffCalculator +import org.dexpace.sdk.core.pipeline.step.retry.RetrySettings +import java.io.InterruptedIOException +import java.time.Duration + +/** + * Stateless retry policy shared by [DefaultRetryStep] and [DefaultAsyncRetryStep]. Clamps the + * caller's [HttpRetryOptions] (a negative `maxRetries` becomes + * [DefaultRetryStep.DEFAULT_MAX_RETRIES]) and builds the [RetrySettings] backoff view once, then + * exposes the policy helpers both drivers share: re-sendability gating, interrupt normalisation, + * predicate invocation, the caller delay override, and fixed-or-exponential backoff. + * + * Per-call state (try count, suppressed trail, terminal completion), the `protected open` delay + * hooks, and the close-on-discard helper stay on each step — those differ between the blocking and + * async stacks. Holds no mutable state after construction, so it is safe to share across + * concurrent calls. + */ +internal class RetryPolicySupport( + rawOptions: HttpRetryOptions, + private val logger: ClientLogger, +) { + /** Effective options; `maxRetries < 0` is clamped to [DefaultRetryStep.DEFAULT_MAX_RETRIES]. */ + val options: HttpRetryOptions = clampOptions(rawOptions) + + /** + * The [options]' exponential parameters as a [RetrySettings] view so the shared + * [BackoffCalculator] computes this stack's schedule. Built once; `totalTimeout = ZERO` + * disables the deadline cap. Building it eagerly validates the delay magnitudes. + */ + val backoffSettings: RetrySettings = + RetrySettings.builder() + .initialDelay(options.baseDelay) + .maxDelay(options.maxDelay) + .delayMultiplier(RetrySettings.DEFAULT_DELAY_MULTIPLIER) + .jitter(RetrySettings.DEFAULT_JITTER) + .totalTimeout(Duration.ZERO) + .build() + + /** + * Returns `true` when [request] may be re-sent: a body-less request only when its method is + * idempotent ([IDEMPOTENT_METHODS]); a body-bearing request only when its body is replayable. + */ + fun isRetrySafe(request: Request): Boolean { + val body = request.body ?: return request.method in IDEMPOTENT_METHODS + return body.isReplayable() + } + + /** + * Normalises an interrupt-signalling exception to [InterruptedIOException]: an + * [InterruptedIOException] is returned as-is; a bare [InterruptedException] is wrapped with the + * original attached as its cause. + */ + fun asInterruptedIo(exception: Exception): InterruptedIOException = + when (exception) { + is InterruptedIOException -> exception + else -> InterruptedIOException("retry interrupted").apply { initCause(exception) } + } + + fun invokeShouldRetry( + predicate: HttpRetryConditionPredicate, + condition: HttpRetryCondition, + ): Boolean = + try { + predicate.shouldRetry(condition) + } catch (t: Throwable) { + // Error subclasses still rethrown; an OOM in the predicate must not be wrapped. + // Splitting Error from RuntimeException via `is` is the canonical JVM idiom for + // retry classification — there is no other way to distinguish JVM Errors here. + @Suppress("InstanceOfCheckForException") + if (t is Error) throw t + throw IllegalStateException("shouldRetry predicate threw", t) + } + + fun invokeDelayFromCondition(condition: HttpRetryCondition): Duration? = + try { + options.delayFromCondition.delayFor(condition) + } catch (t: Throwable) { + @Suppress("InstanceOfCheckForException") + if (t is Error) throw t + // Don't fail the whole pipeline if the user override misbehaves — fall back to the + // default delay calculation. Log loud enough that the bug is observable. + logger.atWarning() + .event("http.retry.delay_override_failed") + .field("error.type", t::class.java.simpleName ?: "Throwable") + .cause(t) + .log() + null + } + + fun backoffOrFixed(tryCount: Int): Duration = + options.fixedDelay ?: BackoffCalculator.computeDelay(tryCount + 1, backoffSettings) + + private fun clampOptions(opts: HttpRetryOptions): HttpRetryOptions { + if (opts.maxRetries >= 0) return opts + logger.atVerbose() + .event("http.retry.maxRetries_clamped") + .field("http.retry.max_retries.requested", opts.maxRetries.toLong()) + .field("http.retry.max_retries.applied", DefaultRetryStep.DEFAULT_MAX_RETRIES.toLong()) + .log() + return opts.withMaxRetries(DefaultRetryStep.DEFAULT_MAX_RETRIES) + } + + private companion object { + // Methods safe to re-send regardless of body replayability (idempotent per RFC 9110). + // Mirrors RetrySettings.DEFAULT_RETRYABLE_METHODS. + private val IDEMPOTENT_METHODS: Set = + setOf(Method.GET, Method.HEAD, Method.OPTIONS, Method.PUT, Method.DELETE) + } +}