While tidying the sync/async step pairs under http/pipeline/steps I found several more helper
blocks that have drifted into byte-for-byte duplicates between siblings, plus a couple of small
local cleanups in the same files. These are independent, behavior-preserving, and internal — no
public signature changes, so apiCheck is unaffected.
The largest item in this area — extracting a shared instrumentation emitter from the two
instrumentation steps — is tracked separately in #26; this issue covers the rest.
The items below can land as separate commits. The RetryPolicySupport extraction is the only
structural one and deserves a careful review.
sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt:112-136 — drop the throwaway MDC capture
processAsync initialises mdc with an MdcSnapshot.capture() that is overwritten on
the very first line inside the use { } block, so every async request pays for one MDC
copy that is immediately discarded. A lateinit var expresses the intent (assigned inside
the scope, read after) without the wasted capture.
Old code:
var mdc: MdcSnapshot = MdcSnapshot.capture() // will be overwritten inside the scope
val downstream: CompletableFuture<Response> =
span.makeCurrentWithLoggingContext().use {
// Capture after the scope has pushed trace.id / span.id so the snapshot carries them.
mdc = MdcSnapshot.capture()
emitRequestEvent(outgoing, redactedUrl)
try {
next.processAsync(outgoing)
} catch (e: Throwable) {
// Synchronous throw from the next step (e.g. argument validation).
// Normalise to a failed future so callers get the uniform async contract.
val elapsedMs = elapsedMillis(startNanos)
emitFailureEvent(outgoing, redactedUrl, e, elapsedMs, wrappedRequestBody)
recordMetrics(
request,
statusCode = -1,
elapsedMs,
errorType = e::class.java.simpleName ?: "Throwable",
)
span.end(e)
return Futures.failed(e)
}
}
val capturedMdc = mdc
New code:
lateinit var mdc: MdcSnapshot
val downstream: CompletableFuture<Response> =
span.makeCurrentWithLoggingContext().use {
// Capture after the scope has pushed trace.id / span.id so the snapshot carries them.
mdc = MdcSnapshot.capture()
emitRequestEvent(outgoing, redactedUrl)
try {
next.processAsync(outgoing)
} catch (e: Throwable) {
// Synchronous throw from the next step (e.g. argument validation).
// Normalise to a failed future so callers get the uniform async contract.
val elapsedMs = elapsedMillis(startNanos)
emitFailureEvent(outgoing, redactedUrl, e, elapsedMs, wrappedRequestBody)
recordMetrics(
request,
statusCode = -1,
elapsedMs,
errorType = e::class.java.simpleName ?: "Throwable",
)
span.end(e)
return Futures.failed(e)
}
}
val capturedMdc = mdc
Usage — before → after: call sites unchanged — behavior-preserving. mdc is assigned
on the first line of the use { } block (before any early return reads it), so the read
below is always initialised:
val capturedMdc = mdc
return downstream.handle { raw, err -> handleCompletion(span, capturedMdc, raw, err, /* ... */) }
val capturedMdc = mdc
return downstream.handle { raw, err -> handleCompletion(span, capturedMdc, raw, err, /* ... */) }
Why: avoids one wasted MdcSnapshot copy/allocation per async request; lateinit
also documents that the real capture happens inside the trace-context scope (so the
snapshot carries trace.id / span.id), which the throwaway initializer obscured.
sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt:408-544 — extract RetryPolicySupport for the shared stateless policy
DefaultAsyncRetryStep and DefaultRetryStep share the entire retry policy: options
clamping, the RetrySettings backoff view, re-sendability gating, interrupt
normalisation, predicate invocation, the caller delay override, and fixed/exponential
backoff. Only the per-call loop machinery and the terminal-failure paths genuinely differ.
Pull the stateless policy into one internal class RetryPolicySupport(rawOptions, clock, logger) that clamps options and builds backoffSettings once and exposes the shared
helpers; both steps hold a support and delegate.
Deliberately kept on each step (they differ): the per-call shouldRetryResponse /
shouldRetryException (they read tryCount / suppressed), the distinct terminal-completion
paths, the protected open delay hooks (computeResponseDelay / computeExceptionDelay /
retryAfterFromHeaders — part of the subclassing contract), and closeQuietly with its
deliberately different catch width (sync swallows IOException, async swallows Exception).
This is the structural item — review carefully.
Old code: (DefaultAsyncRetryStep.kt lines 408-544; DefaultRetryStep.kt carries the
same helpers at lines 294-597 with only the closeQuietly catch width differing)
// --------------- Shared helpers (stateless across calls) ---------------
private fun isRetrySafe(request: Request): Boolean {
val body = request.body ?: return request.method in IDEMPOTENT_METHODS
return body.isReplayable()
}
/**
* Normalises an interrupt-signalling exception to [InterruptedIOException]: an
* [InterruptedIOException] is returned as-is; a bare [InterruptedException] is wrapped with
* the original attached as its cause. Mirrors [DefaultRetryStep]'s helper of the same name.
*/
private fun asInterruptedIo(exception: Exception): InterruptedIOException =
when (exception) {
is InterruptedIOException -> exception
else -> InterruptedIOException("retry interrupted").apply { initCause(exception) }
}
private fun invokeShouldRetry(
predicate: HttpRetryConditionPredicate,
condition: HttpRetryCondition,
): Boolean =
try {
predicate.shouldRetry(condition)
} catch (t: Throwable) {
@Suppress("InstanceOfCheckForException")
if (t is Error) throw t
throw IllegalStateException("shouldRetry predicate threw", t)
}
private fun invokeDelayFromCondition(condition: HttpRetryCondition): Duration? =
try {
options.delayFromCondition.delayFor(condition)
} catch (t: Throwable) {
@Suppress("InstanceOfCheckForException")
if (t is Error) throw t
logger.atWarning()
.event("http.retry.delay_override_failed")
.field("error.type", t::class.java.simpleName ?: "Throwable")
.cause(t)
.log()
null
}
private fun backoffOrFixed(tryCount: Int): Duration =
options.fixedDelay ?: BackoffCalculator.computeDelay(tryCount + 1, backoffSettings)
// --------------- Delay computation (subclass extension points) ---------------
/**
* Computes the delay before retrying [condition]'s response. Resolution order mirrors
* [DefaultRetryStep.computeResponseDelay]:
* 1. [HttpRetryOptions.delayFromCondition] override (if it returns non-null).
* 2. `Retry-After` header parsing ([retryAfterFromHeaders]).
* 3. [HttpRetryOptions.fixedDelay] or exponential backoff.
*
* `protected open` so a subclass can apply request-specific delay logic, exactly as the
* synchronous [DefaultRetryStep] allows. An override MUST return a non-negative [Duration]
* and MUST NOT throw: a throw aborts the call (the open retryable response is closed first
* by the loop's close-on-throw guard).
*/
protected open fun computeResponseDelay(condition: HttpRetryCondition): Duration {
invokeDelayFromCondition(condition)?.let { return it }
condition.response?.let { retryAfterFromHeaders(it) }?.let { return it }
return backoffOrFixed(condition.tryCount)
}
/**
* Computes the delay before retrying [condition]'s exception. Like [computeResponseDelay]
* but skips header parsing (there is no response to read headers from). `protected open`
* with the same invariants. Mirrors [DefaultRetryStep.computeExceptionDelay].
*/
protected open fun computeExceptionDelay(condition: HttpRetryCondition): Duration {
invokeDelayFromCondition(condition)?.let { return it }
return backoffOrFixed(condition.tryCount)
}
/**
* Walks [HttpRetryOptions.retryAfterHeaders] in order, returning the first parseable delay.
* `protected open` so a subclass can support additional server-specific pacing headers,
* mirroring [DefaultRetryStep.retryAfterFromHeaders]. May return `null` to fall through to
* the default backoff; must not throw.
*/
protected open fun retryAfterFromHeaders(response: Response): Duration? {
val now = clock.now()
for (name in options.retryAfterHeaders) {
val raw = response.headers.get(name) ?: continue
RetryAfterParser.parseHeaderValue(name, raw, now)?.let { return it }
}
return null
}
private fun closeQuietly(response: Response) {
try {
response.close()
} catch (closeErr: Exception) {
// Swallow ANY close failure (not just IOException) on a response being discarded
// before a retry: it is not actionable, and on the async path an escaping throw
// would be swallowed by the whenComplete callback and strand the returned future.
// Only [Error] (OOM, StackOverflow) propagates — those are JVM-fatal, not ours to
// recover. (The sync DefaultRetryStep lets a non-IOException close failure surface
// as a terminal error; async cannot, since that path has no caller to throw to.)
logger.atVerbose()
.event("http.retry.close_failed")
.field("error.type", closeErr::class.java.simpleName ?: "Exception")
.log()
}
}
private fun clampOptions(opts: HttpRetryOptions): HttpRetryOptions {
if (opts.maxRetries >= 0) return opts
logger.atVerbose()
.event("http.retry.maxRetries_clamped")
.field("http.retry.max_retries.requested", opts.maxRetries.toLong())
.field("http.retry.max_retries.applied", DefaultRetryStep.DEFAULT_MAX_RETRIES.toLong())
.log()
return HttpRetryOptions(
maxRetries = DefaultRetryStep.DEFAULT_MAX_RETRIES,
baseDelay = opts.baseDelay,
maxDelay = opts.maxDelay,
fixedDelay = opts.fixedDelay,
retryAfterHeaders = opts.retryAfterHeaders,
shouldRetryCondition = opts.shouldRetryCondition,
shouldRetryException = opts.shouldRetryException,
delayFromCondition = opts.delayFromCondition,
)
}
public companion object {
// Nanoseconds in one millisecond — converts monotonic deltas to ms for log events.
private const val NANOS_PER_MILLI = 1_000_000L
// Methods safe to re-send regardless of body replayability (idempotent per RFC 9110).
// Mirrors DefaultRetryStep.IDEMPOTENT_METHODS / RetrySettings.DEFAULT_RETRYABLE_METHODS.
private val IDEMPOTENT_METHODS: Set<Method> =
setOf(Method.GET, Method.HEAD, Method.OPTIONS, Method.PUT, Method.DELETE)
}
New code: the shared stateless policy in one file. The protected open delay hooks and
closeQuietly stay on each step and delegate into support (shown under Usage). NANOS_PER_MILLI
stays in each step's companion (used by the per-call logRetry).
/*
* Copyright (c) 2026 dexpace and Omar Aljarrah
*
* Licensed under the MIT License. See LICENSE in the project root.
* SPDX-License-Identifier: MIT
*/
package org.dexpace.sdk.core.http.pipeline.steps
import org.dexpace.sdk.core.http.request.Method
import org.dexpace.sdk.core.http.request.Request
import org.dexpace.sdk.core.instrumentation.ClientLogger
import org.dexpace.sdk.core.pipeline.step.retry.BackoffCalculator
import org.dexpace.sdk.core.pipeline.step.retry.RetrySettings
import org.dexpace.sdk.core.util.Clock
import java.io.InterruptedIOException
import java.time.Duration
/**
* Stateless retry policy shared by [DefaultRetryStep] and [DefaultAsyncRetryStep]. Clamps the
* caller's [HttpRetryOptions] (a negative `maxRetries` becomes
* [DefaultRetryStep.DEFAULT_MAX_RETRIES]) and builds the [RetrySettings] backoff view once, then
* exposes the policy helpers both drivers share: re-sendability gating, interrupt normalisation,
* predicate invocation, the caller delay override, and fixed-or-exponential backoff.
*
* Per-call state (try count, suppressed trail, terminal completion), the `protected open` delay
* hooks, and the close-on-discard helper stay on each step — those differ between the blocking and
* async stacks. Holds no mutable state after construction, so it is safe to share across
* concurrent calls.
*/
internal class RetryPolicySupport(
rawOptions: HttpRetryOptions,
private val clock: Clock,
private val logger: ClientLogger,
) {
/** Effective options; `maxRetries < 0` is clamped to [DefaultRetryStep.DEFAULT_MAX_RETRIES]. */
val options: HttpRetryOptions = clampOptions(rawOptions)
/**
* The [options]' exponential parameters as a [RetrySettings] view so the shared
* [BackoffCalculator] computes this stack's schedule. Built once; `totalTimeout = ZERO`
* disables the deadline cap. Building it eagerly validates the delay magnitudes.
*/
val backoffSettings: RetrySettings =
RetrySettings.builder()
.initialDelay(options.baseDelay)
.maxDelay(options.maxDelay)
.delayMultiplier(RetrySettings.DEFAULT_DELAY_MULTIPLIER)
.jitter(RetrySettings.DEFAULT_JITTER)
.totalTimeout(Duration.ZERO)
.build()
/**
* Returns `true` when [request] may be re-sent: a body-less request only when its method is
* idempotent ([IDEMPOTENT_METHODS]); a body-bearing request only when its body is replayable.
*/
fun isRetrySafe(request: Request): Boolean {
val body = request.body ?: return request.method in IDEMPOTENT_METHODS
return body.isReplayable()
}
/**
* Normalises an interrupt-signalling exception to [InterruptedIOException]: an
* [InterruptedIOException] is returned as-is; a bare [InterruptedException] is wrapped with the
* original attached as its cause.
*/
fun asInterruptedIo(exception: Exception): InterruptedIOException =
when (exception) {
is InterruptedIOException -> exception
else -> InterruptedIOException("retry interrupted").apply { initCause(exception) }
}
fun invokeShouldRetry(
predicate: HttpRetryConditionPredicate,
condition: HttpRetryCondition,
): Boolean =
try {
predicate.shouldRetry(condition)
} catch (t: Throwable) {
// Error subclasses still rethrown; an OOM in the predicate must not be wrapped.
// Splitting Error from RuntimeException via `is` is the canonical JVM idiom for
// retry classification — there is no other way to distinguish JVM Errors here.
@Suppress("InstanceOfCheckForException")
if (t is Error) throw t
throw IllegalStateException("shouldRetry predicate threw", t)
}
fun invokeDelayFromCondition(condition: HttpRetryCondition): Duration? =
try {
options.delayFromCondition.delayFor(condition)
} catch (t: Throwable) {
@Suppress("InstanceOfCheckForException")
if (t is Error) throw t
// Don't fail the whole pipeline if the user override misbehaves — fall back to the
// default delay calculation. Log loud enough that the bug is observable.
logger.atWarning()
.event("http.retry.delay_override_failed")
.field("error.type", t::class.java.simpleName ?: "Throwable")
.cause(t)
.log()
null
}
fun backoffOrFixed(tryCount: Int): Duration =
options.fixedDelay ?: BackoffCalculator.computeDelay(tryCount + 1, backoffSettings)
// The eight-field rebuild below is itself collapsed to `opts.withMaxRetries(...)` by the
// HttpRetryOptions cleanup further down this issue; shown here as-is for a faithful relocation.
private fun clampOptions(opts: HttpRetryOptions): HttpRetryOptions {
if (opts.maxRetries >= 0) return opts
logger.atVerbose()
.event("http.retry.maxRetries_clamped")
.field("http.retry.max_retries.requested", opts.maxRetries.toLong())
.field("http.retry.max_retries.applied", DefaultRetryStep.DEFAULT_MAX_RETRIES.toLong())
.log()
return HttpRetryOptions(
maxRetries = DefaultRetryStep.DEFAULT_MAX_RETRIES,
baseDelay = opts.baseDelay,
maxDelay = opts.maxDelay,
fixedDelay = opts.fixedDelay,
retryAfterHeaders = opts.retryAfterHeaders,
shouldRetryCondition = opts.shouldRetryCondition,
shouldRetryException = opts.shouldRetryException,
delayFromCondition = opts.delayFromCondition,
)
}
private companion object {
// Methods safe to re-send regardless of body replayability (idempotent per RFC 9110).
// Mirrors RetrySettings.DEFAULT_RETRYABLE_METHODS.
private val IDEMPOTENT_METHODS: Set<Method> =
setOf(Method.GET, Method.HEAD, Method.OPTIONS, Method.PUT, Method.DELETE)
}
}
Usage — before → after: the step holds one support instead of options +
backoffSettings + the five private helpers, and the protected open hooks delegate:
// DefaultAsyncRetryStep — before
private val options: HttpRetryOptions = clampOptions(options)
private val backoffSettings: RetrySettings = RetrySettings.builder()/* ... */.build()
// ...
val driver = RetryDriver(next, isRetrySafe(request), result)
// ... inside the driver / hooks:
val retry = retrySafe && tryCount < options.maxRetries && shouldRetryResponse(response)
return invokeShouldRetry(options.shouldRetryCondition, condition)
protected open fun computeResponseDelay(condition: HttpRetryCondition): Duration {
invokeDelayFromCondition(condition)?.let { return it }
condition.response?.let { retryAfterFromHeaders(it) }?.let { return it }
return backoffOrFixed(condition.tryCount)
}
// DefaultAsyncRetryStep — after
private val support = RetryPolicySupport(options, clock, logger)
// ...
val driver = RetryDriver(next, support.isRetrySafe(request), result)
// ... inside the driver / hooks:
val retry = retrySafe && tryCount < support.options.maxRetries && shouldRetryResponse(response)
return support.invokeShouldRetry(support.options.shouldRetryCondition, condition)
protected open fun computeResponseDelay(condition: HttpRetryCondition): Duration {
support.invokeDelayFromCondition(condition)?.let { return it }
condition.response?.let { retryAfterFromHeaders(it) }?.let { return it }
return support.backoffOrFixed(condition.tryCount)
}
// retryAfterFromHeaders / closeQuietly stay on the step; closeQuietly keeps its async-wide
// `catch (closeErr: Exception)` (sync keeps `catch (closeErr: IOException)`).
Why: the retry decision/backoff logic lives in one place, so a change to clamping,
backoff settings, or predicate-exception handling can't silently apply to only one of the
two stacks; each step shrinks to its genuinely-distinct loop and terminal paths.
Build: new file needs the MIT header. BackoffCalculator, RetrySettings, and
Method imports move from both steps into RetryPolicySupport (drop them from the steps or
allWarningsAsErrors fails); each step keeps InterruptedIOException (the driver still
type-checks is InterruptedIOException) and RetryAfterParser (used by the retained
retryAfterFromHeaders). Internal only — DefaultRetryStep.DEFAULT_MAX_RETRIES stays
public, so no apiCheck impact.
sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt:258-291 — hoist offersBearerChallenge to one top-level function
Both bearer steps define a private offersBearerChallenge(response) and a private
"bearer" scheme constant with identical bodies. Hoist them to a single top-level
internal fun + internal const in BearerTokenAuthStep.kt (mirroring how
defaultShouldRetryResponse already lives top-level in HttpRetryOptions.kt) and call it
from both. evictRejectedToken stays duplicated — it touches each step's own lock /
cachedToken and isn't shareable as-is.
Old code: (AsyncBearerTokenAuthStep.kt lines 258-291, plus the sync duplicates it
mirrors)
// ── AsyncBearerTokenAuthStep.kt:258-291 ──
/**
* Returns `true` when [response]'s `WWW-Authenticate` header advertises a `Bearer`
* challenge.
*/
private fun offersBearerChallenge(response: Response): Boolean {
val header = response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) ?: return false
return AuthChallengeParser.parse(header).any { it.scheme == BEARER_SCHEME }
}
/**
* Clears [cachedToken] iff it is still the token whose stamped header is [rejectedHeader].
* Guarded by the same [lock] as the fetch path so the read-compare-clear is atomic against
* a concurrent refresh.
*/
private fun evictRejectedToken(rejectedHeader: String) {
lock.withLock {
val current = cachedToken ?: return
if (bearerHeaderValue(current.token) == rejectedHeader) {
cachedToken = null
}
}
}
/**
* The `Authorization` header value for [token]. Single source of truth shared by the
* stamping path and [evictRejectedToken]. A subclass that emits a different header format
* must override this too, or eviction stops matching.
*/
protected open fun bearerHeaderValue(token: String): String = "Bearer $token"
private companion object {
private const val DEFAULT_REFRESH_MARGIN_SECONDS = 30L
private const val BEARER_SCHEME = "bearer"
}
}
// ── BearerTokenAuthStep.kt:148-151 + 223-231 — the sync duplicates ──
private fun offersBearerChallenge(response: Response): Boolean {
val header = response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) ?: return false
return AuthChallengeParser.parse(header).any { it.scheme == BEARER_SCHEME }
}
// ...
private companion object {
// Default refresh margin: refresh the bearer token 30 seconds before its expiry
// so an in-flight request never carries a near-expired credential.
private const val DEFAULT_REFRESH_MARGIN_SECONDS = 30L
// Lower-cased `Bearer` scheme name; AuthChallengeParser normalises schemes to lower
// case, so the eviction gate compares against this constant.
private const val BEARER_SCHEME = "bearer"
}
New code: one top-level definition; both companions lose BEARER_SCHEME, the async
file loses its private function and its now-orphaned AuthChallengeParser import.
// ── BearerTokenAuthStep.kt — new top-level definitions (after the class), at file scope ──
/**
* Returns `true` when [response]'s `WWW-Authenticate` header advertises a `Bearer` challenge. A
* header with only non-bearer challenges (or one that does not parse) returns `false`. The
* [AuthStep] / [AsyncAuthStep] pillar guarantees the header is present before the challenge hook
* runs; the explicit null-guard keeps the function correct if called from elsewhere. Shared by
* [BearerTokenAuthStep] and [AsyncBearerTokenAuthStep].
*/
internal fun offersBearerChallenge(response: Response): Boolean {
val header = response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) ?: return false
return AuthChallengeParser.parse(header).any { it.scheme == BEARER_SCHEME }
}
// Lower-cased `Bearer` scheme name; AuthChallengeParser normalises schemes to lower case, so the
// eviction gate compares against this constant.
internal const val BEARER_SCHEME: String = "bearer"
// ── BearerTokenAuthStep.kt — the class loses its private offersBearerChallenge; companion is now ──
private companion object {
// Default refresh margin: refresh the bearer token 30 seconds before its expiry
// so an in-flight request never carries a near-expired credential.
private const val DEFAULT_REFRESH_MARGIN_SECONDS = 30L
}
// ── AsyncBearerTokenAuthStep.kt:258-291 — function removed, BEARER_SCHEME removed ──
/**
* Clears [cachedToken] iff it is still the token whose stamped header is [rejectedHeader].
* Guarded by the same [lock] as the fetch path so the read-compare-clear is atomic against
* a concurrent refresh.
*/
private fun evictRejectedToken(rejectedHeader: String) {
lock.withLock {
val current = cachedToken ?: return
if (bearerHeaderValue(current.token) == rejectedHeader) {
cachedToken = null
}
}
}
/**
* The `Authorization` header value for [token]. Single source of truth shared by the
* stamping path and [evictRejectedToken]. A subclass that emits a different header format
* must override this too, or eviction stops matching.
*/
protected open fun bearerHeaderValue(token: String): String = "Bearer $token"
private companion object {
private const val DEFAULT_REFRESH_MARGIN_SECONDS = 30L
}
}
// ── AsyncBearerTokenAuthStep.kt:10 — delete this now-orphaned import ──
// import org.dexpace.sdk.core.http.auth.AuthChallengeParser
Usage — before → after: call sites unchanged — behavior-preserving. The unqualified
call in each challenge hook now resolves to the top-level function:
// AsyncBearerTokenAuthStep.authorizeRequestOnChallengeAsync / BearerTokenAuthStep.authorizeRequestOnChallenge
if (!offersBearerChallenge(response)) return CompletableFuture.completedFuture(null)
// AsyncBearerTokenAuthStep.authorizeRequestOnChallengeAsync / BearerTokenAuthStep.authorizeRequestOnChallenge
if (!offersBearerChallenge(response)) return CompletableFuture.completedFuture(null)
Why: one definition of the bearer-challenge check instead of two copies that must be
edited together; BEARER_SCHEME lives next to the only function that reads it.
Build: removing the orphaned import org.dexpace.sdk.core.http.auth.AuthChallengeParser
from AsyncBearerTokenAuthStep.kt is required — allWarningsAsErrors fails on the
unused import once the function moves. BearerTokenAuthStep.kt keeps the import (the
top-level function now uses it). Internal only — no apiCheck impact.
sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep.kt:86-119 — fold the HookOutcome carrier into a single handle
handleChallenge runs the challenge future through a handle that does nothing but pack
its (value, error) into a private HookOutcome, then a thenCompose that unpacks it and
does the real work. The carrier and the extra composition stage are unnecessary — do the
close/dispatch decision inside a single handle whose result is flattened with
thenCompose { it }.
Old code:
private fun handleChallenge(
authorized: Request,
response: Response,
next: AsyncPipelineNext,
): CompletableFuture<Response> {
if (response.status.code != SC_UNAUTHORIZED) return CompletableFuture.completedFuture(response)
response.headers.get(HttpHeaderName.WWW_AUTHENTICATE)
?: return CompletableFuture.completedFuture(response)
val challengeFuture: CompletableFuture<Request?> =
try {
authorizeRequestOnChallengeAsync(authorized, response)
} catch (t: Throwable) {
// A sync throw from the hook (caller-bug case) must still close the 401 body.
response.close()
return Futures.failed(t)
}
return challengeFuture.handle { retryRequest, hookError ->
HookOutcome(retryRequest, hookError)
}.thenCompose { outcome ->
val hookError = outcome.error
if (hookError != null) {
response.close()
return@thenCompose Futures.failed<Response>(Futures.unwrap(hookError))
}
val retryRequest = outcome.request ?: return@thenCompose CompletableFuture.completedFuture(response)
response.close()
next.copy().processAsync(retryRequest)
}
}
/** Carrier so the challenge future's outcome (value or error) survives [CompletableFuture.handle]. */
private class HookOutcome(val request: Request?, val error: Throwable?)
New code:
private fun handleChallenge(
authorized: Request,
response: Response,
next: AsyncPipelineNext,
): CompletableFuture<Response> {
if (response.status.code != SC_UNAUTHORIZED) return CompletableFuture.completedFuture(response)
response.headers.get(HttpHeaderName.WWW_AUTHENTICATE)
?: return CompletableFuture.completedFuture(response)
val challengeFuture: CompletableFuture<Request?> =
try {
authorizeRequestOnChallengeAsync(authorized, response)
} catch (t: Throwable) {
// A sync throw from the hook (caller-bug case) must still close the 401 body.
response.close()
return Futures.failed(t)
}
return challengeFuture.handle<CompletableFuture<Response>> { retryRequest, hookError ->
if (hookError != null) {
response.close()
return@handle Futures.failed<Response>(Futures.unwrap(hookError))
}
if (retryRequest == null) return@handle CompletableFuture.completedFuture(response)
response.close()
next.copy().processAsync(retryRequest)
}.thenCompose { it }
}
Usage — before → after: call sites unchanged — behavior-preserving. processAsync
composes handleChallenge exactly as before:
return authorizedFuture.thenCompose { authorized ->
next.copy().processAsync(authorized).thenCompose { response ->
handleChallenge(authorized, response, next)
}
}
return authorizedFuture.thenCompose { authorized ->
next.copy().processAsync(authorized).thenCompose { response ->
handleChallenge(authorized, response, next)
}
}
Why: removes a one-field carrier type and a whole composition stage; the handle
lambda decides close-and-fail / pass-through / retry directly, and the explicit
null-check smart-casts retryRequest to non-null for the dispatch. Behaviour is
identical — the original handle did no work beyond boxing the outcome.
sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions.kt:69-119 — add an internal withMaxRetries copy helper
HttpRetryOptions isn't a data class, so the negative-maxRetries clamp in both retry
steps hand-rebuilds all eight fields through the constructor. Add one internal
withMaxRetries that does the copy, and let both clampOptions call it.
Old code:
public class HttpRetryOptions
@JvmOverloads
constructor(
public val maxRetries: Int = DEFAULT_MAX_RETRIES,
public val baseDelay: Duration = RetrySettings.DEFAULT_INITIAL_DELAY,
public val maxDelay: Duration = RetrySettings.DEFAULT_MAX_DELAY,
public val fixedDelay: Duration? = null,
public val retryAfterHeaders: List<HttpHeaderName> = DEFAULT_RETRY_AFTER_HEADERS,
public val shouldRetryCondition: HttpRetryConditionPredicate =
HttpRetryConditionPredicate(::defaultShouldRetryResponse),
public val shouldRetryException: HttpRetryConditionPredicate =
HttpRetryConditionPredicate(::defaultShouldRetryException),
public val delayFromCondition: HttpRetryDelayProvider = HttpRetryDelayProvider { null },
) {
public companion object {
// The default retry count is the canonical SDK budget, kept in one place on
// DefaultRetryStep (initial send + DEFAULT_MAX_RETRIES == RetrySettings.DEFAULT_MAX_ATTEMPTS).
private const val DEFAULT_MAX_RETRIES = DefaultRetryStep.DEFAULT_MAX_RETRIES
/**
* The three `Retry-After` header forms parsed by [DefaultRetryStep]. Order matters —
* the first header present on the response wins.
*/
@JvmField
public val DEFAULT_RETRY_AFTER_HEADERS: List<HttpHeaderName> =
Collections.unmodifiableList(
listOf(
HttpHeaderName.RETRY_AFTER,
HttpHeaderName.RETRY_AFTER_MS,
HttpHeaderName.X_MS_RETRY_AFTER_MS,
),
)
/**
* Returns an [HttpRetryOptions] that uses a flat [delay] between every retry — no
* exponential growth, no jitter. [baseDelay] and [maxDelay] are forced to zero
* so the backoff path is unreachable.
*/
@JvmStatic
public fun fixed(
maxRetries: Int,
delay: Duration,
): HttpRetryOptions =
HttpRetryOptions(
maxRetries = maxRetries,
fixedDelay = delay,
baseDelay = Duration.ZERO,
maxDelay = Duration.ZERO,
)
}
}
New code:
public class HttpRetryOptions
@JvmOverloads
constructor(
public val maxRetries: Int = DEFAULT_MAX_RETRIES,
public val baseDelay: Duration = RetrySettings.DEFAULT_INITIAL_DELAY,
public val maxDelay: Duration = RetrySettings.DEFAULT_MAX_DELAY,
public val fixedDelay: Duration? = null,
public val retryAfterHeaders: List<HttpHeaderName> = DEFAULT_RETRY_AFTER_HEADERS,
public val shouldRetryCondition: HttpRetryConditionPredicate =
HttpRetryConditionPredicate(::defaultShouldRetryResponse),
public val shouldRetryException: HttpRetryConditionPredicate =
HttpRetryConditionPredicate(::defaultShouldRetryException),
public val delayFromCondition: HttpRetryDelayProvider = HttpRetryDelayProvider { null },
) {
/**
* Returns a copy of these options with [maxRetries] replaced and every other field
* preserved. Used by the retry steps to apply the negative-`maxRetries` clamp without
* hand-rebuilding all eight fields at each call site.
*/
internal fun withMaxRetries(maxRetries: Int): HttpRetryOptions =
HttpRetryOptions(
maxRetries = maxRetries,
baseDelay = baseDelay,
maxDelay = maxDelay,
fixedDelay = fixedDelay,
retryAfterHeaders = retryAfterHeaders,
shouldRetryCondition = shouldRetryCondition,
shouldRetryException = shouldRetryException,
delayFromCondition = delayFromCondition,
)
public companion object {
// The default retry count is the canonical SDK budget, kept in one place on
// DefaultRetryStep (initial send + DEFAULT_MAX_RETRIES == RetrySettings.DEFAULT_MAX_ATTEMPTS).
private const val DEFAULT_MAX_RETRIES = DefaultRetryStep.DEFAULT_MAX_RETRIES
/**
* The three `Retry-After` header forms parsed by [DefaultRetryStep]. Order matters —
* the first header present on the response wins.
*/
@JvmField
public val DEFAULT_RETRY_AFTER_HEADERS: List<HttpHeaderName> =
Collections.unmodifiableList(
listOf(
HttpHeaderName.RETRY_AFTER,
HttpHeaderName.RETRY_AFTER_MS,
HttpHeaderName.X_MS_RETRY_AFTER_MS,
),
)
/**
* Returns an [HttpRetryOptions] that uses a flat [delay] between every retry — no
* exponential growth, no jitter. [baseDelay] and [maxDelay] are forced to zero
* so the backoff path is unreachable.
*/
@JvmStatic
public fun fixed(
maxRetries: Int,
delay: Duration,
): HttpRetryOptions =
HttpRetryOptions(
maxRetries = maxRetries,
fixedDelay = delay,
baseDelay = Duration.ZERO,
maxDelay = Duration.ZERO,
)
}
}
Usage — before → after: clampOptions in both retry steps collapses to one call. The
sync step references the bare DEFAULT_MAX_RETRIES (its own companion); the async step must
qualify DefaultRetryStep.DEFAULT_MAX_RETRIES.
// DefaultRetryStep.clampOptions — before
private fun clampOptions(opts: HttpRetryOptions): HttpRetryOptions {
if (opts.maxRetries >= 0) return opts
logger.atVerbose()
.event("http.retry.maxRetries_clamped")
.field("http.retry.max_retries.requested", opts.maxRetries.toLong())
.field("http.retry.max_retries.applied", DEFAULT_MAX_RETRIES.toLong())
.log()
// HttpRetryOptions isn't a data class, so an explicit copy via the constructor is
// the cheapest correct fix — re-using every other field as-is.
return HttpRetryOptions(
maxRetries = DEFAULT_MAX_RETRIES,
baseDelay = opts.baseDelay,
maxDelay = opts.maxDelay,
fixedDelay = opts.fixedDelay,
retryAfterHeaders = opts.retryAfterHeaders,
shouldRetryCondition = opts.shouldRetryCondition,
shouldRetryException = opts.shouldRetryException,
delayFromCondition = opts.delayFromCondition,
)
}
// DefaultRetryStep.clampOptions — after
private fun clampOptions(opts: HttpRetryOptions): HttpRetryOptions {
if (opts.maxRetries >= 0) return opts
logger.atVerbose()
.event("http.retry.maxRetries_clamped")
.field("http.retry.max_retries.requested", opts.maxRetries.toLong())
.field("http.retry.max_retries.applied", DEFAULT_MAX_RETRIES.toLong())
.log()
return opts.withMaxRetries(DEFAULT_MAX_RETRIES)
}
// DefaultAsyncRetryStep.clampOptions — after (qualified constant)
// return opts.withMaxRetries(DefaultRetryStep.DEFAULT_MAX_RETRIES)
// If the RetryPolicySupport extraction above lands, this same collapse applies inside
// RetryPolicySupport.clampOptions.
Why: the eight-field copy lives once on the type that owns the fields, so adding a
ninth option later can't leave a stale hand-rebuild that silently drops it in the clamp
path; each clampOptions shrinks to a single readable line.
Build: the one-line constructor call exceeds ktlint's 120-column limit, so the named
arguments are wrapped one-per-line. withMaxRetries is internal, so it does not appear
in the .api snapshots — no apiCheck impact.
While tidying the sync/async step pairs under
http/pipeline/stepsI found several more helperblocks that have drifted into byte-for-byte duplicates between siblings, plus a couple of small
local cleanups in the same files. These are independent, behavior-preserving, and internal — no
public signature changes, so
apiCheckis unaffected.The largest item in this area — extracting a shared instrumentation emitter from the two
instrumentation steps — is tracked separately in #26; this issue covers the rest.
The items below can land as separate commits. The
RetryPolicySupportextraction is the onlystructural one and deserves a careful review.
sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt:112-136 — drop the throwaway MDC capture
processAsyncinitialisesmdcwith anMdcSnapshot.capture()that is overwritten onthe very first line inside the
use { }block, so every async request pays for one MDCcopy that is immediately discarded. A
lateinit varexpresses the intent (assigned insidethe scope, read after) without the wasted capture.
Old code:
New code:
Usage — before → after: call sites unchanged — behavior-preserving.
mdcis assignedon the first line of the
use { }block (before any early return reads it), so the readbelow is always initialised:
Why: avoids one wasted
MdcSnapshotcopy/allocation per async request;lateinitalso documents that the real capture happens inside the trace-context scope (so the
snapshot carries
trace.id/span.id), which the throwaway initializer obscured.sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt:408-544 — extract
RetryPolicySupportfor the shared stateless policyDefaultAsyncRetryStepandDefaultRetryStepshare the entire retry policy: optionsclamping, the
RetrySettingsbackoff view, re-sendability gating, interruptnormalisation, predicate invocation, the caller delay override, and fixed/exponential
backoff. Only the per-call loop machinery and the terminal-failure paths genuinely differ.
Pull the stateless policy into one
internal class RetryPolicySupport(rawOptions, clock, logger)that clamps options and buildsbackoffSettingsonce and exposes the sharedhelpers; both steps hold a
supportand delegate.Deliberately kept on each step (they differ): the per-call
shouldRetryResponse/shouldRetryException(they readtryCount/suppressed), the distinct terminal-completionpaths, the
protected opendelay hooks (computeResponseDelay/computeExceptionDelay/retryAfterFromHeaders— part of the subclassing contract), andcloseQuietlywith itsdeliberately different catch width (sync swallows
IOException, async swallowsException).This is the structural item — review carefully.
Old code: (
DefaultAsyncRetryStep.ktlines 408-544;DefaultRetryStep.ktcarries thesame helpers at lines 294-597 with only the
closeQuietlycatch width differing)New code: the shared stateless policy in one file. The
protected opendelay hooks andcloseQuietlystay on each step and delegate intosupport(shown under Usage).NANOS_PER_MILLIstays in each step's companion (used by the per-call
logRetry).Usage — before → after: the step holds one
supportinstead ofoptions+backoffSettings+ the five private helpers, and theprotected openhooks delegate:Why: the retry decision/backoff logic lives in one place, so a change to clamping,
backoff settings, or predicate-exception handling can't silently apply to only one of the
two stacks; each step shrinks to its genuinely-distinct loop and terminal paths.
Build: new file needs the MIT header.
BackoffCalculator,RetrySettings, andMethodimports move from both steps intoRetryPolicySupport(drop them from the steps orallWarningsAsErrorsfails); each step keepsInterruptedIOException(the driver stilltype-checks
is InterruptedIOException) andRetryAfterParser(used by the retainedretryAfterFromHeaders). Internal only —DefaultRetryStep.DEFAULT_MAX_RETRIESstayspublic, so no
apiCheckimpact.sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt:258-291 — hoist
offersBearerChallengeto one top-level functionBoth bearer steps define a private
offersBearerChallenge(response)and a private"bearer"scheme constant with identical bodies. Hoist them to a single top-levelinternal fun+internal constinBearerTokenAuthStep.kt(mirroring howdefaultShouldRetryResponsealready lives top-level inHttpRetryOptions.kt) and call itfrom both.
evictRejectedTokenstays duplicated — it touches each step's ownlock/cachedTokenand isn't shareable as-is.Old code: (
AsyncBearerTokenAuthStep.ktlines 258-291, plus the sync duplicates itmirrors)
New code: one top-level definition; both companions lose
BEARER_SCHEME, the asyncfile loses its private function and its now-orphaned
AuthChallengeParserimport.Usage — before → after: call sites unchanged — behavior-preserving. The unqualified
call in each challenge hook now resolves to the top-level function:
Why: one definition of the bearer-challenge check instead of two copies that must be
edited together;
BEARER_SCHEMElives next to the only function that reads it.Build: removing the orphaned
import org.dexpace.sdk.core.http.auth.AuthChallengeParserfrom
AsyncBearerTokenAuthStep.ktis required —allWarningsAsErrorsfails on theunused import once the function moves.
BearerTokenAuthStep.ktkeeps the import (thetop-level function now uses it). Internal only — no
apiCheckimpact.sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep.kt:86-119 — fold the
HookOutcomecarrier into a singlehandlehandleChallengeruns the challenge future through ahandlethat does nothing but packits
(value, error)into a privateHookOutcome, then athenComposethat unpacks it anddoes the real work. The carrier and the extra composition stage are unnecessary — do the
close/dispatch decision inside a single
handlewhose result is flattened withthenCompose { it }.Old code:
New code:
Usage — before → after: call sites unchanged — behavior-preserving.
processAsynccomposes
handleChallengeexactly as before:Why: removes a one-field carrier type and a whole composition stage; the
handlelambda decides close-and-fail / pass-through / retry directly, and the explicit
null-check smart-castsretryRequestto non-null for the dispatch. Behaviour isidentical — the original
handledid no work beyond boxing the outcome.sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions.kt:69-119 — add an internal
withMaxRetriescopy helperHttpRetryOptionsisn't adata class, so the negative-maxRetriesclamp in both retrysteps hand-rebuilds all eight fields through the constructor. Add one internal
withMaxRetriesthat does the copy, and let bothclampOptionscall it.Old code:
New code:
Usage — before → after:
clampOptionsin both retry steps collapses to one call. Thesync step references the bare
DEFAULT_MAX_RETRIES(its own companion); the async step mustqualify
DefaultRetryStep.DEFAULT_MAX_RETRIES.Why: the eight-field copy lives once on the type that owns the fields, so adding a
ninth option later can't leave a stale hand-rebuild that silently drops it in the clamp
path; each
clampOptionsshrinks to a single readable line.Build: the one-line constructor call exceeds ktlint's 120-column limit, so the named
arguments are wrapped one-per-line.
withMaxRetriesisinternal, so it does not appearin the
.apisnapshots — noapiCheckimpact.