http/pipeline: drop redundant call-state httpClient and de-duplicate reload()/toBlocking() #171

Description

@OmarAlJarrah

While reading through the sdk-core HTTP pipeline runtime (org.dexpace.sdk.core.http.pipeline,
the core types rather than the steps/ subpackage), I noticed a handful of small, behavior-preserving
cleanups. They sit in the same subsystem and are cheap to land, so I'm grouping them here. None of
them change observable behavior or the public API; a couple just delete code that duplicates logic
already living one call away.


sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/PipelineCallState.kt:27-55 — drop the redundant stored httpClient

PipelineCallState stores an httpClient that is always the same instance as pipeline.httpClient
(it is only ever constructed from HttpPipeline.send, which passes its own httpClient, and from
copy, which forwards it). The single reader can reach it through pipeline instead, so the field
and constructor parameter are dead weight.

Old code:

import org.dexpace.sdk.core.client.HttpClient
import org.dexpace.sdk.core.http.request.Request

/**
 * Per-call mutable cursor over a [HttpPipeline]'s steps array. Holds the index of the
 * next step to invoke, the originating [Request], and a reference to the [HttpClient] used
 * when the cursor reaches the end.
 *
 * Cloned via [copy] (exposed to user code through [PipelineNext.copy]) so retry / redirect
 * steps can re-drive the downstream chain. Cloning copies the current index — the new
 * state resumes from the same position, advancing independently.
 *
 * Backed by an [Array] of steps for tight iteration (per the pipeline performance
 * guardrails); index advance is a single field write.
 *
 * Internal: cloning is reachable only through [PipelineNext.copy].
 */
internal class PipelineCallState internal constructor(
    val pipeline: HttpPipeline,
    initialRequest: Request,
    val httpClient: HttpClient,
    private var index: Int = 0,
) {
    /**
     * The request currently being driven through the pipeline. Mutable so a step may
     * substitute the downstream request via [PipelineNext.process] — for example,
     * [org.dexpace.sdk.core.http.pipeline.steps.InstrumentationStep] wraps the body in
     * `LoggableRequestBody` before send. The substitution propagates to all subsequent
     * steps and to the terminal `HttpClient.execute(...)` when the cursor reaches the end.
     */
    var request: Request = initialRequest

    /**
     * Returns the step at the current cursor position and advances by one. Returns null
     * once the cursor moves past the last step — the caller (`PipelineNext.process`) then
     * dispatches to the terminal [HttpClient]. The cursor only moves forward; re-driving
     * the downstream chain requires [copy] to fork a fresh cursor at the current position.
     */
    fun advance(): HttpStep? {
        val steps = pipeline.stepArray
        return if (index < steps.size) steps[index++] else null
    }

    /** Returns an independent state cloned at the current cursor position. */
    fun copy(): PipelineCallState = PipelineCallState(pipeline, request, httpClient, index)
}

New code:

import org.dexpace.sdk.core.client.HttpClient
import org.dexpace.sdk.core.http.request.Request

/**
 * Per-call mutable cursor over a [HttpPipeline]'s steps array. Holds the index of the
 * next step to invoke and the originating [Request].
 *
 * Cloned via [copy] (exposed to user code through [PipelineNext.copy]) so retry / redirect
 * steps can re-drive the downstream chain. Cloning copies the current index — the new
 * state resumes from the same position, advancing independently.
 *
 * Backed by an [Array] of steps for tight iteration (per the pipeline performance
 * guardrails); index advance is a single field write.
 *
 * Internal: cloning is reachable only through [PipelineNext.copy].
 */
internal class PipelineCallState internal constructor(
    val pipeline: HttpPipeline,
    initialRequest: Request,
    private var index: Int = 0,
) {
    /**
     * The request currently being driven through the pipeline. Mutable so a step may
     * substitute the downstream request via [PipelineNext.process] — for example,
     * [org.dexpace.sdk.core.http.pipeline.steps.InstrumentationStep] wraps the body in
     * `LoggableRequestBody` before send. The substitution propagates to all subsequent
     * steps and to the terminal `HttpClient.execute(...)` when the cursor reaches the end.
     */
    var request: Request = initialRequest

    /**
     * Returns the step at the current cursor position and advances by one. Returns null
     * once the cursor moves past the last step — the caller (`PipelineNext.process`) then
     * dispatches to the terminal [HttpClient]. The cursor only moves forward; re-driving
     * the downstream chain requires [copy] to fork a fresh cursor at the current position.
     */
    fun advance(): HttpStep? {
        val steps = pipeline.stepArray
        return if (index < steps.size) steps[index++] else null
    }

    /** Returns an independent state cloned at the current cursor position. */
    fun copy(): PipelineCallState = PipelineCallState(pipeline, request, index)
}

Usage — before → after:

// Before
// PipelineNext.process(): terminal dispatch
return if (nextStep == null) {
    state.httpClient.execute(state.request)
} else {
    nextStep.process(state.request, this)
}

// HttpPipeline.send(): constructs the cursor
val state = PipelineCallState(this, request, httpClient)

// After
// PipelineNext.process(): terminal dispatch
return if (nextStep == null) {
    state.pipeline.httpClient.execute(state.request)
} else {
    nextStep.process(state.request, this)
}

// HttpPipeline.send(): constructs the cursor
val state = PipelineCallState(this, request)

Why: The stored httpClient is provably identical to pipeline.httpClient, so it is a second
handle on the same object; reading it off pipeline removes the field, the constructor parameter,
and one argument from every construction site. The import HttpClient stays — it is still linked
from the advance KDoc.

API / Build: None. PipelineCallState is internal, so there is no apiCheck impact.
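
To make the invariant concrete, here is a minimal, self-contained sketch. Every type in it (DemoClient, DemoPipeline, DemoCallState, dispatch) is a hypothetical stand-in, not the SDK's real class, and requests and responses are plain strings. It only illustrates that a cursor holding the pipeline can always reach the same client, including after copy().

```kotlin
// Hypothetical stand-ins for the SDK types; requests and responses are plain strings.
fun interface DemoClient {
    fun execute(request: String): String
}

class DemoPipeline(val httpClient: DemoClient) {
    fun send(request: String): String {
        // No client argument: the cursor reaches it through the pipeline.
        val state = DemoCallState(this, request)
        return dispatch(state)
    }
}

// The cursor keeps only the pipeline, the request, and its position.
class DemoCallState(
    val pipeline: DemoPipeline,
    var request: String,
    private val index: Int = 0,
) {
    fun copy(): DemoCallState = DemoCallState(pipeline, request, index)
}

// Terminal dispatch: read the client off the pipeline rather than a stored field.
fun dispatch(state: DemoCallState): String =
    state.pipeline.httpClient.execute(state.request)

fun main() {
    val client = DemoClient { "echo:$it" }
    val pipeline = DemoPipeline(client)
    val forked = DemoCallState(pipeline, "GET /").copy()
    check(forked.pipeline.httpClient === client) // a fork still sees the same client
    println(pipeline.send("GET /")) // prints "echo:GET /"
}
```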


sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncPipelineCallState.kt:24-46 — drop the redundant stored httpClient (and its now-unused import)

Same redundancy on the async cursor: httpClient always equals pipeline.httpClient. Removing the
field also makes import AsyncHttpClient unused — the only remaining mention is a non-linking
code span — so it must come out to keep the build clean.

Old code:

import org.dexpace.sdk.core.client.AsyncHttpClient
import org.dexpace.sdk.core.http.request.Request

/**
 * Per-call mutable cursor over an [AsyncHttpPipeline]'s steps array. Async counterpart of
 * [PipelineCallState]: holds the index of the next step to invoke, the in-flight [Request],
 * and a reference to the [AsyncHttpClient] used when the cursor reaches the end.
 *
 * Cloned via [copy] (exposed to user code through [AsyncPipelineNext.copy]) so async retry /
 * redirect steps can re-drive the downstream chain. Cloning copies the current index — the
 * new state resumes from the same position, advancing independently.
 *
 * Internal: cloning is reachable only through [AsyncPipelineNext.copy].
 */
internal class AsyncPipelineCallState internal constructor(
    val pipeline: AsyncHttpPipeline,
    initialRequest: Request,
    val httpClient: AsyncHttpClient,
    private var index: Int = 0,
) {
    /**
     * In-flight request; mutable so a step may substitute the downstream request (e.g.
     * wrap the body in `LoggableRequestBody` before send). The substitution propagates to
     * all subsequent steps and to the terminal `AsyncHttpClient.executeAsync(...)` when
     * the cursor reaches the end of the chain.
     */
    var request: Request = initialRequest

    /** Returns the next step to invoke, or null if the cursor has reached the end. */
    fun advance(): AsyncHttpStep? {
        val steps = pipeline.stepArray
        return if (index < steps.size) steps[index++] else null
    }

    /** Returns an independent state cloned at the current cursor position. */
    fun copy(): AsyncPipelineCallState = AsyncPipelineCallState(pipeline, request, httpClient, index)
}

New code:

import org.dexpace.sdk.core.http.request.Request

/**
 * Per-call mutable cursor over an [AsyncHttpPipeline]'s steps array. Async counterpart of
 * [PipelineCallState]: holds the index of the next step to invoke and the in-flight [Request].
 *
 * Cloned via [copy] (exposed to user code through [AsyncPipelineNext.copy]) so async retry /
 * redirect steps can re-drive the downstream chain. Cloning copies the current index — the
 * new state resumes from the same position, advancing independently.
 *
 * Internal: cloning is reachable only through [AsyncPipelineNext.copy].
 */
internal class AsyncPipelineCallState internal constructor(
    val pipeline: AsyncHttpPipeline,
    initialRequest: Request,
    private var index: Int = 0,
) {
    /**
     * In-flight request; mutable so a step may substitute the downstream request (e.g.
     * wrap the body in `LoggableRequestBody` before send). The substitution propagates to
     * all subsequent steps and to the terminal `AsyncHttpClient.executeAsync(...)` when
     * the cursor reaches the end of the chain.
     */
    var request: Request = initialRequest

    /** Returns the next step to invoke, or null if the cursor has reached the end. */
    fun advance(): AsyncHttpStep? {
        val steps = pipeline.stepArray
        return if (index < steps.size) steps[index++] else null
    }

    /** Returns an independent state cloned at the current cursor position. */
    fun copy(): AsyncPipelineCallState = AsyncPipelineCallState(pipeline, request, index)
}

Usage — before → after:

// Before
// AsyncPipelineNext.processAsync(): terminal dispatch
if (nextStep == null) {
    state.httpClient.executeAsync(state.request)
} else {
    nextStep.processAsync(state.request, this)
}

// AsyncHttpPipeline.sendAsync(): constructs the cursor
val state = AsyncPipelineCallState(this, request, httpClient)

// After
// AsyncPipelineNext.processAsync(): terminal dispatch
if (nextStep == null) {
    state.pipeline.httpClient.executeAsync(state.request)
} else {
    nextStep.processAsync(state.request, this)
}

// AsyncHttpPipeline.sendAsync(): constructs the cursor
val state = AsyncPipelineCallState(this, request)

Why: Mirrors the synchronous cleanup above — the stored httpClient duplicates
pipeline.httpClient, and the single reader reaches it through pipeline just as cheaply.

API / Build: Removing import org.dexpace.sdk.core.client.AsyncHttpClient is required. After
dropping the field and the KDoc link, the only surviving reference is a back-ticked code span
(not a symbol link), so the import is unused and allWarningsAsErrors would otherwise break the
build. AsyncPipelineCallState is internal, so there is no apiCheck impact.


sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/StagedSteps.kt:84-95 — reload() should reuse append()

reload() re-buckets each step with a stage-dispatch loop that is a verbatim copy of append()'s
body. Clearing the storage and replaying through append() keeps the staging policy in one place.

Old code:

    /** Replaces the contents of this storage with [steps], preserving order. */
    fun reload(steps: List<S>) {
        perStage.clear()
        pillars.clear()
        for (s in steps) {
            val stage = stageOf(s)
            if (stage.isPillar) {
                installPillar(s, stage)
            } else {
                perStage.getOrPut(stage) { ArrayDeque() }.addLast(s)
            }
        }
    }

New code:

    /** Replaces the contents of this storage with [steps], preserving order. */
    fun reload(steps: List<S>) {
        perStage.clear()
        pillars.clear()
        steps.forEach(::append)
    }

Usage — before → after:

// Before and after (identical)
// HttpPipelineBuilder.from(...): seeds a builder from an existing pipeline
HttpPipelineBuilder(pipeline.httpClient).also { it.steps.reload(pipeline.steps) }

Call sites are unchanged and the change is behavior-preserving. append() performs the identical
stage dispatch (installPillar for pillars, tail-addLast otherwise), and reload clears pillars first,
so the pillar-replacement callback never fires while replaying a flattened (one-pillar-per-stage) list.

Why: Removes a duplicated copy of append()'s stage-dispatch loop, leaving a single definition
of how a step is bucketed.

API / Build: None.
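
A simplified model of the storage shows the clear-then-replay shape in isolation. This is a sketch under stated assumptions: BucketStage, BucketStep, and DemoStagedSteps are hypothetical names, the real StagedSteps is generic and has a pillar-replacement callback that is omitted here, and the enum's `entries` property needs Kotlin 1.9 or newer.

```kotlin
// Hypothetical, simplified model; the real StagedSteps has more behavior.
enum class BucketStage(val isPillar: Boolean) { USER(false), RETRY(true), LOGGING(false) }

data class BucketStep(val name: String, val stage: BucketStage)

class DemoStagedSteps {
    private val perStage = HashMap<BucketStage, ArrayDeque<BucketStep>>()
    private val pillars = HashMap<BucketStage, BucketStep>()

    // Single definition of how a step is bucketed.
    fun append(step: BucketStep) {
        if (step.stage.isPillar) {
            pillars[step.stage] = step
        } else {
            perStage.getOrPut(step.stage) { ArrayDeque() }.addLast(step)
        }
    }

    // Clear, then replay through append(): no second copy of the dispatch.
    fun reload(steps: List<BucketStep>) {
        perStage.clear()
        pillars.clear()
        steps.forEach(::append)
    }

    // Emits in enum declaration order, one bucket at a time.
    fun flatten(): List<BucketStep> = BucketStage.entries.flatMap { stage ->
        if (stage.isPillar) listOfNotNull(pillars[stage]) else perStage[stage].orEmpty()
    }
}

fun main() {
    val steps = DemoStagedSteps()
    steps.append(BucketStep("stale", BucketStage.USER))
    steps.reload(
        listOf(
            BucketStep("a", BucketStage.USER),
            BucketStep("retry", BucketStage.RETRY),
            BucketStep("b", BucketStage.USER),
        ),
    )
    println(steps.flatten().map { it.name }) // prints "[a, b, retry]"
}
```

The "stale" step disappears because reload() clears both maps before replaying, and the relative order of "a" and "b" inside the USER bucket is preserved.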


sdk-core/.../http/pipeline/HttpPipelineBuilder.kt:114-117 (and AsyncHttpPipelineBuilder.kt:100-103) — List.toTypedArray() over the manual Array(size) { … }

Both build() methods materialize the ordered step list into an array with an index lambda. The
stdlib already has List<T>.toTypedArray() for exactly this.

Old code:

    // HttpPipelineBuilder.build()
    public fun build(): HttpPipeline {
        val ordered = steps.flatten()
        return HttpPipeline(httpClient, Array(ordered.size) { ordered[it] })
    }

    // AsyncHttpPipelineBuilder.build()
    public fun build(): AsyncHttpPipeline {
        val ordered = steps.flatten()
        return AsyncHttpPipeline(httpClient, Array(ordered.size) { ordered[it] })
    }

New code:

    // HttpPipelineBuilder.build()
    public fun build(): HttpPipeline {
        val ordered = steps.flatten()
        return HttpPipeline(httpClient, ordered.toTypedArray())
    }

    // AsyncHttpPipelineBuilder.build()
    public fun build(): AsyncHttpPipeline {
        val ordered = steps.flatten()
        return AsyncHttpPipeline(httpClient, ordered.toTypedArray())
    }

Usage — before → after:

// Before and after (identical)
val pipeline = HttpPipelineBuilder(client).append(step).build()

Call sites are unchanged and the change is behavior-preserving. flatten() returns List<HttpStep> /
List<AsyncHttpStep> (non-null element type), so toTypedArray() yields the exact
Array<HttpStep> / Array<AsyncHttpStep> the constructors expect, including the empty-list case.

Why: toTypedArray() is the idiomatic, allocation-equivalent way to turn a List into a typed
array and reads more clearly than the index-lambda form.

API / Build: None. build()'s public signature is unchanged.
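
The equivalence is easy to check with plain strings standing in for steps. The helper names manualCopy and idiomaticCopy are made up for this sketch; only Array(size) { … }, toTypedArray(), and contentEquals() are standard-library calls.

```kotlin
// The index-lambda form used by the current build() methods.
fun manualCopy(ordered: List<String>): Array<String> =
    Array(ordered.size) { ordered[it] }

// The stdlib form proposed in this issue.
fun idiomaticCopy(ordered: List<String>): Array<String> =
    ordered.toTypedArray()

fun main() {
    val ordered = listOf("auth", "retry", "log")
    // Same elements in the same order.
    check(manualCopy(ordered).contentEquals(idiomaticCopy(ordered)))
    // The empty list yields an empty array in both forms.
    check(manualCopy(emptyList()).contentEquals(idiomaticCopy(emptyList())))
    println(idiomaticCopy(ordered).joinToString()) // prints "auth, retry, log"
}
```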


sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncPipelineBridges.kt:123-144 — toBlocking() should delegate to AsyncHttpClient.asBlocking()

toBlocking() hand-rolls a blocking future.get() with interrupt handling
(restore-flag → cancel → InterruptedIOException) and ExecutionException unwrapping. That is
byte-for-byte the contract already implemented by AsyncHttpClient.asBlocking(). Wrapping the
pipeline's sendAsync as an AsyncHttpClient and delegating removes the duplicate.

Old code:

import org.dexpace.sdk.core.http.request.Request
import org.dexpace.sdk.core.http.response.Response
import org.dexpace.sdk.core.util.Futures
import java.io.InterruptedIOException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicReference

// — HttpPipeline.toAsync, InterruptibleSendFuture, and sendInterruptibly unchanged —

/**
 * Adapts an [AsyncHttpPipeline] into a synchronous [HttpPipeline] by blocking on
 * `sendAsync(request).get()` for each `send(...)` call. The current thread blocks until the
 * future completes; pair with virtual threads (JDK 21+) on the caller side to keep carrier
 * threads available.
 *
 * The blocking wait honours `Thread.interrupt()`: interrupting the calling thread restores the
 * interrupt flag, cancels the in-flight future, and throws an [InterruptedIOException].
 */
public fun AsyncHttpPipeline.toBlocking(): HttpPipeline {
    val async = this
    return HttpPipeline.of { request ->
        val future = async.sendAsync(request)
        try {
            future.get()
        } catch (ie: InterruptedException) {
            // `get()` parks interruptibly (unlike `join()`). Restore the interrupt flag, abort
            // the in-flight send, and surface an InterruptedIOException so the caller's I/O
            // error handling terminates cleanly.
            Thread.currentThread().interrupt()
            future.cancel(true)
            val ioe = InterruptedIOException("Interrupted while waiting for response")
            ioe.initCause(ie)
            throw ioe
        } catch (ee: ExecutionException) {
            // `get()` wraps exceptional completion in ExecutionException; unwrap so callers'
            // `catch (IOException)` sees the original failure rather than the JDK wrapper.
            throw Futures.unwrap(ee)
        }
    }
}

New code:

import org.dexpace.sdk.core.client.AsyncHttpClient
import org.dexpace.sdk.core.client.asBlocking
import org.dexpace.sdk.core.http.request.Request
import org.dexpace.sdk.core.http.response.Response
import java.io.InterruptedIOException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicReference

// — HttpPipeline.toAsync, InterruptibleSendFuture, and sendInterruptibly unchanged —

/**
 * Adapts an [AsyncHttpPipeline] into a synchronous [HttpPipeline] by blocking on
 * `sendAsync(request).get()` for each `send(...)` call. The current thread blocks until the
 * future completes; pair with virtual threads (JDK 21+) on the caller side to keep carrier
 * threads available.
 *
 * The blocking wait honours `Thread.interrupt()`: interrupting the calling thread restores the
 * interrupt flag, cancels the in-flight future, and throws an [InterruptedIOException].
 */
public fun AsyncHttpPipeline.toBlocking(): HttpPipeline =
    HttpPipeline.of(AsyncHttpClient { sendAsync(it) }.asBlocking())

Usage — before → after:

// Before and after (identical)
val sync: HttpPipeline = asyncPipeline.toBlocking()
val response = sync.send(request)

Call sites are unchanged and the change is behavior-preserving. asBlocking() performs the identical
interrupt → restore-flag → cancel(true) → InterruptedIOException sequence and the same
ExecutionException unwrap via Futures.unwrap, so the blocking semantics are unchanged.

Why: Removes a duplicated blocking-get with interrupt handling that already exists, verbatim, as
AsyncHttpClient.asBlocking() — one place to maintain the cancellation contract instead of two.

API / Build: The import surgery is required. Add import org.dexpace.sdk.core.client.AsyncHttpClient and import org.dexpace.sdk.core.client.asBlocking;
remove the now-orphaned import org.dexpace.sdk.core.util.Futures and import java.util.concurrent.ExecutionException (else allWarningsAsErrors breaks on the unused imports).
Keep import java.io.InterruptedIOException — it survives as the toBlocking KDoc link. The public
toBlocking() signature is unchanged, so there is no apiCheck impact.
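
The delegation pattern can be sketched with stand-in types. Everything named Demo* below is hypothetical, and this asBlocking() is modeled on the hand-rolled block shown above rather than copied from the SDK; in particular it unwraps with `ee.cause` where the real code uses Futures.unwrap.

```kotlin
import java.io.IOException
import java.io.InterruptedIOException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException

// Hypothetical stand-ins for AsyncHttpClient / HttpClient; payloads are plain strings.
fun interface DemoAsyncClient {
    fun executeAsync(request: String): CompletableFuture<String>
}

fun interface DemoBlockingClient {
    fun execute(request: String): String
}

// Assumed shape of the shared adapter: one place that owns the blocking contract.
fun DemoAsyncClient.asBlocking(): DemoBlockingClient = DemoBlockingClient { request ->
    val future = executeAsync(request)
    try {
        future.get()
    } catch (ie: InterruptedException) {
        // Restore the flag, abort the in-flight call, surface an I/O-style failure.
        Thread.currentThread().interrupt()
        future.cancel(true)
        throw InterruptedIOException("Interrupted while waiting for response").apply { initCause(ie) }
    } catch (ee: ExecutionException) {
        // Unwrap so callers see the original failure, not the JDK wrapper.
        throw ee.cause ?: ee
    }
}

class DemoAsyncPipeline(private val client: DemoAsyncClient) {
    fun sendAsync(request: String): CompletableFuture<String> = client.executeAsync(request)
}

// The pipeline adapter becomes a one-liner that reuses the client adapter.
fun DemoAsyncPipeline.toBlocking(): DemoBlockingClient =
    DemoAsyncClient { sendAsync(it) }.asBlocking()

fun main() {
    val ok = DemoAsyncPipeline { CompletableFuture.completedFuture("200 for $it") }.toBlocking()
    println(ok.execute("GET /")) // prints "200 for GET /"

    val failing = DemoAsyncPipeline {
        CompletableFuture<String>().apply { completeExceptionally(IOException("boom")) }
    }.toBlocking()
    try {
        failing.execute("GET /")
    } catch (e: IOException) {
        println("unwrapped: ${e.message}") // prints "unwrapped: boom"
    }
}
```

The SAM-constructor call `DemoAsyncClient { sendAsync(it) }` only compiles because the stand-in is declared as a `fun interface`; the issue's one-liner implies the same holds for the real AsyncHttpClient.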


sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/Stage.kt:20-21 (and HttpPipelineTest.kt:91) — correct the build-order docs

The @property order KDoc claims order is the "run-order key used by HttpPipelineBuilder.build
to emit steps." It isn't: flatten() walks Stage.entries (declaration order) and never reads the
numeric order. The values happen to ascend in declaration order, which is why the pipeline still
comes out sorted — but the build does not consult order. Documentation-only fix; order stays
(it is a public, stable inspection key).

Old code:

/**
 * Pipeline stage. Lower [order] runs first (closer to caller entry); higher [order] runs
 * later (closer to wire send). Pillar stages hold exactly one step each; non-pillar stages
 * hold an ordered deque of user steps.
 *
 * Sparse [order] values (100s apart) leave room to insert new stages later without
 * renumbering existing ones.
 *
 * @property order Run-order key used by [HttpPipelineBuilder.build] to emit steps; lower
 *   values run first.
 * @property isPillar True if the stage admits at most one step (singleton). False for
 *   user-extensible stages backed by an ordered deque.
 */
// HttpPipelineTest.kt:91
                // Add in reverse-of-stage order to prove the builder sorts by Stage.order.

New code:

/**
 * Pipeline stage. Lower [order] runs first (closer to caller entry); higher [order] runs
 * later (closer to wire send). Pillar stages hold exactly one step each; non-pillar stages
 * hold an ordered deque of user steps.
 *
 * Sparse [order] values (100s apart) leave room to insert new stages later without
 * renumbering existing ones.
 *
 * @property order Stable numeric identity for the stage; ascends with declaration order.
 *   The builder emits steps in declaration order (`Stage.entries`), not by reading this
 *   value — it exists as a stable, sortable inspection key for callers.
 * @property isPillar True if the stage admits at most one step (singleton). False for
 *   user-extensible stages backed by an ordered deque.
 */
// HttpPipelineTest.kt:91
                // Add in reverse declaration order to prove the builder emits in Stage declaration order.

Usage — before → after:

// Before and after (identical)
// StagedSteps.flatten() is unchanged. It is shown only as evidence that declaration order
// (Stage.entries), not the numeric `order` value, drives emission.
fun flatten(): List<S> {
    val out = ArrayList<S>()
    for (stage in Stage.entries) {
        if (stage == Stage.SEND) continue
        if (stage.isPillar) {
            pillars[stage]?.let(out::add)
        } else {
            perStage[stage]?.let(out::addAll)
        }
    }
    return out
}

Why: The doc and the test comment both attribute build order to the numeric order, but
flatten() iterates Stage.entries. Correcting them prevents anyone from "fixing" ordering by
renumbering order (which would do nothing) or from reordering the enum entries on the assumption
that order is authoritative.

API / Build: None. order is retained; no signature changes.
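
A toy enum makes the distinction visible. DemoStage and its values are invented for this sketch, and the `order` numbers deliberately contradict declaration order, which the real Stage does not do. `entries` requires Kotlin 1.9 or newer.

```kotlin
// Hypothetical stages whose numeric order disagrees with declaration order.
enum class DemoStage(val order: Int) { FIRST(300), SECOND(100), THIRD(200) }

// What a flatten()-style loop over `entries` produces.
fun emitByDeclaration(): List<DemoStage> = DemoStage.entries.toList()

// What the old KDoc implied: sorting by the numeric key.
fun emitByOrder(): List<DemoStage> = DemoStage.entries.sortedBy { it.order }

// A guard a test could use to confirm the two orderings agree.
fun orderAscendsWithDeclaration(): Boolean {
    val orders = DemoStage.entries.map { it.order }
    return orders == orders.sorted()
}

fun main() {
    println(emitByDeclaration()) // prints "[FIRST, SECOND, THIRD]"
    println(emitByOrder()) // prints "[SECOND, THIRD, FIRST]"
    println(orderAscendsWithDeclaration()) // prints "false"
}
```

For the real Stage the two lists coincide because the values ascend with declaration order, which is why the discrepancy between the doc and the code has gone unnoticed.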
