Skip to content

http/pipeline/steps: dedupe retry/auth step helpers and small cleanups #169

Description

@OmarAlJarrah

While tidying the sync/async step pairs under http/pipeline/steps I found several more helper
blocks that have drifted into byte-for-byte duplicates between siblings, plus a couple of small
local cleanups in the same files. These are independent, behavior-preserving, and internal — no
public signature changes, so apiCheck is unaffected.

The largest item in this area — extracting a shared instrumentation emitter from the two
instrumentation steps — is tracked separately in #26; this issue covers the rest.

The items below can land as separate commits. The RetryPolicySupport extraction is the only
structural one and deserves a careful review.

sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt:112-136 — drop the throwaway MDC capture

processAsync initialises mdc with an MdcSnapshot.capture() that is overwritten on
the very first line inside the use { } block, so every async request pays for one MDC
copy that is immediately discarded. A lateinit var expresses the intent (assigned inside
the scope, read after) without the wasted capture.

Old code:

            var mdc: MdcSnapshot = MdcSnapshot.capture() // will be overwritten inside the scope
            val downstream: CompletableFuture<Response> =
                span.makeCurrentWithLoggingContext().use {
                    // Capture after the scope has pushed trace.id / span.id so the snapshot carries them.
                    mdc = MdcSnapshot.capture()
                    emitRequestEvent(outgoing, redactedUrl)
                    try {
                        next.processAsync(outgoing)
                    } catch (e: Throwable) {
                        // Synchronous throw from the next step (e.g. argument validation).
                        // Normalise to a failed future so callers get the uniform async contract.
                        val elapsedMs = elapsedMillis(startNanos)
                        emitFailureEvent(outgoing, redactedUrl, e, elapsedMs, wrappedRequestBody)
                        recordMetrics(
                            request,
                            statusCode = -1,
                            elapsedMs,
                            errorType = e::class.java.simpleName ?: "Throwable",
                        )
                        span.end(e)
                        return Futures.failed(e)
                    }
                }

            val capturedMdc = mdc

New code:

            lateinit var mdc: MdcSnapshot
            val downstream: CompletableFuture<Response> =
                span.makeCurrentWithLoggingContext().use {
                    // Capture after the scope has pushed trace.id / span.id so the snapshot carries them.
                    mdc = MdcSnapshot.capture()
                    emitRequestEvent(outgoing, redactedUrl)
                    try {
                        next.processAsync(outgoing)
                    } catch (e: Throwable) {
                        // Synchronous throw from the next step (e.g. argument validation).
                        // Normalise to a failed future so callers get the uniform async contract.
                        val elapsedMs = elapsedMillis(startNanos)
                        emitFailureEvent(outgoing, redactedUrl, e, elapsedMs, wrappedRequestBody)
                        recordMetrics(
                            request,
                            statusCode = -1,
                            elapsedMs,
                            errorType = e::class.java.simpleName ?: "Throwable",
                        )
                        span.end(e)
                        return Futures.failed(e)
                    }
                }

            val capturedMdc = mdc

Usage — before → after: call sites unchanged — behavior-preserving. mdc is assigned
on the first line of the use { } block (before any early return reads it), so the read
below is always initialised:

val capturedMdc = mdc
return downstream.handle { raw, err -> handleCompletion(span, capturedMdc, raw, err, /* ... */) }
val capturedMdc = mdc
return downstream.handle { raw, err -> handleCompletion(span, capturedMdc, raw, err, /* ... */) }

Why: avoids one wasted MdcSnapshot copy/allocation per async request; lateinit
also documents that the real capture happens inside the trace-context scope (so the
snapshot carries trace.id / span.id), which the throwaway initializer obscured.


sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt:408-544 — extract RetryPolicySupport for the shared stateless policy

DefaultAsyncRetryStep and DefaultRetryStep share the entire retry policy: options
clamping, the RetrySettings backoff view, re-sendability gating, interrupt
normalisation, predicate invocation, the caller delay override, and fixed/exponential
backoff. Only the per-call loop machinery and the terminal-failure paths genuinely differ.
Pull the stateless policy into one internal class RetryPolicySupport(rawOptions, clock, logger) that clamps options and builds backoffSettings once and exposes the shared
helpers; both steps hold a support and delegate.

Deliberately kept on each step (they differ): the per-call shouldRetryResponse /
shouldRetryException (they read tryCount / suppressed), the distinct terminal-completion
paths, the protected open delay hooks (computeResponseDelay / computeExceptionDelay /
retryAfterFromHeaders — part of the subclassing contract), and closeQuietly with its
deliberately different catch width (sync swallows IOException, async swallows Exception).

This is the structural item — review carefully.

