Reading through the recovery-aware pipeline primitives (pipeline) and the retry step
(pipeline/step/retry), I noticed a handful of small, self-contained duplications that have crept in
as the two stacks grew. None of these change behavior — they collapse copy-pasted logic onto a single
definition and let the elvis/loop idioms carry the intent. Grouping them in one ticket because they all
live in the same subsystem and are independent, mechanical edits.
The first one touches interrupt handling, so I've been careful to keep the cancellation contract exact:
the wrapper that flows a failure through the pipeline restores the interrupt flag, while the direct
attempt() path — which rethrows the throwable to the caller — deliberately does not, and is left alone.
sdk-core/.../pipeline/ExecutionPipeline.kt:117-122 (+ ResponsePipeline.kt:147-152, pipeline/step/retry/RetryStep.kt:289-297) — one interrupt-aware failure wrapper
Three places independently re-implement "wrap a throwable in ResponseOutcome.Failure, restoring the
interrupt flag first if it's an InterruptedException": ExecutionPipeline.failureOf,
ResponsePipeline.handleStepThrowable, and the inner catch in RetryStep.executeOnce. They should
share a single module-internal top-level function, mirroring the existing top-level
internal fun Duration.toNanosSaturating() that already lives in this package
(BackoffCalculator.kt).
Old code:
// ── ExecutionPipeline.kt (lines 113–122) — private member, removed ──
/**
 * Wraps [t] in a [ResponseOutcome.Failure]. [InterruptedException] preserves the interrupt
 * flag on the current thread per the SDK's cancellation contract.
 */
private fun failureOf(t: Throwable): ResponseOutcome.Failure {
    if (t is InterruptedException) {
        Thread.currentThread().interrupt()
    }
    return ResponseOutcome.Failure(t)
}
// ── ResponsePipeline.kt — call sites (lines 97–141) + private member (lines 143–152) ──
private fun applyResponseSteps(
    outcome: ResponseOutcome,
    context: DispatchContext,
): ResponseOutcome {
    var current = outcome
    for (step in responseSteps) {
        current =
            when (val inbound = current) {
                is ResponseOutcome.Success -> {
                    // Capture the in-hand response before the step runs so the catch can
                    // release it. A throwing step would otherwise strand the open transport
                    // connection — mirrors the close-before-propagate discipline in
                    // DefaultRedirectStep / AuthStep.
                    val inResponse = inbound.response
                    try {
                        ResponseOutcome.Success(step.execute(inResponse, context))
                    } catch (t: Throwable) {
                        closeQuietly(inResponse, t)
                        handleStepThrowable(t)
                    }
                }
                is ResponseOutcome.Failure -> return inbound
            }
    }
    return current
}

/**
 * Invokes a single recovery step, wrapping any throwable it raises into a
 * [ResponseOutcome.Failure] so the chain keeps flowing.
 */
private fun invokeRecovery(
    step: ResponseRecoveryStep,
    outcome: ResponseOutcome,
): ResponseOutcome =
    try {
        step.invoke(outcome)
    } catch (t: Throwable) {
        // A recovery step that throws on a Success outcome strands the in-hand response:
        // close it before wrapping the throwable so the open connection is released.
        if (outcome is ResponseOutcome.Success) {
            closeQuietly(outcome.response, t)
        }
        handleStepThrowable(t)
    }

/**
 * Converts a step-raised throwable into a [ResponseOutcome.Failure]. [InterruptedException]
 * preserves the interrupt flag on the current thread per the SDK's cancellation contract.
 */
private fun handleStepThrowable(t: Throwable): ResponseOutcome.Failure {
    if (t is InterruptedException) {
        Thread.currentThread().interrupt()
    }
    return ResponseOutcome.Failure(t)
}
// ── RetryStep.kt (lines 289–297) ──
private fun executeOnce(attemptOrdinal: Int): ResponseOutcome =
    try {
        ResponseOutcome.Success(httpClient.execute(stampAttempt(request, attemptOrdinal)))
    } catch (e: InterruptedException) {
        Thread.currentThread().interrupt()
        ResponseOutcome.Failure(e)
    } catch (t: Throwable) {
        ResponseOutcome.Failure(t)
    }
New code:
// ── ExecutionPipeline.kt — the private failureOf member is removed; a top-level internal
// function is added after the class's closing brace (alongside the toNanosSaturating precedent) ──
/**
 * Wraps [t] in a [ResponseOutcome.Failure]. When [t] is an [InterruptedException] the interrupt
 * flag is restored on the current thread before wrapping, honouring the SDK's cancellation
 * contract so a thread blocked on the surfaced outcome still observes the cancellation. Shared by
 * [ExecutionPipeline], [ResponsePipeline], and `RetryStep` so the interrupt-aware wrapper has
 * exactly one definition.
 */
internal fun failureOf(t: Throwable): ResponseOutcome.Failure {
    if (t is InterruptedException) {
        Thread.currentThread().interrupt()
    }
    return ResponseOutcome.Failure(t)
}
// ── ResponsePipeline.kt — both call sites now call the shared failureOf; the private
// handleStepThrowable member is removed (failureOf is in the same package, no import needed) ──
private fun applyResponseSteps(
    outcome: ResponseOutcome,
    context: DispatchContext,
): ResponseOutcome {
    var current = outcome
    for (step in responseSteps) {
        current =
            when (val inbound = current) {
                is ResponseOutcome.Success -> {
                    // Capture the in-hand response before the step runs so the catch can
                    // release it. A throwing step would otherwise strand the open transport
                    // connection — mirrors the close-before-propagate discipline in
                    // DefaultRedirectStep / AuthStep.
                    val inResponse = inbound.response
                    try {
                        ResponseOutcome.Success(step.execute(inResponse, context))
                    } catch (t: Throwable) {
                        closeQuietly(inResponse, t)
                        failureOf(t)
                    }
                }
                is ResponseOutcome.Failure -> return inbound
            }
    }
    return current
}

/**
 * Invokes a single recovery step, wrapping any throwable it raises into a
 * [ResponseOutcome.Failure] so the chain keeps flowing.
 */
private fun invokeRecovery(
    step: ResponseRecoveryStep,
    outcome: ResponseOutcome,
): ResponseOutcome =
    try {
        step.invoke(outcome)
    } catch (t: Throwable) {
        // A recovery step that throws on a Success outcome strands the in-hand response:
        // close it before wrapping the throwable so the open connection is released.
        if (outcome is ResponseOutcome.Success) {
            closeQuietly(outcome.response, t)
        }
        failureOf(t)
    }
// ── RetryStep.kt — the two catch arms collapse into one, since failureOf already handles the
// InterruptedException case; add the import below ──
import org.dexpace.sdk.core.pipeline.failureOf

private fun executeOnce(attemptOrdinal: Int): ResponseOutcome =
    try {
        ResponseOutcome.Success(httpClient.execute(stampAttempt(request, attemptOrdinal)))
    } catch (t: Throwable) {
        failureOf(t)
    }
The new import in RetryStep.kt sorts between the two existing pipeline imports
(ASCII order: ResponseOutcome < failureOf < step.ResponseRecoveryStep):
import org.dexpace.sdk.core.pipeline.ResponseOutcome
import org.dexpace.sdk.core.pipeline.failureOf
import org.dexpace.sdk.core.pipeline.step.ResponseRecoveryStep
Usage — before → after:
// before: ExecutionPipeline.produceOutcome, where failureOf(t) resolves to the private member
} catch (t: Throwable) {
    return failureOf(t)
}
// ...
} catch (t: Throwable) {
    failureOf(t)
}
// after: ExecutionPipeline.produceOutcome, call spelling identical; with the private member gone,
// failureOf(t) now resolves to the top-level function in the same package
} catch (t: Throwable) {
    return failureOf(t)
}
// ...
} catch (t: Throwable) {
    failureOf(t)
}
ExecutionPipeline's two call sites are unchanged, so they are behavior-preserving by construction. The
ResponsePipeline sites change spelling only (handleStepThrowable(t) → failureOf(t)) and
RetryStep.executeOnce folds its two catch arms into one; all three still produce the same
ResponseOutcome.Failure, with the interrupt flag restored when the throwable is an InterruptedException.
Why: One definition of the interrupt-aware failure wrapper instead of three equivalent copies (two of
them identical apart from the name, the third spelled as two catch arms), following the existing
top-level internal Duration.toNanosSaturating precedent in this package.
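To make the equivalence claim concrete, here is a small self-contained sketch that compares the two-arm and one-arm shapes of executeOnce. The ResponseOutcome type below is a simplified stand-in (an assumption, not the SDK's real class), and executeOnceOld, executeOnceNew, and probe are hypothetical names used only for this illustration.

```kotlin
// Simplified stand-in for the SDK's outcome type (assumption: the real one carries a response).
sealed interface ResponseOutcome {
    data class Success(val body: String) : ResponseOutcome
    data class Failure(val error: Throwable) : ResponseOutcome
}

// Same shape as the shared wrapper proposed in this ticket.
fun failureOf(t: Throwable): ResponseOutcome.Failure {
    if (t is InterruptedException) {
        Thread.currentThread().interrupt()
    }
    return ResponseOutcome.Failure(t)
}

// Old shape: a dedicated InterruptedException arm plus a catch-all arm.
fun executeOnceOld(call: () -> String): ResponseOutcome =
    try {
        ResponseOutcome.Success(call())
    } catch (e: InterruptedException) {
        Thread.currentThread().interrupt()
        ResponseOutcome.Failure(e)
    } catch (t: Throwable) {
        ResponseOutcome.Failure(t)
    }

// New shape: one arm, with the interrupt handling delegated to failureOf.
fun executeOnceNew(call: () -> String): ResponseOutcome =
    try {
        ResponseOutcome.Success(call())
    } catch (t: Throwable) {
        failureOf(t)
    }

// Runs one variant against a call that throws [thrown]. Returns (isFailure, interruptFlagWasSet).
// Thread.interrupted() reads and clears the flag, so each probe starts and ends clean.
fun probe(variant: (() -> String) -> ResponseOutcome, thrown: Throwable): Pair<Boolean, Boolean> {
    Thread.interrupted()
    val outcome = variant { throw thrown }
    val flagged = Thread.interrupted()
    return (outcome is ResponseOutcome.Failure) to flagged
}

fun main() {
    for (thrown in listOf(InterruptedException("cancelled"), IllegalStateException("boom"))) {
        val old = probe(::executeOnceOld, thrown)
        val new = probe(::executeOnceNew, thrown)
        println("${thrown::class.simpleName}: old=$old new=$new")
    }
}
```

Both variants should report a Failure in every case, with the flag set only for the InterruptedException, which is the property the refactor relies on.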
Interrupt handling — leave RetryStep.attempt alone: the catch (t: Throwable) { ResponseOutcome.Failure(t) }
inside attempt() (lines 158-163) is intentionally not routed through failureOf. attempt()
rethrows the wrapped throwable to its caller (is ResponseOutcome.Failure -> throw final.error), so an
InterruptedException propagates as itself and carries the cancellation directly; pre-restoring the flag
there is unnecessary. failureOf exists for the opposite path, where the failure is buried in an outcome
that flows on through the pipeline and may be swallowed or recovered, so the flag must be restored to
preserve cancellation.
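The distinction between the two paths can be sketched as follows. This is an illustration under stated assumptions, not the SDK code: Outcome is a stand-in type, and attemptLike and pipelineLike are hypothetical functions that mimic "wrap then rethrow" and "wrap then recover" respectively.

```kotlin
// Stand-in outcome type; the SDK's real ResponseOutcome is not reproduced here.
sealed interface Outcome {
    data class Success(val value: String) : Outcome
    data class Failure(val error: Throwable) : Outcome
}

// Same shape as the shared wrapper: restore the flag, then wrap.
fun failureOf(t: Throwable): Outcome.Failure {
    if (t is InterruptedException) Thread.currentThread().interrupt()
    return Outcome.Failure(t)
}

// attempt()-style path: the throwable is wrapped without touching the flag and then
// rethrown, so the caller receives the InterruptedException itself.
fun attemptLike(call: () -> String): String {
    val result: Outcome =
        try {
            Outcome.Success(call())
        } catch (t: Throwable) {
            Outcome.Failure(t)
        }
    return when (result) {
        is Outcome.Success -> result.value
        is Outcome.Failure -> throw result.error
    }
}

// Pipeline-style path: a hypothetical recovery swallows the failure, so the restored
// interrupt flag is the only remaining trace of the cancellation.
fun pipelineLike(call: () -> String): String {
    val outcome: Outcome =
        try {
            Outcome.Success(call())
        } catch (t: Throwable) {
            failureOf(t)
        }
    return when (outcome) {
        is Outcome.Success -> outcome.value
        is Outcome.Failure -> "fallback"
    }
}

fun main() {
    val interrupted: () -> String = { throw InterruptedException("cancelled") }

    Thread.interrupted() // clear any stale flag before measuring
    val rethrown = try { attemptLike(interrupted); null } catch (e: InterruptedException) { e }
    println("attempt path: exception=${rethrown != null}, flag=${Thread.interrupted()}")

    val recovered = pipelineLike(interrupted)
    println("pipeline path: result=$recovered, flag=${Thread.interrupted()}")
}
```

On the attempt-style path the caller learns about the cancellation from the exception; on the pipeline-style path the exception is gone by the time the caller sees a result, which is why only that path needs the flag restored.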
API / Build: failureOf is internal, so the public .api surface is unchanged — no apiDump. The
removed private members were never on the API surface either.
sdk-core/.../pipeline/step/retry/RetryStep.kt:365-368 — elvis for the scheduler fallback
Old code:
private fun resolveScheduler(): ScheduledExecutorService {
    settings.scheduler?.let { return it }
    return DEFAULT_SCHEDULER
}
New code:
private fun resolveScheduler(): ScheduledExecutorService = settings.scheduler ?: DEFAULT_SCHEDULER
Usage — before → after:
// before: awaitDelay (line 330)
val sched = resolveScheduler()
// after: awaitDelay (line 330), unchanged
val sched = resolveScheduler()
Call sites unchanged — behavior-preserving.
Why: The elvis operator states "caller-supplied scheduler, otherwise the lazy default" directly. The
by lazy DEFAULT_SCHEDULER is still only dereferenced when settings.scheduler is null, so the
once-per-VM lazy-init timing is preserved. (The KDoc on the method is retained.)
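The lazy-init claim can be checked with a small sketch. Resolver, supplied, and defaultInitialised are hypothetical names for this illustration, and the default here is held per instance rather than once per VM as in the real code; the point is only that the right-hand side of the elvis operator is not evaluated when the left-hand side is non-null.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService

// Hypothetical holder: [supplied] plays the role of settings.scheduler.
class Resolver(private val supplied: ScheduledExecutorService?) {
    // Flips to true the first time the lazy default is actually built.
    var defaultInitialised = false
        private set

    private val defaultScheduler: ScheduledExecutorService by lazy {
        defaultInitialised = true
        Executors.newSingleThreadScheduledExecutor()
    }

    // Elvis form: the lazy delegate is only touched when [supplied] is null.
    fun resolveScheduler(): ScheduledExecutorService = supplied ?: defaultScheduler
}

fun main() {
    val mine = Executors.newSingleThreadScheduledExecutor()

    val withOverride = Resolver(mine)
    println(withOverride.resolveScheduler() === mine) // caller-supplied scheduler wins
    println(withOverride.defaultInitialised)          // default was never built

    val withoutOverride = Resolver(null)
    val fallback = withoutOverride.resolveScheduler()
    println(withoutOverride.defaultInitialised)       // default built on first use

    mine.shutdown()
    fallback.shutdown()
}
```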
sdk-core/.../pipeline/step/retry/RetrySettings.kt:131-166 — one Duration validator for the three Duration setters
totalTimeout, initialDelay, and maxDelay each carry the same two require checks (non-negative +
nanosecond-representable). Extract them into a private helper that names the offending field. The
delayMultiplier setter sits in this range too but validates a Double, so it stays as-is.
Old code:
/** Sets [RetrySettings.totalTimeout]. Must be non-negative. */
public fun totalTimeout(totalTimeout: Duration): RetrySettingsBuilder =
    apply {
        require(!totalTimeout.isNegative) { "totalTimeout must be non-negative" }
        require(totalTimeout <= MAX_NANO_REPRESENTABLE_DELAY) {
            "totalTimeout must be representable in nanoseconds (≤ ~292 years); got $totalTimeout"
        }
        this.totalTimeout = totalTimeout
    }

/** Sets [RetrySettings.initialDelay]. Must be non-negative. */
public fun initialDelay(initialDelay: Duration): RetrySettingsBuilder =
    apply {
        require(!initialDelay.isNegative) { "initialDelay must be non-negative" }
        require(initialDelay <= MAX_NANO_REPRESENTABLE_DELAY) {
            "initialDelay must be representable in nanoseconds (≤ ~292 years); got $initialDelay"
        }
        this.initialDelay = initialDelay
    }

/** Sets [RetrySettings.delayMultiplier]. Must be ≥ 1.0. */
public fun delayMultiplier(delayMultiplier: Double): RetrySettingsBuilder =
    apply {
        require(delayMultiplier >= 1.0) { "delayMultiplier must be ≥ 1.0 (got $delayMultiplier)" }
        this.delayMultiplier = delayMultiplier
    }

/** Sets [RetrySettings.maxDelay]. Must be non-negative. */
public fun maxDelay(maxDelay: Duration): RetrySettingsBuilder =
    apply {
        require(!maxDelay.isNegative) { "maxDelay must be non-negative" }
        require(maxDelay <= MAX_NANO_REPRESENTABLE_DELAY) {
            "maxDelay must be representable in nanoseconds (≤ ~292 years); got $maxDelay"
        }
        this.maxDelay = maxDelay
    }
New code:
/**
 * Validates a [Duration] setting: it must be non-negative and small enough that the
 * backoff math can convert it to nanoseconds without overflowing. Shared by the
 * [totalTimeout], [initialDelay], and [maxDelay] setters; [name] names the offending
 * field in the rejection message.
 */
private fun requireRepresentable(
    name: String,
    value: Duration,
) {
    require(!value.isNegative) { "$name must be non-negative" }
    require(value <= MAX_NANO_REPRESENTABLE_DELAY) {
        "$name must be representable in nanoseconds (≤ ~292 years); got $value"
    }
}

/** Sets [RetrySettings.totalTimeout]. Must be non-negative. */
public fun totalTimeout(totalTimeout: Duration): RetrySettingsBuilder =
    apply {
        requireRepresentable("totalTimeout", totalTimeout)
        this.totalTimeout = totalTimeout
    }

/** Sets [RetrySettings.initialDelay]. Must be non-negative. */
public fun initialDelay(initialDelay: Duration): RetrySettingsBuilder =
    apply {
        requireRepresentable("initialDelay", initialDelay)
        this.initialDelay = initialDelay
    }

/** Sets [RetrySettings.delayMultiplier]. Must be ≥ 1.0. */
public fun delayMultiplier(delayMultiplier: Double): RetrySettingsBuilder =
    apply {
        require(delayMultiplier >= 1.0) { "delayMultiplier must be ≥ 1.0 (got $delayMultiplier)" }
        this.delayMultiplier = delayMultiplier
    }

/** Sets [RetrySettings.maxDelay]. Must be non-negative. */
public fun maxDelay(maxDelay: Duration): RetrySettingsBuilder =
    apply {
        requireRepresentable("maxDelay", maxDelay)
        this.maxDelay = maxDelay
    }
Usage — before → after:
// before
RetrySettings.builder()
    .totalTimeout(Duration.ofSeconds(10))
    .initialDelay(Duration.ofMillis(100))
    .maxDelay(Duration.ofSeconds(5))
    .build()
// after (unchanged)
RetrySettings.builder()
    .totalTimeout(Duration.ofSeconds(10))
    .initialDelay(Duration.ofMillis(100))
    .maxDelay(Duration.ofSeconds(5))
    .build()
Call sites unchanged — behavior-preserving. The two rejection messages are carried verbatim (including
the unicode ≤); only the field name and value are now interpolated, so a rejected initialDelay still
reads initialDelay must be representable in nanoseconds (≤ ~292 years); got ….
Why: Removes the triplicated non-negative + nanosecond-representability validation while keeping the
exact messages. MAX_NANO_REPRESENTABLE_DELAY stays referenced (now from the helper), so no orphaned
constant.
API / Build: the three setter signatures are unchanged and requireRepresentable is private, so
apiCheck stays green — no apiDump.
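As a quick check that the interpolated messages match the originals, the helper can be exercised in isolation. This is a sketch under one loud assumption: the real value of MAX_NANO_REPRESENTABLE_DELAY is not shown in this ticket, so it is taken here to be Long.MAX_VALUE nanoseconds, which is consistent with the "~292 years" wording. rejectionMessage is a hypothetical test helper.

```kotlin
import java.time.Duration

// Assumption: the real constant is not quoted in the ticket; Long.MAX_VALUE nanoseconds
// (roughly 292 years) is used here because it matches the rejection message.
val MAX_NANO_REPRESENTABLE_DELAY: Duration = Duration.ofNanos(Long.MAX_VALUE)

// Same body as the proposed private helper, made top-level so it can run standalone.
fun requireRepresentable(
    name: String,
    value: Duration,
) {
    require(!value.isNegative) { "$name must be non-negative" }
    require(value <= MAX_NANO_REPRESENTABLE_DELAY) {
        "$name must be representable in nanoseconds (≤ ~292 years); got $value"
    }
}

// Hypothetical helper: returns the rejection message, or null when the value is accepted.
fun rejectionMessage(name: String, value: Duration): String? =
    try {
        requireRepresentable(name, value)
        null
    } catch (e: IllegalArgumentException) {
        e.message
    }

fun main() {
    println(rejectionMessage("initialDelay", Duration.ofMillis(100)))      // accepted
    println(rejectionMessage("maxDelay", Duration.ofSeconds(-1)))          // negative
    println(rejectionMessage("totalTimeout", Duration.ofDays(365L * 300))) // about 300 years
}
```

The field name only ever appears as the message prefix, so any existing test that asserts on the full message text should keep passing for each setter.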
sdk-core/.../pipeline/step/retry/RetryAfterParser.kt:150-166 — loop over the millisecond-variant headers
The retry-after-ms and x-ms-retry-after-ms branches are a verbatim copy of each other. Iterate the two
header names instead, referencing the existing consts.
Old code:
val retryAfter = headers.get(HEADER_RETRY_AFTER)?.trim()
if (!retryAfter.isNullOrEmpty()) {
    parseNumericSeconds(retryAfter)?.let { return it }
    parseHttpDate(retryAfter, now)?.let { return it }
}
headers.get(HEADER_RETRY_AFTER_MS)?.trim()?.let { value ->
    if (value.isNotEmpty()) parseMillis(value)?.let { return it }
}
headers.get(HEADER_X_MS_RETRY_AFTER_MS)?.trim()?.let { value ->
    if (value.isNotEmpty()) parseMillis(value)?.let { return it }
}
val rateLimitReset = headers.get(HEADER_X_RATELIMIT_RESET)?.trim()
if (!rateLimitReset.isNullOrEmpty()) {
    parseUnixEpochWithJitter(rateLimitReset, now)?.let { return it }
}
return null
New code:
val retryAfter = headers.get(HEADER_RETRY_AFTER)?.trim()
if (!retryAfter.isNullOrEmpty()) {
    parseNumericSeconds(retryAfter)?.let { return it }
    parseHttpDate(retryAfter, now)?.let { return it }
}
for (header in arrayOf(HEADER_RETRY_AFTER_MS, HEADER_X_MS_RETRY_AFTER_MS)) {
    headers.get(header)?.trim()?.let { value ->
        if (value.isNotEmpty()) parseMillis(value)?.let { return it }
    }
}
val rateLimitReset = headers.get(HEADER_X_RATELIMIT_RESET)?.trim()
if (!rateLimitReset.isNullOrEmpty()) {
    parseUnixEpochWithJitter(rateLimitReset, now)?.let { return it }
}
return null
Usage — before → after:
// before
val hint: Duration? = RetryAfterParser.parse(response.headers, Clock.systemUTC().instant())
// after (unchanged)
val hint: Duration? = RetryAfterParser.parse(response.headers, Clock.systemUTC().instant())
Call sites unchanged — behavior-preserving. The return it inside the let is a non-local return from
parse, so the first usable value still wins; arrayOf keeps retry-after-ms ahead of
x-ms-retry-after-ms, preserving header precedence.
Why: Removes a verbatim copy of the millisecond-variant parse block. Precedence is preserved because
the array order matches the original source order and the first match returns immediately.
Build: the loop references the existing HEADER_RETRY_AFTER_MS / HEADER_X_MS_RETRY_AFTER_MS
constants (not string literals), so neither becomes an orphaned private const for detekt to flag. The
@Suppress("ReturnCount") on parse is retained; the public parse signature is unchanged.
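The precedence argument can be exercised with a reduced version of the loop. This is a sketch, not the parser itself: headers are modelled as a plain Map, parseMillis is a hypothetical stand-in that accepts non-negative integers, firstMillisHint is an invented name, and the constant values are assumed from the header names mentioned above.

```kotlin
// Assumed values, taken from the header names discussed in this ticket.
const val HEADER_RETRY_AFTER_MS = "retry-after-ms"
const val HEADER_X_MS_RETRY_AFTER_MS = "x-ms-retry-after-ms"

// Hypothetical stand-in for the parser's millisecond helper: non-negative integers only.
fun parseMillis(value: String): Long? = value.toLongOrNull()?.takeIf { it >= 0 }

// Reduced form of the proposed loop. `let` is inline, so `return it` exits
// firstMillisHint itself, and the first header with a usable value wins.
fun firstMillisHint(headers: Map<String, String>): Long? {
    for (header in arrayOf(HEADER_RETRY_AFTER_MS, HEADER_X_MS_RETRY_AFTER_MS)) {
        headers[header]?.trim()?.let { value ->
            if (value.isNotEmpty()) parseMillis(value)?.let { return it }
        }
    }
    return null
}

fun main() {
    // Both present and valid: the first header in array order takes precedence.
    println(firstMillisHint(mapOf("retry-after-ms" to "250", "x-ms-retry-after-ms" to "900")))
    // First header unparseable: the loop falls through to the second.
    println(firstMillisHint(mapOf("retry-after-ms" to "soon", "x-ms-retry-after-ms" to "900")))
    // Blank value only: nothing usable, so the result is null.
    println(firstMillisHint(mapOf("retry-after-ms" to "  ")))
}
```

An unparseable first header does not short-circuit the loop, which matches the original two-block behavior where each block only returned on a successful parse.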