Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.dexpace.sdk.core.instrumentation.MdcSnapshot
import org.dexpace.sdk.core.io.BufferedSource
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.util.concurrent.CompletableFuture

private val log = ClientLogger("org.dexpace.sdk.async.reactor.Reactor")

Expand All @@ -43,27 +44,7 @@ private val log = ClientLogger("org.dexpace.sdk.async.reactor.Reactor")
* To extend MDC propagation through user-supplied downstream operators, enable
* `Hooks.enableAutomaticContextPropagation()` at the application level.
*/
public fun AsyncHttpClient.executeMono(request: Request): Mono<Response> =
Mono.defer {
val mdc = MdcSnapshot.capture()
Mono.fromFuture { mdc.withMdc { executeAsync(request) } }
.doOnSubscribe {
mdc.withMdc {
log.atVerbose()
.event("async.adapter.subscribed")
.field("adapter.type", "reactor")
.log()
}
}
.doOnCancel {
mdc.withMdc {
log.atVerbose()
.event("async.adapter.cancel_propagated")
.field("adapter.type", "reactor")
.log()
}
}
}
public fun AsyncHttpClient.executeMono(request: Request): Mono<Response> = deferMono { executeAsync(request) }

/**
* Pipeline-level [Mono] facade — see [executeMono].
Expand All @@ -79,27 +60,33 @@ public fun AsyncHttpClient.executeMono(request: Request): Mono<Response> =
* To extend MDC propagation through user-supplied downstream operators, enable
* `Hooks.enableAutomaticContextPropagation()` at the application level.
*/
public fun AsyncHttpPipeline.sendMono(request: Request): Mono<Response> =
public fun AsyncHttpPipeline.sendMono(request: Request): Mono<Response> = deferMono { sendAsync(request) }

/**
* Bridges a [CompletableFuture]-returning [supplier] to a cold [Mono]. Each subscription runs
* the supplier afresh under [Mono.defer], captures the subscriber's MDC, and reinstates it for
* the future-producing call and for both lifecycle log hooks. Cancelling the subscription
* cancels the underlying future through [Mono.fromFuture].
*/
private fun deferMono(supplier: () -> CompletableFuture<Response>): Mono<Response> =
Mono.defer {
val mdc = MdcSnapshot.capture()
Mono.fromFuture { mdc.withMdc { sendAsync(request) } }
.doOnSubscribe {
mdc.withMdc {
log.atVerbose()
.event("async.adapter.subscribed")
.field("adapter.type", "reactor")
.log()
}
}
.doOnCancel {
mdc.withMdc {
log.atVerbose()
.event("async.adapter.cancel_propagated")
.field("adapter.type", "reactor")
.log()
}
}
Mono.fromFuture { mdc.withMdc { supplier() } }
.doOnSubscribe { logEvent(mdc, "async.adapter.subscribed") }
.doOnCancel { logEvent(mdc, "async.adapter.cancel_propagated") }
}

private fun logEvent(
mdc: MdcSnapshot,
event: String,
) {
mdc.withMdc {
log.atVerbose()
.event(event)
.field("adapter.type", "reactor")
.log()
}
}