Old code: (DefaultAsyncRetryStep.kt lines 408-544; DefaultRetryStep.kt carries the
same helpers at lines 294-597 with only the closeQuietly catch width differing)

        // --------------- Shared helpers (stateless across calls) ---------------

        private fun isRetrySafe(request: Request): Boolean {
            val body = request.body ?: return request.method in IDEMPOTENT_METHODS
            return body.isReplayable()
        }

        /**
         * Normalises an interrupt-signalling exception to [InterruptedIOException]: an
         * [InterruptedIOException] is returned as-is; a bare [InterruptedException] is wrapped with
         * the original attached as its cause. Mirrors [DefaultRetryStep]'s helper of the same name.
         */
        private fun asInterruptedIo(exception: Exception): InterruptedIOException =
            when (exception) {
                is InterruptedIOException -> exception
                else -> InterruptedIOException("retry interrupted").apply { initCause(exception) }
            }

        private fun invokeShouldRetry(
            predicate: HttpRetryConditionPredicate,
            condition: HttpRetryCondition,
        ): Boolean =
            try {
                predicate.shouldRetry(condition)
            } catch (t: Throwable) {
                @Suppress("InstanceOfCheckForException")
                if (t is Error) throw t
                throw IllegalStateException("shouldRetry predicate threw", t)
            }

        private fun invokeDelayFromCondition(condition: HttpRetryCondition): Duration? =
            try {
                options.delayFromCondition.delayFor(condition)
            } catch (t: Throwable) {
                @Suppress("InstanceOfCheckForException")
                if (t is Error) throw t
                logger.atWarning()
                    .event("http.retry.delay_override_failed")
                    .field("error.type", t::class.java.simpleName ?: "Throwable")
                    .cause(t)
                    .log()
                null
            }

        private fun backoffOrFixed(tryCount: Int): Duration =
            options.fixedDelay ?: BackoffCalculator.computeDelay(tryCount + 1, backoffSettings)

        // --------------- Delay computation (subclass extension points) ---------------

        /**
         * Computes the delay before retrying [condition]'s response. Resolution order mirrors
         * [DefaultRetryStep.computeResponseDelay]:
         *  1. [HttpRetryOptions.delayFromCondition] override (if it returns non-null).
         *  2. `Retry-After` header parsing ([retryAfterFromHeaders]).
         *  3. [HttpRetryOptions.fixedDelay] or exponential backoff.
         *
         * `protected open` so a subclass can apply request-specific delay logic, exactly as the
         * synchronous [DefaultRetryStep] allows. An override MUST return a non-negative [Duration]
         * and MUST NOT throw: a throw aborts the call (the open retryable response is closed first
         * by the loop's close-on-throw guard).
         */
        protected open fun computeResponseDelay(condition: HttpRetryCondition): Duration {
            invokeDelayFromCondition(condition)?.let { return it }
            condition.response?.let { retryAfterFromHeaders(it) }?.let { return it }
            return backoffOrFixed(condition.tryCount)
        }

        /**
         * Computes the delay before retrying [condition]'s exception. Like [computeResponseDelay]
         * but skips header parsing (there is no response to read headers from). `protected open`
         * with the same invariants. Mirrors [DefaultRetryStep.computeExceptionDelay].
         */
        protected open fun computeExceptionDelay(condition: HttpRetryCondition): Duration {
            invokeDelayFromCondition(condition)?.let { return it }
            return backoffOrFixed(condition.tryCount)
        }

        /**
         * Walks [HttpRetryOptions.retryAfterHeaders] in order, returning the first parseable delay.
         * `protected open` so a subclass can support additional server-specific pacing headers,
         * mirroring [DefaultRetryStep.retryAfterFromHeaders]. May return `null` to fall through to
         * the default backoff; must not throw.
         */
        protected open fun retryAfterFromHeaders(response: Response): Duration? {
            val now = clock.now()
            for (name in options.retryAfterHeaders) {
                val raw = response.headers.get(name) ?: continue
                RetryAfterParser.parseHeaderValue(name, raw, now)?.let { return it }
            }
            return null
        }

        private fun closeQuietly(response: Response) {
            try {
                response.close()
            } catch (closeErr: Exception) {
                // Swallow ANY close failure (not just IOException) on a response being discarded
                // before a retry: it is not actionable, and on the async path an escaping throw
                // would be swallowed by the whenComplete callback and strand the returned future.
                // Only [Error] (OOM, StackOverflow) propagates — those are JVM-fatal, not ours to
                // recover. (The sync DefaultRetryStep lets a non-IOException close failure surface
                // as a terminal error; async cannot, since that path has no caller to throw to.)
                logger.atVerbose()
                    .event("http.retry.close_failed")
                    .field("error.type", closeErr::class.java.simpleName ?: "Exception")
                    .log()
            }
        }

        private fun clampOptions(opts: HttpRetryOptions): HttpRetryOptions {
            if (opts.maxRetries >= 0) return opts
            logger.atVerbose()
                .event("http.retry.maxRetries_clamped")
                .field("http.retry.max_retries.requested", opts.maxRetries.toLong())
                .field("http.retry.max_retries.applied", DefaultRetryStep.DEFAULT_MAX_RETRIES.toLong())
                .log()
            return HttpRetryOptions(
                maxRetries = DefaultRetryStep.DEFAULT_MAX_RETRIES,
                baseDelay = opts.baseDelay,
                maxDelay = opts.maxDelay,
                fixedDelay = opts.fixedDelay,
                retryAfterHeaders = opts.retryAfterHeaders,
                shouldRetryCondition = opts.shouldRetryCondition,
                shouldRetryException = opts.shouldRetryException,
                delayFromCondition = opts.delayFromCondition,
            )
        }

        public companion object {
            // Nanoseconds in one millisecond — converts monotonic deltas to ms for log events.
            private const val NANOS_PER_MILLI = 1_000_000L

            // Methods safe to re-send regardless of body replayability (idempotent per RFC 9110).
            // Mirrors DefaultRetryStep.IDEMPOTENT_METHODS / RetrySettings.DEFAULT_RETRYABLE_METHODS.
            private val IDEMPOTENT_METHODS: Set<Method> =
                setOf(Method.GET, Method.HEAD, Method.OPTIONS, Method.PUT, Method.DELETE)
        }

