diff --git a/sdk-async-reactor/src/main/kotlin/org/dexpace/sdk/async/reactor/Reactor.kt b/sdk-async-reactor/src/main/kotlin/org/dexpace/sdk/async/reactor/Reactor.kt index f86f6bdd..fe9db629 100644 --- a/sdk-async-reactor/src/main/kotlin/org/dexpace/sdk/async/reactor/Reactor.kt +++ b/sdk-async-reactor/src/main/kotlin/org/dexpace/sdk/async/reactor/Reactor.kt @@ -20,6 +20,7 @@ import org.dexpace.sdk.core.instrumentation.MdcSnapshot import org.dexpace.sdk.core.io.BufferedSource import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import java.util.concurrent.CompletableFuture private val log = ClientLogger("org.dexpace.sdk.async.reactor.Reactor") @@ -43,27 +44,7 @@ private val log = ClientLogger("org.dexpace.sdk.async.reactor.Reactor") * To extend MDC propagation through user-supplied downstream operators, enable * `Hooks.enableAutomaticContextPropagation()` at the application level. */ -public fun AsyncHttpClient.executeMono(request: Request): Mono = - Mono.defer { - val mdc = MdcSnapshot.capture() - Mono.fromFuture { mdc.withMdc { executeAsync(request) } } - .doOnSubscribe { - mdc.withMdc { - log.atVerbose() - .event("async.adapter.subscribed") - .field("adapter.type", "reactor") - .log() - } - } - .doOnCancel { - mdc.withMdc { - log.atVerbose() - .event("async.adapter.cancel_propagated") - .field("adapter.type", "reactor") - .log() - } - } - } +public fun AsyncHttpClient.executeMono(request: Request): Mono = deferMono { executeAsync(request) } /** * Pipeline-level [Mono] facade — see [executeMono]. @@ -79,27 +60,33 @@ public fun AsyncHttpClient.executeMono(request: Request): Mono = * To extend MDC propagation through user-supplied downstream operators, enable * `Hooks.enableAutomaticContextPropagation()` at the application level. */ -public fun AsyncHttpPipeline.sendMono(request: Request): Mono = +public fun AsyncHttpPipeline.sendMono(request: Request): Mono = deferMono { sendAsync(request) } + +/** + * Bridges a [CompletableFuture]-returning [supplier] to a cold [Mono]. Each subscription runs + * the supplier afresh under [Mono.defer], captures the subscriber's MDC, and reinstates it for + * the future-producing call and for both lifecycle log hooks. Cancelling the subscription + * cancels the underlying future through [Mono.fromFuture]. + */ +private fun deferMono(supplier: () -> CompletableFuture): Mono = Mono.defer { val mdc = MdcSnapshot.capture() - Mono.fromFuture { mdc.withMdc { sendAsync(request) } } - .doOnSubscribe { - mdc.withMdc { - log.atVerbose() - .event("async.adapter.subscribed") - .field("adapter.type", "reactor") - .log() - } - } - .doOnCancel { - mdc.withMdc { - log.atVerbose() - .event("async.adapter.cancel_propagated") - .field("adapter.type", "reactor") - .log() - } - } + Mono.fromFuture { mdc.withMdc { supplier() } } + .doOnSubscribe { logEvent(mdc, "async.adapter.subscribed") } + .doOnCancel { logEvent(mdc, "async.adapter.cancel_propagated") } + } + +private fun logEvent( + mdc: MdcSnapshot, + event: String, +) { + mdc.withMdc { + log.atVerbose() + .event(event) + .field("adapter.type", "reactor") + .log() } +} /** * Exposes the SSE event stream as a Reactor [Flux]. Backpressure is honored via diff --git a/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/ForeignSinkAdapter.kt b/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/ForeignSinkAdapter.kt index e374d94b..2eb559f7 100644 --- a/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/ForeignSinkAdapter.kt +++ b/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/ForeignSinkAdapter.kt @@ -22,23 +22,13 @@ import org.dexpace.sdk.core.io.Sink * wrapper field is read/written without synchronization. */ internal class ForeignSinkAdapter(private val delegate: Sink) : okio.Sink { - private var cachedBuffer: okio.Buffer? = null - private var cachedWrapper: OkioBuffer? = null + private val wrappers = OkioBufferWrapperCache() override fun write( source: okio.Buffer, byteCount: Long, ) { - // Cache the OkioBuffer wrapper keyed by reference identity of the okio.Buffer Okio - // passes us. Okio reuses the same source for a buffered producer's lifetime, so this - // amortizes wrapper allocation to once per producer. - val wrapper = - cachedWrapper.takeIf { source === cachedBuffer } - ?: OkioBuffer(source).also { - cachedBuffer = source - cachedWrapper = it - } - delegate.write(wrapper, byteCount) + delegate.write(wrappers.wrap(source), byteCount) } override fun flush() { diff --git a/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/ForeignSourceAdapter.kt b/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/ForeignSourceAdapter.kt index 98eed474..423a9065 100644 --- a/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/ForeignSourceAdapter.kt +++ b/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/ForeignSourceAdapter.kt @@ -23,24 +23,12 @@ import org.dexpace.sdk.core.io.Source * wrapper field is read/written without synchronization. */ internal class ForeignSourceAdapter(private val delegate: Source) : okio.Source { - private var cachedBuffer: okio.Buffer? = null - private var cachedWrapper: OkioBuffer? = null + private val wrappers = OkioBufferWrapperCache() override fun read( sink: okio.Buffer, byteCount: Long, - ): Long { - // Cache the OkioBuffer wrapper keyed by reference identity of the okio.Buffer Okio - // passes us. Okio reuses the same sink for a buffered consumer's lifetime, so this - // amortizes wrapper allocation to once per consumer. - val wrapper = - cachedWrapper.takeIf { sink === cachedBuffer } - ?: OkioBuffer(sink).also { - cachedBuffer = sink - cachedWrapper = it - } - return delegate.read(wrapper, byteCount) - } + ): Long = delegate.read(wrappers.wrap(sink), byteCount) override fun timeout(): okio.Timeout = okio.Timeout.NONE diff --git a/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/OkioBufferWrapperCache.kt b/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/OkioBufferWrapperCache.kt new file mode 100644 index 00000000..747ff3c9 --- /dev/null +++ b/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/OkioBufferWrapperCache.kt @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.io.internal + +/** + * Caches a single [OkioBuffer] wrapper keyed by the reference identity of the [okio.Buffer] it + * adapts. Okio hands a buffered consumer (or producer) the same buffer instance for its whole + * lifetime, so wrapping it once and reusing the wrapper amortizes allocation to once per + * consumer instead of once per chunk. + * + * ## Thread-safety + * + * Not safe for concurrent use — the cached fields are read/written without synchronization, + * matching the single-threaded contract of the Okio buffered source/sink that owns the cache. + */ +internal class OkioBufferWrapperCache { + private var cachedBuffer: okio.Buffer? = null + private var cachedWrapper: OkioBuffer? = null + + fun wrap(buffer: okio.Buffer): OkioBuffer = + cachedWrapper.takeIf { buffer === cachedBuffer } + ?: OkioBuffer(buffer).also { + cachedBuffer = buffer + cachedWrapper = it + } +} diff --git a/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/SlicedOkioBufferedSource.kt b/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/SlicedOkioBufferedSource.kt index f482f606..26648adb 100644 --- a/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/SlicedOkioBufferedSource.kt +++ b/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/SlicedOkioBufferedSource.kt @@ -135,10 +135,7 @@ internal class SlicedOkioBufferedSource( @Throws(IOException::class) override fun readByteArray(): ByteArray { checkOpen() - if (!realizeOffset() || atEnd()) return EMPTY_BYTES - val bytes = readUpTo(remaining) - remaining -= bytes.size - return bytes + return drainRemaining() } @Throws(IOException::class) @@ -161,10 +158,7 @@ internal class SlicedOkioBufferedSource( @Throws(IOException::class) override fun readUtf8(): String { checkOpen() - if (!realizeOffset() || atEnd()) return "" - val bytes = readUpTo(remaining) - remaining -= bytes.size - return String(bytes, Charsets.UTF_8) + return String(drainRemaining(), Charsets.UTF_8) } @Throws(IOException::class) @@ -217,10 +211,20 @@ internal class SlicedOkioBufferedSource( @Throws(IOException::class) override fun readString(charset: Charset): String { checkOpen() - if (!realizeOffset() || atEnd()) return "" + return String(drainRemaining(), charset) + } + + /** + * Drains everything still inside the slice window as raw bytes, advancing [remaining]. + * Returns [EMPTY_BYTES] when the offset cannot be realized or the slice is already at its + * end; that empty array decodes to an empty string for the two text reads above. + */ + @Throws(IOException::class) + private fun drainRemaining(): ByteArray { + if (!realizeOffset() || atEnd()) return EMPTY_BYTES val bytes = readUpTo(remaining) remaining -= bytes.size - return String(bytes, charset) + return bytes } @Throws(IOException::class)