/**
* Exposes the SSE event stream as a Reactor [Flux]. Backpressure is honored via
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,13 @@ import org.dexpace.sdk.core.io.Sink
* wrapper field is read/written without synchronization.
*/
internal class ForeignSinkAdapter(private val delegate: Sink) : okio.Sink {
private var cachedBuffer: okio.Buffer? = null
private var cachedWrapper: OkioBuffer? = null
private val wrappers = OkioBufferWrapperCache()

override fun write(
source: okio.Buffer,
byteCount: Long,
) {
// Cache the OkioBuffer wrapper keyed by reference identity of the okio.Buffer Okio
// passes us. Okio reuses the same source for a buffered producer's lifetime, so this
// amortizes wrapper allocation to once per producer.
val wrapper =
cachedWrapper.takeIf { source === cachedBuffer }
?: OkioBuffer(source).also {
cachedBuffer = source
cachedWrapper = it
}
delegate.write(wrapper, byteCount)
delegate.write(wrappers.wrap(source), byteCount)
}

override fun flush() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,12 @@ import org.dexpace.sdk.core.io.Source
* wrapper field is read/written without synchronization.
*/
internal class ForeignSourceAdapter(private val delegate: Source) : okio.Source {
private var cachedBuffer: okio.Buffer? = null
private var cachedWrapper: OkioBuffer? = null
private val wrappers = OkioBufferWrapperCache()

override fun read(
sink: okio.Buffer,
byteCount: Long,
): Long {
// Cache the OkioBuffer wrapper keyed by reference identity of the okio.Buffer Okio
// passes us. Okio reuses the same sink for a buffered consumer's lifetime, so this
// amortizes wrapper allocation to once per consumer.
val wrapper =
cachedWrapper.takeIf { sink === cachedBuffer }
?: OkioBuffer(sink).also {
cachedBuffer = sink
cachedWrapper = it
}
return delegate.read(wrapper, byteCount)
}
): Long = delegate.read(wrappers.wrap(sink), byteCount)

override fun timeout(): okio.Timeout = okio.Timeout.NONE

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2026 dexpace and Omar Aljarrah
*
* Licensed under the MIT License. See LICENSE in the project root.
* SPDX-License-Identifier: MIT
*/

package org.dexpace.sdk.io.internal

/**
* Caches a single [OkioBuffer] wrapper keyed by the reference identity of the [okio.Buffer] it
* adapts. Okio hands a buffered consumer (or producer) the same buffer instance for its whole
* lifetime, so wrapping it once and reusing the wrapper amortizes allocation to once per
* consumer instead of once per chunk.
*
* ## Thread-safety
*
* Not safe for concurrent use — the cached fields are read/written without synchronization,
* matching the single-threaded contract of the Okio buffered source/sink that owns the cache.
*/
internal class OkioBufferWrapperCache {
private var cachedBuffer: okio.Buffer? = null
private var cachedWrapper: OkioBuffer? = null

fun wrap(buffer: okio.Buffer): OkioBuffer =
cachedWrapper.takeIf { buffer === cachedBuffer }
?: OkioBuffer(buffer).also {
cachedBuffer = buffer
cachedWrapper = it
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,7 @@ internal class SlicedOkioBufferedSource(
@Throws(IOException::class)
override fun readByteArray(): ByteArray {
checkOpen()
if (!realizeOffset() || atEnd()) return EMPTY_BYTES
val bytes = readUpTo(remaining)
remaining -= bytes.size
return bytes
return drainRemaining()
}

@Throws(IOException::class)
Expand All @@ -161,10 +158,7 @@ internal class SlicedOkioBufferedSource(
@Throws(IOException::class)
override fun readUtf8(): String {
checkOpen()
if (!realizeOffset() || atEnd()) return ""
val bytes = readUpTo(remaining)
remaining -= bytes.size
return String(bytes, Charsets.UTF_8)
return String(drainRemaining(), Charsets.UTF_8)
}

@Throws(IOException::class)
Expand Down Expand Up @@ -217,10 +211,20 @@ internal class SlicedOkioBufferedSource(
@Throws(IOException::class)
override fun readString(charset: Charset): String {
checkOpen()
if (!realizeOffset() || atEnd()) return ""
return String(drainRemaining(), charset)
}

/**
* Drains everything still inside the slice window as raw bytes, advancing [remaining].
* Returns [EMPTY_BYTES] when the offset cannot be realized or the slice is already at its
* end; that empty array decodes to an empty string for the two text reads above.
*/
@Throws(IOException::class)
private fun drainRemaining(): ByteArray {
if (!realizeOffset() || atEnd()) return EMPTY_BYTES
val bytes = readUpTo(remaining)
remaining -= bytes.size
return String(bytes, charset)
return bytes
}

@Throws(IOException::class)
Expand Down
Loading