New code: the shared stateless policy in one file. The protected open delay hooks and
closeQuietly stay on each step and delegate into support (shown under Usage). NANOS_PER_MILLI
stays in each step's companion (used by the per-call logRetry).

/*
 * Copyright (c) 2026 dexpace and Omar Aljarrah
 *
 * Licensed under the MIT License. See LICENSE in the project root.
 * SPDX-License-Identifier: MIT
 */

package org.dexpace.sdk.core.http.pipeline.steps

import org.dexpace.sdk.core.http.request.Method
import org.dexpace.sdk.core.http.request.Request
import org.dexpace.sdk.core.instrumentation.ClientLogger
import org.dexpace.sdk.core.pipeline.step.retry.BackoffCalculator
import org.dexpace.sdk.core.pipeline.step.retry.RetrySettings
import org.dexpace.sdk.core.util.Clock
import java.io.InterruptedIOException
import java.time.Duration

/**
 * Stateless retry policy shared by [DefaultRetryStep] and [DefaultAsyncRetryStep]. Clamps the
 * caller's [HttpRetryOptions] (a negative `maxRetries` becomes
 * [DefaultRetryStep.DEFAULT_MAX_RETRIES]) and builds the [RetrySettings] backoff view once, then
 * exposes the policy helpers both drivers share: re-sendability gating, interrupt normalisation,
 * predicate invocation, the caller delay override, and fixed-or-exponential backoff.
 *
 * Per-call state (try count, suppressed trail, terminal completion), the `protected open` delay
 * hooks, and the close-on-discard helper stay on each step — those differ between the blocking and
 * async stacks. Holds no mutable state after construction, so it is safe to share across
 * concurrent calls.
 */
internal class RetryPolicySupport(
    rawOptions: HttpRetryOptions,
    private val clock: Clock,
    private val logger: ClientLogger,
) {
    /** Effective options; `maxRetries < 0` is clamped to [DefaultRetryStep.DEFAULT_MAX_RETRIES]. */
    val options: HttpRetryOptions = clampOptions(rawOptions)

    /**
     * The [options]' exponential parameters as a [RetrySettings] view so the shared
     * [BackoffCalculator] computes this stack's schedule. Built once; `totalTimeout = ZERO`
     * disables the deadline cap. Building it eagerly validates the delay magnitudes.
     */
    val backoffSettings: RetrySettings =
        RetrySettings.builder()
            .initialDelay(options.baseDelay)
            .maxDelay(options.maxDelay)
            .delayMultiplier(RetrySettings.DEFAULT_DELAY_MULTIPLIER)
            .jitter(RetrySettings.DEFAULT_JITTER)
            .totalTimeout(Duration.ZERO)
            .build()

    /**
     * Returns `true` when [request] may be re-sent: a body-less request only when its method is
     * idempotent ([IDEMPOTENT_METHODS]); a body-bearing request only when its body is replayable.
     */
    fun isRetrySafe(request: Request): Boolean {
        val body = request.body ?: return request.method in IDEMPOTENT_METHODS
        return body.isReplayable()
    }

    /**
     * Normalises an interrupt-signalling exception to [InterruptedIOException]: an
     * [InterruptedIOException] is returned as-is; a bare [InterruptedException] is wrapped with the
     * original attached as its cause.
     */
    fun asInterruptedIo(exception: Exception): InterruptedIOException =
        when (exception) {
            is InterruptedIOException -> exception
            else -> InterruptedIOException("retry interrupted").apply { initCause(exception) }
        }

    fun invokeShouldRetry(
        predicate: HttpRetryConditionPredicate,
        condition: HttpRetryCondition,
    ): Boolean =
        try {
            predicate.shouldRetry(condition)
        } catch (t: Throwable) {
            // Error subclasses still rethrown; an OOM in the predicate must not be wrapped.
            // Splitting Error from RuntimeException via `is` is the canonical JVM idiom for
            // retry classification — there is no other way to distinguish JVM Errors here.
            @Suppress("InstanceOfCheckForException")
            if (t is Error) throw t
            throw IllegalStateException("shouldRetry predicate threw", t)
        }

    fun invokeDelayFromCondition(condition: HttpRetryCondition): Duration? =
        try {
            options.delayFromCondition.delayFor(condition)
        } catch (t: Throwable) {
            @Suppress("InstanceOfCheckForException")
            if (t is Error) throw t
            // Don't fail the whole pipeline if the user override misbehaves — fall back to the
            // default delay calculation. Log loud enough that the bug is observable.
            logger.atWarning()
                .event("http.retry.delay_override_failed")
                .field("error.type", t::class.java.simpleName ?: "Throwable")
                .cause(t)
                .log()
            null
        }

    fun backoffOrFixed(tryCount: Int): Duration =
        options.fixedDelay ?: BackoffCalculator.computeDelay(tryCount + 1, backoffSettings)

    // The eight-field rebuild below is itself collapsed to `opts.withMaxRetries(...)` by the
    // HttpRetryOptions cleanup further down this issue; shown here as-is for a faithful relocation.
    private fun clampOptions(opts: HttpRetryOptions): HttpRetryOptions {
        if (opts.maxRetries >= 0) return opts
        logger.atVerbose()
            .event("http.retry.maxRetries_clamped")
            .field("http.retry.max_retries.requested", opts.maxRetries.toLong())
            .field("http.retry.max_retries.applied", DefaultRetryStep.DEFAULT_MAX_RETRIES.toLong())
            .log()
        return HttpRetryOptions(
            maxRetries = DefaultRetryStep.DEFAULT_MAX_RETRIES,
            baseDelay = opts.baseDelay,
            maxDelay = opts.maxDelay,
            fixedDelay = opts.fixedDelay,
            retryAfterHeaders = opts.retryAfterHeaders,
            shouldRetryCondition = opts.shouldRetryCondition,
            shouldRetryException = opts.shouldRetryException,
            delayFromCondition = opts.delayFromCondition,
        )
    }

    private companion object {
        // Methods safe to re-send regardless of body replayability (idempotent per RFC 9110).
        // Mirrors RetrySettings.DEFAULT_RETRYABLE_METHODS.
        private val IDEMPOTENT_METHODS: Set<Method> =
            setOf(Method.GET, Method.HEAD, Method.OPTIONS, Method.PUT, Method.DELETE)
    }
}

Usage — before → after: the step holds one support instead of options +
backoffSettings + the five private helpers, and the protected open hooks delegate:

// DefaultAsyncRetryStep — before
private val options: HttpRetryOptions = clampOptions(options)
private val backoffSettings: RetrySettings = RetrySettings.builder()/* ... */.build()
// ...
val driver = RetryDriver(next, isRetrySafe(request), result)
// ... inside the driver / hooks:
val retry = retrySafe && tryCount < options.maxRetries && shouldRetryResponse(response)
return invokeShouldRetry(options.shouldRetryCondition, condition)
protected open fun computeResponseDelay(condition: HttpRetryCondition): Duration {
    invokeDelayFromCondition(condition)?.let { return it }
    condition.response?.let { retryAfterFromHeaders(it) }?.let { return it }
    return backoffOrFixed(condition.tryCount)
}
// DefaultAsyncRetryStep — after
private val support = RetryPolicySupport(options, clock, logger)
// ...
val driver = RetryDriver(next, support.isRetrySafe(request), result)
// ... inside the driver / hooks:
val retry = retrySafe && tryCount < support.options.maxRetries && shouldRetryResponse(response)
return support.invokeShouldRetry(support.options.shouldRetryCondition, condition)
protected open fun computeResponseDelay(condition: HttpRetryCondition): Duration {
    support.invokeDelayFromCondition(condition)?.let { return it }
    condition.response?.let { retryAfterFromHeaders(it) }?.let { return it }
    return support.backoffOrFixed(condition.tryCount)
}
// retryAfterFromHeaders / closeQuietly stay on the step; closeQuietly keeps its async-wide
// `catch (closeErr: Exception)` (sync keeps `catch (closeErr: IOException)`).

Why: the retry decision/backoff logic lives in one place, so a change to clamping,
backoff settings, or predicate-exception handling can't silently apply to only one of the
two stacks; each step shrinks to its genuinely-distinct loop and terminal paths.

Build: new file needs the MIT header. BackoffCalculator, RetrySettings, and
Method imports move from both steps into RetryPolicySupport (drop them from the steps or
allWarningsAsErrors fails); each step keeps InterruptedIOException (the driver still
type-checks is InterruptedIOException) and RetryAfterParser (used by the retained
retryAfterFromHeaders). Internal only — DefaultRetryStep.DEFAULT_MAX_RETRIES stays
public, so no apiCheck impact.


sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt:258-291 — hoist offersBearerChallenge to one top-level function

Both bearer steps define a private offersBearerChallenge(response) and a private
"bearer" scheme constant with identical bodies. Hoist them to a single top-level
internal fun + internal const in BearerTokenAuthStep.kt (mirroring how
defaultShouldRetryResponse already lives top-level in HttpRetryOptions.kt) and call it
from both. evictRejectedToken stays duplicated — it touches each step's own lock /
cachedToken and isn't shareable as-is.

Old code: (AsyncBearerTokenAuthStep.kt lines 258-291, plus the sync duplicates it
mirrors)

        // ── AsyncBearerTokenAuthStep.kt:258-291 ──
        /**
         * Returns `true` when [response]'s `WWW-Authenticate` header advertises a `Bearer`
         * challenge.
         */
        private fun offersBearerChallenge(response: Response): Boolean {
            val header = response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) ?: return false
            return AuthChallengeParser.parse(header).any { it.scheme == BEARER_SCHEME }
        }

        /**
         * Clears [cachedToken] iff it is still the token whose stamped header is [rejectedHeader].
         * Guarded by the same [lock] as the fetch path so the read-compare-clear is atomic against
         * a concurrent refresh.
         */
        private fun evictRejectedToken(rejectedHeader: String) {
            lock.withLock {
                val current = cachedToken ?: return
                if (bearerHeaderValue(current.token) == rejectedHeader) {
                    cachedToken = null
                }
            }
        }

        /**
         * The `Authorization` header value for [token]. Single source of truth shared by the
         * stamping path and [evictRejectedToken]. A subclass that emits a different header format
         * must override this too, or eviction stops matching.
         */
        protected open fun bearerHeaderValue(token: String): String = "Bearer $token"

        private companion object {
            private const val DEFAULT_REFRESH_MARGIN_SECONDS = 30L
            private const val BEARER_SCHEME = "bearer"
        }
    }

        // ── BearerTokenAuthStep.kt:148-151 + 223-231 — the sync duplicates ──
        private fun offersBearerChallenge(response: Response): Boolean {
            val header = response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) ?: return false
            return AuthChallengeParser.parse(header).any { it.scheme == BEARER_SCHEME }
        }
        // ...
        private companion object {
            // Default refresh margin: refresh the bearer token 30 seconds before its expiry
            // so an in-flight request never carries a near-expired credential.
            private const val DEFAULT_REFRESH_MARGIN_SECONDS = 30L

            // Lower-cased `Bearer` scheme name; AuthChallengeParser normalises schemes to lower
            // case, so the eviction gate compares against this constant.
            private const val BEARER_SCHEME = "bearer"
        }

New code: one top-level definition; both companions lose BEARER_SCHEME, the async
file loses its private function and its now-orphaned AuthChallengeParser import.

// ── BearerTokenAuthStep.kt — new top-level definitions (after the class), at file scope ──
/**
 * Returns `true` when [response]'s `WWW-Authenticate` header advertises a `Bearer` challenge. A
 * header with only non-bearer challenges (or one that does not parse) returns `false`. The
 * [AuthStep] / [AsyncAuthStep] pillar guarantees the header is present before the challenge hook
 * runs; the explicit null-guard keeps the function correct if called from elsewhere. Shared by
 * [BearerTokenAuthStep] and [AsyncBearerTokenAuthStep].
 */
internal fun offersBearerChallenge(response: Response): Boolean {
    val header = response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) ?: return false
    return AuthChallengeParser.parse(header).any { it.scheme == BEARER_SCHEME }
}

// Lower-cased `Bearer` scheme name; AuthChallengeParser normalises schemes to lower case, so the
// eviction gate compares against this constant.
internal const val BEARER_SCHEME: String = "bearer"

// ── BearerTokenAuthStep.kt — the class loses its private offersBearerChallenge; companion is now ──
        private companion object {
            // Default refresh margin: refresh the bearer token 30 seconds before its expiry
            // so an in-flight request never carries a near-expired credential.
            private const val DEFAULT_REFRESH_MARGIN_SECONDS = 30L
        }

// ── AsyncBearerTokenAuthStep.kt:258-291 — function removed, BEARER_SCHEME removed ──
        /**
         * Clears [cachedToken] iff it is still the token whose stamped header is [rejectedHeader].
         * Guarded by the same [lock] as the fetch path so the read-compare-clear is atomic against
         * a concurrent refresh.
         */
        private fun evictRejectedToken(rejectedHeader: String) {
            lock.withLock {
                val current = cachedToken ?: return
                if (bearerHeaderValue(current.token) == rejectedHeader) {
                    cachedToken = null
                }
            }
        }

        /**
         * The `Authorization` header value for [token]. Single source of truth shared by the
         * stamping path and [evictRejectedToken]. A subclass that emits a different header format
         * must override this too, or eviction stops matching.
         */
        protected open fun bearerHeaderValue(token: String): String = "Bearer $token"

        private companion object {
            private const val DEFAULT_REFRESH_MARGIN_SECONDS = 30L
        }
    }

// ── AsyncBearerTokenAuthStep.kt:10 — delete this now-orphaned import ──
// import org.dexpace.sdk.core.http.auth.AuthChallengeParser

Usage — before → after: call sites unchanged — behavior-preserving. The unqualified
call in each challenge hook now resolves to the top-level function:

// AsyncBearerTokenAuthStep.authorizeRequestOnChallengeAsync / BearerTokenAuthStep.authorizeRequestOnChallenge
if (!offersBearerChallenge(response)) return CompletableFuture.completedFuture(null)
// AsyncBearerTokenAuthStep.authorizeRequestOnChallengeAsync / BearerTokenAuthStep.authorizeRequestOnChallenge
if (!offersBearerChallenge(response)) return CompletableFuture.completedFuture(null)

Why: one definition of the bearer-challenge check instead of two copies that must be
edited together; BEARER_SCHEME lives next to the only function that reads it.

Build: removing the orphaned import org.dexpace.sdk.core.http.auth.AuthChallengeParser
from AsyncBearerTokenAuthStep.kt is requiredallWarningsAsErrors fails on the
unused import once the function moves. BearerTokenAuthStep.kt keeps the import (the
top-level function now uses it). Internal only — no apiCheck impact.


sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep.kt:86-119 — fold the HookOutcome carrier into a single handle

handleChallenge runs the challenge future through a handle that does nothing but pack
its (value, error) into a private HookOutcome, then a thenCompose that unpacks it and
does the real work. The carrier and the extra composition stage are unnecessary — do the
close/dispatch decision inside a single handle whose result is flattened with
thenCompose { it }.

Old code:

    private fun handleChallenge(
        authorized: Request,
        response: Response,
        next: AsyncPipelineNext,
    ): CompletableFuture<Response> {
        if (response.status.code != SC_UNAUTHORIZED) return CompletableFuture.completedFuture(response)
        response.headers.get(HttpHeaderName.WWW_AUTHENTICATE)
            ?: return CompletableFuture.completedFuture(response)

        val challengeFuture: CompletableFuture<Request?> =
            try {
                authorizeRequestOnChallengeAsync(authorized, response)
            } catch (t: Throwable) {
                // A sync throw from the hook (caller-bug case) must still close the 401 body.
                response.close()
                return Futures.failed(t)
            }

        return challengeFuture.handle { retryRequest, hookError ->
            HookOutcome(retryRequest, hookError)
        }.thenCompose { outcome ->
            val hookError = outcome.error
            if (hookError != null) {
                response.close()
                return@thenCompose Futures.failed<Response>(Futures.unwrap(hookError))
            }
            val retryRequest = outcome.request ?: return@thenCompose CompletableFuture.completedFuture(response)
            response.close()
            next.copy().processAsync(retryRequest)
        }
    }

    /** Carrier so the challenge future's outcome (value or error) survives [CompletableFuture.handle]. */
    private class HookOutcome(val request: Request?, val error: Throwable?)

New code:

    private fun handleChallenge(
        authorized: Request,
        response: Response,
        next: AsyncPipelineNext,
    ): CompletableFuture<Response> {
        if (response.status.code != SC_UNAUTHORIZED) return CompletableFuture.completedFuture(response)
        response.headers.get(HttpHeaderName.WWW_AUTHENTICATE)
            ?: return CompletableFuture.completedFuture(response)

        val challengeFuture: CompletableFuture<Request?> =
            try {
                authorizeRequestOnChallengeAsync(authorized, response)
            } catch (t: Throwable) {
                // A sync throw from the hook (caller-bug case) must still close the 401 body.
                response.close()
                return Futures.failed(t)
            }

        return challengeFuture.handle<CompletableFuture<Response>> { retryRequest, hookError ->
            if (hookError != null) {
                response.close()
                return@handle Futures.failed<Response>(Futures.unwrap(hookError))
            }
            if (retryRequest == null) return@handle CompletableFuture.completedFuture(response)
            response.close()
            next.copy().processAsync(retryRequest)
        }.thenCompose { it }
    }

Usage — before → after: call sites unchanged — behavior-preserving. processAsync
composes handleChallenge exactly as before:

return authorizedFuture.thenCompose { authorized ->
    next.copy().processAsync(authorized).thenCompose { response ->
        handleChallenge(authorized, response, next)
    }
}
return authorizedFuture.thenCompose { authorized ->
    next.copy().processAsync(authorized).thenCompose { response ->
        handleChallenge(authorized, response, next)
    }
}

Why: removes a one-field carrier type and a whole composition stage; the handle
lambda decides close-and-fail / pass-through / retry directly, and the explicit
null-check smart-casts retryRequest to non-null for the dispatch. Behaviour is
identical — the original handle did no work beyond boxing the outcome.


sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions.kt:69-119 — add an internal withMaxRetries copy helper

HttpRetryOptions isn't a data class, so the negative-maxRetries clamp in both retry
steps hand-rebuilds all eight fields through the constructor. Add one internal
withMaxRetries that does the copy, and let both clampOptions call it.

Old code:

public class HttpRetryOptions
    @JvmOverloads
    constructor(
        public val maxRetries: Int = DEFAULT_MAX_RETRIES,
        public val baseDelay: Duration = RetrySettings.DEFAULT_INITIAL_DELAY,
        public val maxDelay: Duration = RetrySettings.DEFAULT_MAX_DELAY,
        public val fixedDelay: Duration? = null,
        public val retryAfterHeaders: List<HttpHeaderName> = DEFAULT_RETRY_AFTER_HEADERS,
        public val shouldRetryCondition: HttpRetryConditionPredicate =
            HttpRetryConditionPredicate(::defaultShouldRetryResponse),
        public val shouldRetryException: HttpRetryConditionPredicate =
            HttpRetryConditionPredicate(::defaultShouldRetryException),
        public val delayFromCondition: HttpRetryDelayProvider = HttpRetryDelayProvider { null },
    ) {
        public companion object {
            // The default retry count is the canonical SDK budget, kept in one place on
            // DefaultRetryStep (initial send + DEFAULT_MAX_RETRIES == RetrySettings.DEFAULT_MAX_ATTEMPTS).
            private const val DEFAULT_MAX_RETRIES = DefaultRetryStep.DEFAULT_MAX_RETRIES

            /**
             * The three `Retry-After` header forms parsed by [DefaultRetryStep]. Order matters —
             * the first header present on the response wins.
             */
            @JvmField
            public val DEFAULT_RETRY_AFTER_HEADERS: List<HttpHeaderName> =
                Collections.unmodifiableList(
                    listOf(
                        HttpHeaderName.RETRY_AFTER,
                        HttpHeaderName.RETRY_AFTER_MS,
                        HttpHeaderName.X_MS_RETRY_AFTER_MS,
                    ),
                )

            /**
             * Returns an [HttpRetryOptions] that uses a flat [delay] between every retry — no
             * exponential growth, no jitter. [baseDelay] and [maxDelay] are forced to zero
             * so the backoff path is unreachable.
             */
            @JvmStatic
            public fun fixed(
                maxRetries: Int,
                delay: Duration,
            ): HttpRetryOptions =
                HttpRetryOptions(
                    maxRetries = maxRetries,
                    fixedDelay = delay,
                    baseDelay = Duration.ZERO,
                    maxDelay = Duration.ZERO,
                )
        }
    }

New code:

public class HttpRetryOptions
    @JvmOverloads
    constructor(
        public val maxRetries: Int = DEFAULT_MAX_RETRIES,
        public val baseDelay: Duration = RetrySettings.DEFAULT_INITIAL_DELAY,
        public val maxDelay: Duration = RetrySettings.DEFAULT_MAX_DELAY,
        public val fixedDelay: Duration? = null,
        public val retryAfterHeaders: List<HttpHeaderName> = DEFAULT_RETRY_AFTER_HEADERS,
        public val shouldRetryCondition: HttpRetryConditionPredicate =
            HttpRetryConditionPredicate(::defaultShouldRetryResponse),
        public val shouldRetryException: HttpRetryConditionPredicate =
            HttpRetryConditionPredicate(::defaultShouldRetryException),
        public val delayFromCondition: HttpRetryDelayProvider = HttpRetryDelayProvider { null },
    ) {
        /**
         * Returns a copy of these options with [maxRetries] replaced and every other field
         * preserved. Used by the retry steps to apply the negative-`maxRetries` clamp without
         * hand-rebuilding all eight fields at each call site.
         */
        internal fun withMaxRetries(maxRetries: Int): HttpRetryOptions =
            HttpRetryOptions(
                maxRetries = maxRetries,
                baseDelay = baseDelay,
                maxDelay = maxDelay,
                fixedDelay = fixedDelay,
                retryAfterHeaders = retryAfterHeaders,
                shouldRetryCondition = shouldRetryCondition,
                shouldRetryException = shouldRetryException,
                delayFromCondition = delayFromCondition,
            )

        public companion object {
            // The default retry count is the canonical SDK budget, kept in one place on
            // DefaultRetryStep (initial send + DEFAULT_MAX_RETRIES == RetrySettings.DEFAULT_MAX_ATTEMPTS).
            private const val DEFAULT_MAX_RETRIES = DefaultRetryStep.DEFAULT_MAX_RETRIES

            /**
             * The three `Retry-After` header forms parsed by [DefaultRetryStep]. Order matters —
             * the first header present on the response wins.
             */
            @JvmField
            public val DEFAULT_RETRY_AFTER_HEADERS: List<HttpHeaderName> =
                Collections.unmodifiableList(
                    listOf(
                        HttpHeaderName.RETRY_AFTER,
                        HttpHeaderName.RETRY_AFTER_MS,
                        HttpHeaderName.X_MS_RETRY_AFTER_MS,
                    ),
                )

            /**
             * Returns an [HttpRetryOptions] that uses a flat [delay] between every retry — no
             * exponential growth, no jitter. [baseDelay] and [maxDelay] are forced to zero
             * so the backoff path is unreachable.
             */
            @JvmStatic
            public fun fixed(
                maxRetries: Int,
                delay: Duration,
            ): HttpRetryOptions =
                HttpRetryOptions(
                    maxRetries = maxRetries,
                    fixedDelay = delay,
                    baseDelay = Duration.ZERO,
                    maxDelay = Duration.ZERO,
                )
        }
    }

Usage — before → after: clampOptions in both retry steps collapses to one call. The
sync step references the bare DEFAULT_MAX_RETRIES (its own companion); the async step must
qualify DefaultRetryStep.DEFAULT_MAX_RETRIES.

// DefaultRetryStep.clampOptions — before
private fun clampOptions(opts: HttpRetryOptions): HttpRetryOptions {
    if (opts.maxRetries >= 0) return opts
    logger.atVerbose()
        .event("http.retry.maxRetries_clamped")
        .field("http.retry.max_retries.requested", opts.maxRetries.toLong())
        .field("http.retry.max_retries.applied", DEFAULT_MAX_RETRIES.toLong())
        .log()
    // HttpRetryOptions isn't a data class, so an explicit copy via the constructor is
    // the cheapest correct fix — re-using every other field as-is.
    return HttpRetryOptions(
        maxRetries = DEFAULT_MAX_RETRIES,
        baseDelay = opts.baseDelay,
        maxDelay = opts.maxDelay,
        fixedDelay = opts.fixedDelay,
        retryAfterHeaders = opts.retryAfterHeaders,
        shouldRetryCondition = opts.shouldRetryCondition,
        shouldRetryException = opts.shouldRetryException,
        delayFromCondition = opts.delayFromCondition,
    )
}
// DefaultRetryStep.clampOptions — after
private fun clampOptions(opts: HttpRetryOptions): HttpRetryOptions {
    if (opts.maxRetries >= 0) return opts
    logger.atVerbose()
        .event("http.retry.maxRetries_clamped")
        .field("http.retry.max_retries.requested", opts.maxRetries.toLong())
        .field("http.retry.max_retries.applied", DEFAULT_MAX_RETRIES.toLong())
        .log()
    return opts.withMaxRetries(DEFAULT_MAX_RETRIES)
}

// DefaultAsyncRetryStep.clampOptions — after (qualified constant)
//     return opts.withMaxRetries(DefaultRetryStep.DEFAULT_MAX_RETRIES)
// If the RetryPolicySupport extraction above lands, this same collapse applies inside
// RetryPolicySupport.clampOptions.

Why: the eight-field copy lives once on the type that owns the fields, so adding a
ninth option later can't leave a stale hand-rebuild that silently drops it in the clamp
path; each clampOptions shrinks to a single readable line.

Build: the one-line constructor call exceeds ktlint's 120-column limit, so the named
arguments are wrapped one-per-line. withMaxRetries is internal, so it does not appear
in the .api snapshots — no apiCheck impact.

Metadata

Metadata

Assignees

No one assigned

    Labels

    sdk-coresdk-core toolkittech-debtsimplification / cleanup

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions