From 9ef242ff170fbeaa716ecd588618e75a4ac976c8 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Wed, 17 Jun 2026 05:50:06 +0300 Subject: [PATCH 1/3] feat: add lifecycle-bound SSE stream and typed per-endpoint adapter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce SseStream: an AutoCloseable Iterable that owns the underlying HTTP response. Closing the stream — explicitly, via use {} / try-with-resources, or implicitly when iteration runs to completion — closes the response and releases its connection, so a partial consume never strands the body. This mirrors the close-on-partial-consume invariant PagedIterable enforces. The previously doc-only "do not iterate twice" warning is now an enforced single-pass guard, and iteration after close is rejected. Add a reusable per-endpoint adapter, TypedSseStream, that maps raw events to typed models via a caller-supplied SseEventMapper. The mapper receives the event name and joined data and returns a decoded value, Skip, or a Done sentinel; it is the seam where the Serde SPI is invoked and where per-API done-sentinel and error-envelope conventions live. Mapping is applied lazily, one element at a time, so a partial consume decodes only the events taken. Closing the typed adapter propagates to the underlying stream. Both surfaces are hand-written runtime primitives usable today; a code generator can target them later without embedding any per-API convention in core. Closes #35 Closes #62 --- sdk-core/api/sdk-core.api | 52 +++++ .../http/sse/ServerSentEventExtensions.kt | 34 +++ .../sdk/core/http/sse/SseEventMapper.kt | 92 ++++++++ .../dexpace/sdk/core/http/sse/SseStream.kt | 150 +++++++++++++ .../sdk/core/http/sse/TypedSseStream.kt | 94 ++++++++ .../sdk/core/http/sse/SseStreamTest.kt | 212 ++++++++++++++++++ .../sdk/core/http/sse/TypedSseStreamTest.kt | 183 +++++++++++++++ 7 files changed, 817 insertions(+) create mode 100644 sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseEventMapper.kt create mode 100644 sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseStream.kt create mode 100644 sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStream.kt create mode 100644 sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/SseStreamTest.kt create mode 100644 sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStreamTest.kt diff --git a/sdk-core/api/sdk-core.api b/sdk-core/api/sdk-core.api index 7ebcc312..7a456e4f 100644 --- a/sdk-core/api/sdk-core.api +++ b/sdk-core/api/sdk-core.api @@ -1560,6 +1560,58 @@ public final class org/dexpace/sdk/core/http/sse/ServerSentEventReader { public final class org/dexpace/sdk/core/http/sse/ServerSentEvents { public static final fun readServerSentEvents (Lorg/dexpace/sdk/core/io/BufferedSource;)Lkotlin/sequences/Sequence; public static final fun readServerSentEventsAsIterable (Lorg/dexpace/sdk/core/io/BufferedSource;)Ljava/lang/Iterable; + public static final fun sseStream (Lorg/dexpace/sdk/core/http/response/Response;)Lorg/dexpace/sdk/core/http/sse/SseStream; + public static final fun typed (Lorg/dexpace/sdk/core/http/sse/SseStream;Lorg/dexpace/sdk/core/http/sse/SseEventMapper;)Lorg/dexpace/sdk/core/http/sse/TypedSseStream; +} + +public abstract interface class org/dexpace/sdk/core/http/sse/SseEventMapper { + public static final field Companion Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Companion; + public static fun done ()Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result; + public abstract fun map (Ljava/lang/String;Ljava/lang/String;)Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result; + public static fun skip ()Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result; + public static fun value (Ljava/lang/Object;)Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result; +} + +public final class org/dexpace/sdk/core/http/sse/SseEventMapper$Companion { + public final fun done ()Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result; + public final fun skip ()Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result; + public final fun value (Ljava/lang/Object;)Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result; +} + +public abstract class org/dexpace/sdk/core/http/sse/SseEventMapper$Result { +} + +public final class org/dexpace/sdk/core/http/sse/SseEventMapper$Result$Done : org/dexpace/sdk/core/http/sse/SseEventMapper$Result { + public static final field INSTANCE Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result$Done; +} + +public final class org/dexpace/sdk/core/http/sse/SseEventMapper$Result$Skip : org/dexpace/sdk/core/http/sse/SseEventMapper$Result { + public static final field INSTANCE Lorg/dexpace/sdk/core/http/sse/SseEventMapper$Result$Skip; +} + +public final class org/dexpace/sdk/core/http/sse/SseEventMapper$Result$Value : org/dexpace/sdk/core/http/sse/SseEventMapper$Result { + public fun (Ljava/lang/Object;)V + public final fun getModel ()Ljava/lang/Object; +} + +public final class org/dexpace/sdk/core/http/sse/SseStream : java/lang/AutoCloseable, java/lang/Iterable, kotlin/jvm/internal/markers/KMappedMarker { + public static final field Companion Lorg/dexpace/sdk/core/http/sse/SseStream$Companion; + public synthetic fun (Lorg/dexpace/sdk/core/http/sse/ServerSentEventReader;Ljava/io/Closeable;Lkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun close ()V + public static final fun from (Lorg/dexpace/sdk/core/io/BufferedSource;Ljava/io/Closeable;)Lorg/dexpace/sdk/core/http/sse/SseStream; + public static final fun fromReader (Lorg/dexpace/sdk/core/http/sse/ServerSentEventReader;Ljava/io/Closeable;)Lorg/dexpace/sdk/core/http/sse/SseStream; + public fun iterator ()Ljava/util/Iterator; +} + +public final class org/dexpace/sdk/core/http/sse/SseStream$Companion { + public final fun from (Lorg/dexpace/sdk/core/io/BufferedSource;Ljava/io/Closeable;)Lorg/dexpace/sdk/core/http/sse/SseStream; + public final fun fromReader (Lorg/dexpace/sdk/core/http/sse/ServerSentEventReader;Ljava/io/Closeable;)Lorg/dexpace/sdk/core/http/sse/SseStream; +} + +public final class org/dexpace/sdk/core/http/sse/TypedSseStream : java/lang/AutoCloseable, java/lang/Iterable, kotlin/jvm/internal/markers/KMappedMarker { + public fun (Lorg/dexpace/sdk/core/http/sse/SseStream;Lorg/dexpace/sdk/core/http/sse/SseEventMapper;)V + public fun close ()V + public fun iterator ()Ljava/util/Iterator; } public final class org/dexpace/sdk/core/instrumentation/ClientLogger { diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/ServerSentEventExtensions.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/ServerSentEventExtensions.kt index 317948ae..fad370e8 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/ServerSentEventExtensions.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/ServerSentEventExtensions.kt @@ -9,6 +9,7 @@ package org.dexpace.sdk.core.http.sse +import org.dexpace.sdk.core.http.response.Response import org.dexpace.sdk.core.io.BufferedSource /** @@ -53,3 +54,36 @@ public fun BufferedSource.readServerSentEvents(): Sequence { */ public fun BufferedSource.readServerSentEventsAsIterable(): Iterable = readServerSentEvents().asIterable() + +/** + * Opens an [SseStream] over this response's body, binding the stream's lifecycle to the + * response: closing the returned stream (explicitly, via `use {}` / try-with-resources, or + * implicitly when iteration completes) closes the response and releases its connection. + * + * Use this on a streaming response to consume events without stranding the body on a partial + * consume: + * + * ``` + * response.sseStream().use { stream -> + * for (event in stream) { /* handle event */ } + * } + * ``` + * + * @throws IllegalStateException if the response has no body. + */ +public fun Response.sseStream(): SseStream { + val responseBody = checkNotNull(body) { "Response has no body to stream as Server-Sent Events" } + return SseStream.from(responseBody.source(), this) +} + +/** + * Wraps this raw [SseStream] in a [TypedSseStream] that maps each event to a model `T` via + * [mapper], decoding lazily on consume. The returned adapter inherits this stream's lifecycle: + * closing it closes this stream (and the response). + * + * The [mapper] is the per-endpoint seam — it is where the [org.dexpace.sdk.core.serde.Serde] + * SPI is called and where done-sentinel / error-envelope conventions live. + * + * @param mapper Maps a raw event to a [SseEventMapper.Result]. + */ +public fun SseStream.typed(mapper: SseEventMapper): TypedSseStream = TypedSseStream(this, mapper) diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseEventMapper.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseEventMapper.kt new file mode 100644 index 00000000..63ae3853 --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseEventMapper.kt @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.sse + +/** + * Maps a raw [ServerSentEvent] to a typed model, or signals that the event should be skipped + * or that the stream is finished. + * + * This is the per-endpoint seam between core's format-agnostic SSE plumbing and an API's + * own conventions. A mapper receives the event's `event:` name (or `null` if the server sent + * no `event:` field) and the joined `data:` payload, and returns: + * + * - a decoded value `T` — yielded to the caller; + * - [Skip] — the event carries no model (a keep-alive comment, a bare `id:` cursor, an + * ignored event type); iteration silently moves on; + * - [Done] — a sentinel event marking end of stream (e.g. OpenAI's `data: [DONE]`); iteration + * terminates cleanly and the underlying [SseStream] closes. + * + * The mapper is the place to call the [org.dexpace.sdk.core.serde.Serde] SPI — for example + * `serde.deserializer.deserialize(data, MyDto::class.java)` — and the place to translate an + * error-envelope event into a thrown exception. Core deliberately holds **no** sentinel or + * error conventions; those are per-API and live entirely in the caller-supplied mapper. + * + * Mapping is applied **lazily, one element at a time**, as the typed iterator is pulled, so a + * partially consumed stream only decodes the events actually taken. + * + * Kotlin callers may pass a lambda; Java callers may use a lambda or implement this interface. + * + * @param T The model type events decode into. + */ +public fun interface SseEventMapper { + /** + * Maps one event to a [Result]. + * + * @param eventName The event's `event:` field, or `null` if the server omitted it. + * @param data The event's `data:` lines joined with `\n` (empty string if the event had + * no `data:` field). Joining matches the conventional WHATWG client-side reconstruction. + * @return [value] to yield a decoded model, [Skip] to drop the event, or [Done] to end + * the stream. + * @throws RuntimeException the mapper may throw (e.g. on a decode failure or a mapped + * error-envelope); the exception propagates to the consumer's pull. + */ + public fun map( + eventName: String?, + data: String, + ): Result + + /** + * The outcome of mapping a single SSE event: a decoded [value], [Skip], or [Done]. + * + * @param T The model type. + */ + public sealed class Result { + /** + * A successfully decoded model to yield to the consumer. + * + * @property model The decoded value. + */ + public class Value(public val model: T) : Result() + + /** + * The event carries no model and should be silently skipped (keep-alives, bare + * cursors, ignored event types). Iteration continues with the next event. + */ + public object Skip : Result() + + /** + * The event is a done-sentinel: iteration terminates cleanly and the backing + * [SseStream] closes. No model is yielded for the sentinel event itself. + */ + public object Done : Result() + } + + public companion object { + /** Wraps [model] in a [Result.Value]. */ + @JvmStatic + public fun value(model: T): Result = Result.Value(model) + + /** The shared [Result.Skip] singleton, exposed for Java callers. */ + @JvmStatic + public fun skip(): Result = Result.Skip + + /** The shared [Result.Done] singleton, exposed for Java callers. */ + @JvmStatic + public fun done(): Result = Result.Done + } +} diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseStream.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseStream.kt new file mode 100644 index 00000000..1cab3f73 --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseStream.kt @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.sse + +import org.dexpace.sdk.core.io.BufferedSource +import java.io.Closeable +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + +/** + * An [AutoCloseable] [Iterable] of [ServerSentEvent] whose lifecycle is bound to the + * underlying HTTP response. + * + * `SseStream` closes the [resource] it was opened over — typically the + * [org.dexpace.sdk.core.http.response.Response] (or its body) — whenever the stream is + * [closed][close], whether that close is explicit, via a `use {}` / try-with-resources block, + * or implicit when iteration runs to completion. This mirrors the close-on-partial-consume + * invariant [org.dexpace.sdk.core.http.paging.PagedIterable] enforces: a consumer that pulls + * only the first few events and walks away never strands the response body or its pooled + * connection. + * + * Unlike the bare [Sequence] returned by [BufferedSource.readServerSentEvents], an `SseStream` + * **owns** the response. The [ServerSentEventReader] it drives is single-pass and stateful, so + * the stream is single-pass too: + * + * - [iterator] may be called **once**. A second call throws [IllegalStateException]; this + * surfaces the previously documented "do not iterate twice" warning as an enforced guard. + * - Once [close] has run, [iterator] (and any further pulls from an in-flight iterator) throw + * [IllegalStateException] — the is-closed guard. + * - When the backing reader reports end-of-stream the iterator terminates cleanly **and** + * closes the stream, releasing the response without a separate [close] call. + * + * `close()` is idempotent and propagates to [resource] exactly once. The underlying response + * `close()` is itself expected to be idempotent, so a redundant [close] here is harmless. + * + * **Threading**: not thread-safe for iteration — drive a single iterator from one thread, as + * with the backing reader. [close] is safe to call from another thread (e.g. to cancel a + * long-lived stream); it takes a lock and flips an atomic guard, so a concurrent [close] + * races cleanly with iteration and the iterating thread observes the closed state on its next + * pull. + * + * @property reader The WHATWG parser driving the byte stream. Owned by this stream. + * @property resource The response (or body) whose lifecycle this stream governs; closed once + * when the stream closes. + */ +public class SseStream private constructor( + private val reader: ServerSentEventReader, + private val resource: Closeable, +) : AutoCloseable, Iterable { + private val closed = AtomicBoolean(false) + private val iteratorTaken = AtomicBoolean(false) + private val closeLock = ReentrantLock() + + /** + * Returns the single-pass iterator over the stream's events. + * + * Each pull advances the backing [ServerSentEventReader]. When the reader signals + * end-of-stream the iterator terminates and the stream is [closed][close] eagerly, so a + * fully consumed stream needs no explicit `close()`. + * + * @throws IllegalStateException if the stream is already closed, or if [iterator] was + * already called on this instance (the stream is single-pass). + */ + override fun iterator(): Iterator { + check(!closed.get()) { "SseStream is closed" } + check(iteratorTaken.compareAndSet(false, true)) { + "SseStream is single-pass; iterator() may only be called once" + } + return SseIterator() + } + + /** + * Closes the stream and the underlying [resource]. Idempotent: only the first call + * propagates to [resource]; later calls are no-ops. Safe to call concurrently with + * iteration. + */ + override fun close() { + if (closed.compareAndSet(false, true)) { + closeLock.withLock { resource.close() } + } + } + + private inner class SseIterator : AbstractIterator() { + override fun computeNext() { + // An out-of-band close() (e.g. cancellation from another thread) ends iteration + // cleanly rather than reading from a resource that is being torn down. + if (closed.get()) { + done() + return + } + // Reader exceptions (mid-stream connection drops) propagate to the caller; the + // stream stays open so the caller can still close()/use{} the response on the way + // out. AbstractIterator transitions to FAILED, matching PagedIterable's contract. + val event = reader.next() + if (event == null) { + // Clean end-of-stream: release the response without a separate close() call. + done() + close() + return + } + setNext(event) + } + } + + public companion object { + /** + * Opens an [SseStream] that parses [source] and, when closed, closes [resource]. + * + * Typical use binds [resource] to the originating + * [org.dexpace.sdk.core.http.response.Response] so closing the stream releases the + * response body and its connection: + * + * ``` + * SseStream.from(response.body!!.source(), response).use { stream -> + * for (event in stream) { /* handle event */ } + * } + * ``` + * + * [source] and [resource] may be the same object when the source itself owns the + * transport handle. + * + * @param source The byte stream to parse as Server-Sent Events. + * @param resource The handle to close when the stream closes (response, body, etc.). + */ + @JvmStatic + public fun from( + source: BufferedSource, + resource: Closeable, + ): SseStream = SseStream(ServerSentEventReader(source), resource) + + /** + * Opens an [SseStream] over a pre-built [reader], closing [resource] when the stream + * closes. Use when the caller already holds a configured [ServerSentEventReader]. + * + * @param reader The parser to drive. Owned by the returned stream. + * @param resource The handle to close when the stream closes. + */ + @JvmStatic + public fun fromReader( + reader: ServerSentEventReader, + resource: Closeable, + ): SseStream = SseStream(reader, resource) + } +} diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStream.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStream.kt new file mode 100644 index 00000000..19976754 --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStream.kt @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.sse + +/** + * A reusable, per-endpoint adapter that turns an [SseStream] of raw [ServerSentEvent] into an + * [AutoCloseable] [Iterable] of typed models `T`, applying a caller-supplied [SseEventMapper] + * lazily as elements are pulled. + * + * This is the runtime primitive a code generator would target for a streaming endpoint: given + * the AutoCloseable SSE stream (#35) and a mapper that knows the endpoint's `event:` names, + * done-sentinel, and error-envelope conventions, it yields decoded models on demand. It is + * **fully usable by hand today** — construct one with any lambda mapper that calls the + * [org.dexpace.sdk.core.serde.Serde] SPI. + * + * Lifecycle is inherited from the wrapped [SseStream]: + * + * - [close] propagates to the underlying stream, which closes the response/body. Idempotent. + * - Running iteration to completion (mapper end-of-stream or a [SseEventMapper.Result.Done] + * sentinel) closes the stream automatically, so a fully consumed adapter needs no explicit + * `close()`. + * - Single-pass: [iterator] may be called once (the backing [SseStream] enforces this). + * + * **Lazy per-element decode.** The mapper — and therefore any [org.dexpace.sdk.core.serde.Serde] + * deserialize call inside it — runs only when the consumer pulls the next element. A partial + * consume (`first()`, `take(n)`, `stream().findFirst()`) decodes only the events actually + * taken, then `close()` releases the rest of the response. + * + * **Skip / Done.** Events the mapper returns [SseEventMapper.Result.Skip] for are dropped and + * iteration advances to the next event. A [SseEventMapper.Result.Done] ends iteration cleanly + * and closes the stream. + * + * **Errors.** A mapper that throws (decode failure, mapped error-envelope) propagates the + * exception to the consumer's pull; the underlying [SseStream] stays open so the caller's + * `use {}` / try-with-resources still closes the response on the way out. + * + * **Threading**: not thread-safe for iteration — drive a single iterator from one thread. + * [close] is safe from another thread (delegates to [SseStream.close]). + * + * @param T The model type events decode into. + * @property events The underlying raw SSE stream. Owned by this adapter; closed when this + * adapter closes. + * @property mapper The per-endpoint mapping from raw event to typed [Result]. + */ +public class TypedSseStream( + private val events: SseStream, + private val mapper: SseEventMapper, +) : AutoCloseable, Iterable { + /** + * Returns the single-pass iterator over decoded models. Delegates iteration to the wrapped + * [SseStream] and applies [mapper] lazily per element. + * + * @throws IllegalStateException if the underlying [SseStream] is already closed or its + * iterator was already taken. + */ + override fun iterator(): Iterator = MappingIterator(events.iterator()) + + /** + * Closes the adapter and the underlying [SseStream] (and thereby the response). Idempotent. + */ + override fun close() { + events.close() + } + + private inner class MappingIterator( + private val raw: Iterator, + ) : AbstractIterator() { + override fun computeNext() { + // Pull raw events until the mapper yields a value, signals Done, or the stream ends. + // Skips don't surface to the caller; only taken elements are decoded (lazy decode). + while (raw.hasNext()) { + val event = raw.next() + when (val result = mapper.map(event.event, event.data.joinToString("\n"))) { + is SseEventMapper.Result.Value -> { + setNext(result.model) + return + } + SseEventMapper.Result.Skip -> continue + SseEventMapper.Result.Done -> { + done() + close() + return + } + } + } + done() + } + } +} diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/SseStreamTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/SseStreamTest.kt new file mode 100644 index 00000000..8e765e7c --- /dev/null +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/SseStreamTest.kt @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.sse + +import org.dexpace.sdk.core.http.common.Protocol +import org.dexpace.sdk.core.http.request.Method +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.http.response.Response +import org.dexpace.sdk.core.http.response.ResponseBody +import org.dexpace.sdk.core.http.response.Status +import org.dexpace.sdk.core.io.BufferedSource +import org.dexpace.sdk.core.io.Io +import org.dexpace.sdk.io.OkioIoProvider +import java.io.Closeable +import java.io.IOException +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class SseStreamTest { + @BeforeTest + fun installProvider() { + Io.installProvider(OkioIoProvider) + } + + private fun source(text: String): BufferedSource = Io.provider.source(text.toByteArray(Charsets.UTF_8)) + + /** Records how many times close() ran, standing in for a Response/body. */ + private class CountingCloseable : Closeable { + val closeCount = AtomicInteger(0) + + override fun close() { + closeCount.incrementAndGet() + } + + val isClosed: Boolean get() = closeCount.get() > 0 + } + + @Test + fun `full iteration yields all events and auto-closes the resource`() { + val resource = CountingCloseable() + val stream = SseStream.from(source("data: a\n\ndata: b\n\n"), resource) + + val events = stream.toList() + + assertEquals(2, events.size) + assertEquals(listOf("a"), events[0].data) + assertEquals(listOf("b"), events[1].data) + // Clean end-of-stream closes the resource exactly once without an explicit close(). + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `explicit close propagates to the resource`() { + val resource = CountingCloseable() + val stream = SseStream.from(source("data: a\n\n"), resource) + + assertFalse(resource.isClosed) + stream.close() + assertTrue(resource.isClosed) + } + + @Test + fun `use block closes the resource even on partial consume`() { + val resource = CountingCloseable() + val first = + SseStream.from(source("data: a\n\ndata: b\n\ndata: c\n\n"), resource).use { stream -> + // Pull only the first event, then leave the use block — the rest are abandoned. + stream.iterator().next() + } + + assertEquals(listOf("a"), first.data) + // Partial consume must not strand the resource: use{} closed it. + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `close is idempotent`() { + val resource = CountingCloseable() + val stream = SseStream.from(source("data: a\n\n"), resource) + + stream.close() + stream.close() + stream.close() + + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `iterator may only be taken once`() { + val stream = SseStream.from(source("data: a\n\n"), CountingCloseable()) + + stream.iterator() + assertFailsWith { stream.iterator() } + } + + @Test + fun `iterator on a closed stream throws`() { + val stream = SseStream.from(source("data: a\n\n"), CountingCloseable()) + stream.close() + + assertFailsWith { stream.iterator() } + } + + @Test + fun `out-of-band close ends an in-flight iteration cleanly`() { + val resource = CountingCloseable() + val stream = SseStream.from(source("data: a\n\ndata: b\n\ndata: c\n\n"), resource) + val iter = stream.iterator() + + // Consume the first event, then close mid-iteration (e.g. cancellation). + assertTrue(iter.hasNext()) + assertEquals(listOf("a"), iter.next().data) + stream.close() + + // The next pull observes the closed state and terminates without error. + assertFalse(iter.hasNext()) + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `reader exception propagates and leaves the resource closeable`() { + val resource = CountingCloseable() + val goodPart = "data: good\n\n".toByteArray(Charsets.UTF_8) + val backing = Io.provider.buffer().also { it.write(goodPart) } + val failingSource = + object : BufferedSource by backing { + override fun exhausted(): Boolean = false + + override fun readByte(): Byte { + if (backing.size == 0L) throw IOException("simulated connection drop") + return backing.readByte() + } + + override fun peek(): BufferedSource = backing.peek() + } + + val stream = SseStream.from(failingSource, resource) + val iter = stream.iterator() + + assertEquals(listOf("good"), iter.next().data) + // The mid-stream drop surfaces on the next pull. + assertFailsWith { iter.hasNext() } + // The stream stayed open after the failure, so the caller can still release it. + stream.close() + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `Response sseStream binds the stream to the response body lifecycle`() { + val closed = AtomicInteger(0) + val backing = source("data: a\n\ndata: b\n\n") + val body = + object : ResponseBody() { + override fun mediaType() = null + + override fun contentLength(): Long = -1L + + override fun source(): BufferedSource = backing + + override fun close() { + closed.incrementAndGet() + backing.close() + } + } + val response = + Response.builder() + .request(Request.builder().method(Method.GET).url("https://example.test/stream").build()) + .protocol(Protocol.HTTP_1_1) + .status(Status.OK) + .body(body) + .build() + + val events = response.sseStream().use { it.toList() } + + assertEquals(2, events.size) + // Closing the SSE stream closed the response body. + assertEquals(1, closed.get()) + } + + @Test + fun `Response sseStream throws when the response has no body`() { + val response = + Response.builder() + .request(Request.builder().method(Method.GET).url("https://example.test/stream").build()) + .protocol(Protocol.HTTP_1_1) + .status(Status.OK) + .build() + + assertFailsWith { response.sseStream() } + } + + @Test + fun `fromReader binds an existing reader to a resource`() { + val resource = CountingCloseable() + val reader = ServerSentEventReader(source("data: x\n\n")) + val stream = SseStream.fromReader(reader, resource) + + val events = stream.toList() + assertEquals(listOf("x"), events.single().data) + assertEquals(1, resource.closeCount.get()) + } +} diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStreamTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStreamTest.kt new file mode 100644 index 00000000..691f3ddf --- /dev/null +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStreamTest.kt @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.sse + +import org.dexpace.sdk.core.io.BufferedSource +import org.dexpace.sdk.core.io.Io +import org.dexpace.sdk.core.serde.Deserializer +import org.dexpace.sdk.io.OkioIoProvider +import java.io.Closeable +import java.io.InputStream +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue + +class TypedSseStreamTest { + @BeforeTest + fun installProvider() { + Io.installProvider(OkioIoProvider) + } + + private fun source(text: String): BufferedSource = Io.provider.source(text.toByteArray(Charsets.UTF_8)) + + /** Builds a single `data:`-only SSE event block carrying [payload]. */ + private fun event(payload: String): String = "data: $payload\n\n" + + /** Toy JSON for a [Chunk] with [text]. */ + private fun chunkJson(text: String): String = """{"text":"$text"}""" + + private fun stream( + text: String, + resource: Closeable = CountingCloseable(), + ): SseStream = SseStream.from(source(text), resource) + + private class CountingCloseable : Closeable { + val closeCount = AtomicInteger(0) + + override fun close() { + closeCount.incrementAndGet() + } + } + + private data class Chunk(val text: String) + + /** + * Minimal [Deserializer] standing in for a real adapter (e.g. sdk-serde-jackson): it + * records every decode call and parses the toy `{"text":"..."}` shape into [Chunk], + * letting the test assert both typed output and the Serde SPI being exercised lazily. + */ + private class RecordingDeserializer : Deserializer { + val decoded = mutableListOf() + + @Suppress("UNCHECKED_CAST") + override fun deserialize( + input: String, + type: Class, + ): T { + decoded.add(input) + val text = Regex("\"text\"\\s*:\\s*\"([^\"]*)\"").find(input)?.groupValues?.get(1) ?: "" + return Chunk(text) as T + } + + override fun deserialize( + input: ByteArray, + type: Class, + ): T = deserialize(String(input, Charsets.UTF_8), type) + + override fun deserialize( + inputStream: InputStream, + type: Class, + ): T = deserialize(inputStream.readBytes().toString(Charsets.UTF_8), type) + } + + /** Endpoint mapper: `[DONE]` ends the stream, comment-only events skip, else decode. */ + private fun chunkMapper(deserializer: Deserializer): SseEventMapper = + SseEventMapper { _, data -> + when { + data == "[DONE]" -> SseEventMapper.Result.Done + data.isEmpty() -> SseEventMapper.Result.Skip + else -> SseEventMapper.Result.Value(deserializer.deserialize(data, Chunk::class.java)) + } + } + + @Test + fun `maps raw events to typed models via the deserializer`() { + val deser = RecordingDeserializer() + val raw = stream(event(chunkJson("hi")) + event(chunkJson("bye"))) + + val chunks = TypedSseStream(raw, chunkMapper(deser)).toList() + + assertEquals(listOf(Chunk("hi"), Chunk("bye")), chunks) + assertEquals(2, deser.decoded.size) + } + + @Test + fun `done sentinel ends iteration and closes the underlying stream`() { + val resource = CountingCloseable() + val deser = RecordingDeserializer() + val raw = stream(event(chunkJson("a")) + event("[DONE]") + event(chunkJson("never")), resource) + + val chunks = TypedSseStream(raw, chunkMapper(deser)).toList() + + // Only the pre-sentinel element is yielded; the post-sentinel event is never decoded. + assertEquals(listOf(Chunk("a")), chunks) + assertEquals(1, deser.decoded.size) + // Reaching the done-sentinel closes the underlying stream/resource. + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `skip results are dropped and do not surface to the caller`() { + val deser = RecordingDeserializer() + // The middle event is a comment-only keep-alive: its data is empty, so the mapper skips it. + val raw = stream(event(chunkJson("a")) + ":keep-alive\n\n" + event(chunkJson("b"))) + + val chunks = TypedSseStream(raw, chunkMapper(deser)).toList() + + assertEquals(listOf(Chunk("a"), Chunk("b")), chunks) + } + + @Test + fun `decode happens lazily per element on consume`() { + val deser = RecordingDeserializer() + val raw = stream(event(chunkJson("a")) + event(chunkJson("b"))) + val iter = TypedSseStream(raw, chunkMapper(deser)).iterator() + + // Nothing decoded before the first pull. + assertEquals(0, deser.decoded.size) + assertEquals(Chunk("a"), iter.next()) + // Only the first element has been decoded; the second is untouched until pulled. + assertEquals(1, deser.decoded.size) + assertEquals(Chunk("b"), iter.next()) + assertEquals(2, deser.decoded.size) + } + + @Test + fun `close propagates from the typed adapter to the underlying resource`() { + val resource = CountingCloseable() + val raw = stream(event(chunkJson("a")), resource) + val typed = TypedSseStream(raw, chunkMapper(RecordingDeserializer())) + + typed.close() + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `mapper exceptions propagate and leave the stream closeable`() { + val resource = CountingCloseable() + val raw = stream(event("boom"), resource) + val typed = + TypedSseStream(raw) { _, _ -> + error("decode failed") + } + + assertFailsWith { typed.iterator().next() } + // The underlying stream stayed open; the caller can still release it. + typed.close() + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `result factory helpers build the expected variants`() { + val value = SseEventMapper.value(Chunk("v")) + assertTrue(value is SseEventMapper.Result.Value) + assertEquals(Chunk("v"), value.model) + assertTrue(SseEventMapper.skip() === SseEventMapper.Result.Skip) + assertTrue(SseEventMapper.done() === SseEventMapper.Result.Done) + } + + @Test + fun `typed extension wraps a stream`() { + val deser = RecordingDeserializer() + val chunks = stream(event(chunkJson("z"))).typed(chunkMapper(deser)).toList() + assertTrue(chunks.contains(Chunk("z"))) + } +} From a3b9b8bf3c33e1b4454815444b8546dcb2a98a53 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Sun, 28 Jun 2026 13:52:37 +0300 Subject: [PATCH 2/3] fix: release the SSE response on the stream error paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SseStream and TypedSseStream only released the underlying response when iteration reached a clean end-of-stream. A mid-stream reader failure (a dropped connection) or a throwing event mapper propagated the error without closing, so any consumer that iterated without `use {}` — a bare for-loop or `toList()` — stranded the response body and its pooled connection. The success path auto-closed, which made bare iteration look safe right up until an error hit. Release the response on those error paths too. Close is idempotent, so a surrounding `use {}` still works; a failure to release is attached to the original error as a suppressed throwable instead of masking it. Make automatic end-of-stream cleanup tolerant of a close failure as well: the events were already delivered, so a failing resource close on the final pull (clean EOS, or a TypedSseStream done-sentinel) must not turn a fully-read stream into a thrown result and discard the collected events. An explicit close() still propagates a release failure — that is the caller asking to release, so they own it. Drop the redundant ReentrantLock around the close: the AtomicBoolean compare-and-set already guarantees the resource is closed exactly once, so only the CAS winner ever entered the lock. Clarify the threading docs — cancelling a stream blocked inside an in-flight read surfaces as an IOException to the iterating thread, not a clean end. --- .../dexpace/sdk/core/http/sse/SseStream.kt | 76 ++++++++--- .../sdk/core/http/sse/TypedSseStream.kt | 40 +++++- .../sdk/core/http/sse/SseStreamTest.kt | 125 +++++++++++++++--- .../sdk/core/http/sse/TypedSseStreamTest.kt | 23 +++- 4 files changed, 221 insertions(+), 43 deletions(-) diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseStream.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseStream.kt index 1cab3f73..7f2f9d08 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseStream.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseStream.kt @@ -10,8 +10,6 @@ package org.dexpace.sdk.core.http.sse import org.dexpace.sdk.core.io.BufferedSource import java.io.Closeable import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock /** * An [AutoCloseable] [Iterable] of [ServerSentEvent] whose lifecycle is bound to the @@ -20,10 +18,10 @@ import kotlin.concurrent.withLock * `SseStream` closes the [resource] it was opened over — typically the * [org.dexpace.sdk.core.http.response.Response] (or its body) — whenever the stream is * [closed][close], whether that close is explicit, via a `use {}` / try-with-resources block, - * or implicit when iteration runs to completion. This mirrors the close-on-partial-consume - * invariant [org.dexpace.sdk.core.http.paging.PagedIterable] enforces: a consumer that pulls - * only the first few events and walks away never strands the response body or its pooled - * connection. + * implicit when iteration runs to completion, or implicit when the backing reader fails + * mid-stream. This mirrors the close-on-partial-consume invariant + * [org.dexpace.sdk.core.http.paging.PagedIterable] enforces: a consumer that pulls only the + * first few events and walks away never strands the response body or its pooled connection. * * Unlike the bare [Sequence] returned by [BufferedSource.readServerSentEvents], an `SseStream` * **owns** the response. The [ServerSentEventReader] it drives is single-pass and stateful, so @@ -35,15 +33,24 @@ import kotlin.concurrent.withLock * [IllegalStateException] — the is-closed guard. * - When the backing reader reports end-of-stream the iterator terminates cleanly **and** * closes the stream, releasing the response without a separate [close] call. + * - When the backing reader **fails** mid-stream the error propagates to the caller, but the + * response is released first, so even a consumer iterating without `use {}` does not strand + * the connection. A failure to release is attached to the reader error as a suppressed + * throwable. * * `close()` is idempotent and propagates to [resource] exactly once. The underlying response - * `close()` is itself expected to be idempotent, so a redundant [close] here is harmless. + * `close()` is itself expected to be idempotent, so a redundant [close] here is harmless. A + * release failure during **automatic** cleanup (clean end-of-stream) is dropped — the events + * were already delivered, so it must not turn a successful read into a thrown result; a release + * failure during an **explicit** [close] is propagated to the caller. * * **Threading**: not thread-safe for iteration — drive a single iterator from one thread, as * with the backing reader. [close] is safe to call from another thread (e.g. to cancel a - * long-lived stream); it takes a lock and flips an atomic guard, so a concurrent [close] - * races cleanly with iteration and the iterating thread observes the closed state on its next - * pull. + * long-lived stream); it flips an atomic guard, so a concurrent [close] races cleanly with + * iteration. A thread parked **between** pulls observes the closed state and ends cleanly on + * its next pull; a thread blocked **inside** an in-flight read when [close] tears the resource + * down typically sees that read fail, so the cancellation surfaces to the iterating thread as + * an [java.io.IOException] rather than a clean end. Either way the resource is released once. * * @property reader The WHATWG parser driving the byte stream. Owned by this stream. * @property resource The response (or body) whose lifecycle this stream governs; closed once @@ -55,7 +62,6 @@ public class SseStream private constructor( ) : AutoCloseable, Iterable { private val closed = AtomicBoolean(false) private val iteratorTaken = AtomicBoolean(false) - private val closeLock = ReentrantLock() /** * Returns the single-pass iterator over the stream's events. @@ -78,30 +84,58 @@ public class SseStream private constructor( /** * Closes the stream and the underlying [resource]. Idempotent: only the first call * propagates to [resource]; later calls are no-ops. Safe to call concurrently with - * iteration. + * iteration. A failure to release [resource] is propagated to the caller — an explicit + * close is the caller asking to release, so they own the failure. */ override fun close() { if (closed.compareAndSet(false, true)) { - closeLock.withLock { resource.close() } + resource.close() + } + } + + /** + * Releases [resource] exactly once on an iterator-driven terminal path (clean end-of-stream + * or a mid-stream reader failure), where the caller is not the one invoking [close] and a + * failure to release must not become the iteration's outcome. A close failure is attached to + * [primary] as a suppressed throwable when a reader error is in flight, and otherwise dropped + * (the events were already delivered). Mirrors the close-on-failure helper in + * [org.dexpace.sdk.core.pipeline.ResponsePipeline]. + */ + private fun releaseQuietly(primary: Throwable?) { + if (!closed.compareAndSet(false, true)) return + try { + resource.close() + } catch (closeError: Throwable) { + primary?.addSuppressed(closeError) } } private inner class SseIterator : AbstractIterator() { override fun computeNext() { - // An out-of-band close() (e.g. cancellation from another thread) ends iteration - // cleanly rather than reading from a resource that is being torn down. + // An out-of-band close() (e.g. cancellation from another thread, between pulls) ends + // iteration cleanly rather than reading from a resource that is being torn down. if (closed.get()) { done() return } - // Reader exceptions (mid-stream connection drops) propagate to the caller; the - // stream stays open so the caller can still close()/use{} the response on the way - // out. AbstractIterator transitions to FAILED, matching PagedIterable's contract. - val event = reader.next() + val event = + try { + reader.next() + } catch (readerError: Throwable) { + // A mid-stream reader failure (e.g. a dropped connection) releases the response + // before propagating, so a consumer iterating without use{} never strands the + // connection; any release failure is attached as suppressed. AbstractIterator + // then transitions to FAILED. (A close() racing an in-flight read also lands + // here, surfacing the cancellation as that read's IOException.) + releaseQuietly(readerError) + throw readerError + } if (event == null) { - // Clean end-of-stream: release the response without a separate close() call. + // Clean end-of-stream: release the response. The events were already delivered, so + // a release failure must not turn a successful read into a thrown result — it is + // dropped here (an explicit close()/use{} still surfaces a release failure). done() - close() + releaseQuietly(primary = null) return } setNext(event) diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStream.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStream.kt index 19976754..80c96836 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStream.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStream.kt @@ -36,8 +36,9 @@ package org.dexpace.sdk.core.http.sse * and closes the stream. * * **Errors.** A mapper that throws (decode failure, mapped error-envelope) propagates the - * exception to the consumer's pull; the underlying [SseStream] stays open so the caller's - * `use {}` / try-with-resources still closes the response on the way out. + * exception to the consumer's pull, but the underlying [SseStream] is released first, so even a + * consumer iterating without `use {}` does not strand the connection. A failure to release is + * attached to the mapper error as a suppressed throwable. * * **Threading**: not thread-safe for iteration — drive a single iterator from one thread. * [close] is safe from another thread (delegates to [SseStream.close]). @@ -62,28 +63,59 @@ public class TypedSseStream( /** * Closes the adapter and the underlying [SseStream] (and thereby the response). Idempotent. + * A failure to release is propagated, mirroring [SseStream.close]. */ override fun close() { events.close() } + /** + * Releases the underlying [SseStream] on an iterator-driven terminal path (a + * [SseEventMapper.Result.Done] sentinel or a mapper failure), where a failure to release must + * not become the iteration's outcome. A close failure is attached to [primary] as a suppressed + * throwable when a mapper error is in flight, and otherwise dropped (the values were already + * delivered). Mirrors [SseStream]'s own end-of-stream cleanup. + */ + private fun releaseQuietly(primary: Throwable?) { + try { + events.close() + } catch (closeError: Throwable) { + primary?.addSuppressed(closeError) + } + } + private inner class MappingIterator( private val raw: Iterator, ) : AbstractIterator() { override fun computeNext() { // Pull raw events until the mapper yields a value, signals Done, or the stream ends. // Skips don't surface to the caller; only taken elements are decoded (lazy decode). + // (A reader failure surfaces from raw and has already released the stream; see + // SseStream.) Only the mapper call needs its own release-on-failure here. while (raw.hasNext()) { val event = raw.next() - when (val result = mapper.map(event.event, event.data.joinToString("\n"))) { + val result = + try { + mapper.map(event.event, event.data.joinToString("\n")) + } catch (mapperError: Throwable) { + // A mapper failure (decode error, mapped error-envelope) releases the + // underlying stream before propagating, so a consumer iterating without + // use{} never strands the connection; a release failure is suppressed. + releaseQuietly(mapperError) + throw mapperError + } + when (result) { is SseEventMapper.Result.Value -> { setNext(result.model) return } SseEventMapper.Result.Skip -> continue SseEventMapper.Result.Done -> { + // Clean end at the done-sentinel: the values before it were already + // delivered, so a release failure here is dropped rather than discarding + // them (an explicit close()/use{} still surfaces a release failure). done() - close() + releaseQuietly(primary = null) return } } diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/SseStreamTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/SseStreamTest.kt index 8e765e7c..c4526e2d 100644 --- a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/SseStreamTest.kt +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/SseStreamTest.kt @@ -18,7 +18,11 @@ import org.dexpace.sdk.core.io.Io import org.dexpace.sdk.io.OkioIoProvider import java.io.Closeable import java.io.IOException +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals @@ -34,6 +38,24 @@ class SseStreamTest { private fun source(text: String): BufferedSource = Io.provider.source(text.toByteArray(Charsets.UTF_8)) + /** + * A source that replays [text] and then throws on the next read, standing in for a transport + * whose connection drops mid-stream. + */ + private fun failingSourceAfter(text: String): BufferedSource { + val backing = Io.provider.buffer().also { it.write(text.toByteArray(Charsets.UTF_8)) } + return object : BufferedSource by backing { + override fun exhausted(): Boolean = false + + override fun readByte(): Byte { + if (backing.size == 0L) throw IOException("simulated connection drop") + return backing.readByte() + } + + override fun peek(): BufferedSource = backing.peek() + } + } + /** Records how many times close() ran, standing in for a Response/body. */ private class CountingCloseable : Closeable { val closeCount = AtomicInteger(0) @@ -128,31 +150,102 @@ class SseStreamTest { } @Test - fun `reader exception propagates and leaves the resource closeable`() { + fun `mid-stream reader failure propagates and releases the resource`() { + val resource = CountingCloseable() + val stream = SseStream.from(failingSourceAfter("data: good\n\n"), resource) + val iter = stream.iterator() + + assertEquals(listOf("good"), iter.next().data) + // The mid-stream drop surfaces on the next pull... + assertFailsWith { iter.hasNext() } + // ...and releases the response on the way out, so a consumer iterating without use{} + // never strands the connection. + assertEquals(1, resource.closeCount.get()) + // close() remains a safe, idempotent no-op after the automatic release. + stream.close() + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `toList propagates a mid-stream failure but still releases the resource`() { val resource = CountingCloseable() - val goodPart = "data: good\n\n".toByteArray(Charsets.UTF_8) - val backing = Io.provider.buffer().also { it.write(goodPart) } - val failingSource = - object : BufferedSource by backing { + val stream = SseStream.from(failingSourceAfter("data: good\n\n"), resource) + + // A bare terminal operation (no use{}) still releases the response on a mid-stream drop. + assertFailsWith { stream.toList() } + assertEquals(1, resource.closeCount.get()) + } + + @Test + fun `a resource close failure on clean end-of-stream does not discard already-read events`() { + val stream = + SseStream.from(source("data: a\n\ndata: b\n\n"), Closeable { throw IOException("release failed") }) + + // Every event was read successfully; a failure to release the connection on the terminal + // pull must not turn that into a thrown result and lose the collected events. + val events = stream.toList() + + assertEquals(2, events.size) + assertEquals(listOf("a"), events[0].data) + assertEquals(listOf("b"), events[1].data) + } + + @Test + fun `explicit close surfaces a resource close failure`() { + val stream = SseStream.from(source("data: a\n\n"), Closeable { throw IOException("release failed") }) + + // Unlike automatic end-of-stream cleanup, an explicit close() is the caller asking to + // release, so a release failure is propagated to them. + assertFailsWith { stream.close() } + } + + @Test + fun `close during a blocked read cancels the iteration via an IOException`() { + val readStarted = CountDownLatch(1) + val releaseLatch = CountDownLatch(1) + val released = AtomicBoolean(false) + val resource = + Closeable { + released.set(true) + releaseLatch.countDown() + } + // A source that blocks inside readByte() until the resource is closed, then fails the way a + // real transport does when its socket is closed out from under an in-flight read. + val blockingSource = + object : BufferedSource by Io.provider.buffer() { override fun exhausted(): Boolean = false + // Empty peek => the reader's BOM probe finds no BOM and does not block here. + override fun peek(): BufferedSource = Io.provider.buffer() + override fun readByte(): Byte { - if (backing.size == 0L) throw IOException("simulated connection drop") - return backing.readByte() + readStarted.countDown() + releaseLatch.await(2, TimeUnit.SECONDS) + throw IOException("source closed during read") } - - override fun peek(): BufferedSource = backing.peek() } - - val stream = SseStream.from(failingSource, resource) + val stream = SseStream.from(blockingSource, resource) val iter = stream.iterator() + val failure = AtomicReference() + val readerThread = + Thread { + try { + iter.hasNext() + } catch (t: Throwable) { + failure.set(t) + } + } + readerThread.start() - assertEquals(listOf("good"), iter.next().data) - // The mid-stream drop surfaces on the next pull. - assertFailsWith { iter.hasNext() } - // The stream stayed open after the failure, so the caller can still release it. + // Wait until the iterating thread is parked inside the blocking read, then cancel. + assertTrue(readStarted.await(2, TimeUnit.SECONDS)) stream.close() - assertEquals(1, resource.closeCount.get()) + readerThread.join(TimeUnit.SECONDS.toMillis(2)) + + // Cancelling an in-flight read surfaces as an IOException to the iterating thread (not a + // clean end), and the resource is released exactly once. + assertTrue(failure.get() is IOException, "expected IOException, got ${failure.get()}") + assertTrue(released.get()) } @Test diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStreamTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStreamTest.kt index 691f3ddf..8bf78c6b 100644 --- a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStreamTest.kt +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStreamTest.kt @@ -12,6 +12,7 @@ import org.dexpace.sdk.core.io.Io import org.dexpace.sdk.core.serde.Deserializer import org.dexpace.sdk.io.OkioIoProvider import java.io.Closeable +import java.io.IOException import java.io.InputStream import java.util.concurrent.atomic.AtomicInteger import kotlin.test.BeforeTest @@ -151,7 +152,7 @@ class TypedSseStreamTest { } @Test - fun `mapper exceptions propagate and leave the stream closeable`() { + fun `mapper exceptions propagate and release the underlying resource`() { val resource = CountingCloseable() val raw = stream(event("boom"), resource) val typed = @@ -160,11 +161,29 @@ class TypedSseStreamTest { } assertFailsWith { typed.iterator().next() } - // The underlying stream stayed open; the caller can still release it. + // The mapper failure releases the underlying stream, so a consumer iterating without use{} + // never strands the connection. + assertEquals(1, resource.closeCount.get()) + // close() remains a safe, idempotent no-op after the automatic release. typed.close() assertEquals(1, resource.closeCount.get()) } + @Test + fun `a resource close failure on the done sentinel does not discard already-mapped values`() { + val raw = + SseStream.from( + source(event(chunkJson("a")) + event("[DONE]")), + Closeable { throw IOException("release failed") }, + ) + + // The values before the done-sentinel were delivered; a failure to release on the sentinel + // must not turn that into a thrown result and lose them. + val chunks = TypedSseStream(raw, chunkMapper(RecordingDeserializer())).toList() + + assertEquals(listOf(Chunk("a")), chunks) + } + @Test fun `result factory helpers build the expected variants`() { val value = SseEventMapper.value(Chunk("v")) From b88bc06e52df6993f4f97e45c83030baad8c23f8 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Sun, 28 Jun 2026 14:06:25 +0300 Subject: [PATCH 3/3] fix: log SSE end-of-stream close failures instead of dropping them Automatic end-of-stream cleanup swallows a failure to release the response, because the events have already been delivered and letting the failure propagate would discard a fully-read stream. Swallowing it silently hides a real I/O problem, so emit it through ClientLogger at WARN (event `sse.close.failed`) with the cause attached. An explicit close() still propagates the failure, and the suppressed-onto-the-primary path for an in-flight reader/mapper error is unchanged. Centralize the quiet release in SseStream.releaseQuietly so the TypedSseStream done-sentinel and mapper-error paths share that one site instead of duplicating the close handling. The regenerated API snapshot reflects the logger now threaded through SseStream's private constructor; the public factory surface is unchanged. --- sdk-core/api/sdk-core.api | 2 +- .../dexpace/sdk/core/http/sse/SseStream.kt | 51 ++++++++++++++----- .../sdk/core/http/sse/TypedSseStream.kt | 27 +++------- .../sdk/core/http/sse/SseStreamTest.kt | 23 +++++++++ 4 files changed, 69 insertions(+), 34 deletions(-) diff --git a/sdk-core/api/sdk-core.api b/sdk-core/api/sdk-core.api index 7a456e4f..b0f9ee4e 100644 --- a/sdk-core/api/sdk-core.api +++ b/sdk-core/api/sdk-core.api @@ -1596,7 +1596,7 @@ public final class org/dexpace/sdk/core/http/sse/SseEventMapper$Result$Value : o public final class org/dexpace/sdk/core/http/sse/SseStream : java/lang/AutoCloseable, java/lang/Iterable, kotlin/jvm/internal/markers/KMappedMarker { public static final field Companion Lorg/dexpace/sdk/core/http/sse/SseStream$Companion; - public synthetic fun (Lorg/dexpace/sdk/core/http/sse/ServerSentEventReader;Ljava/io/Closeable;Lkotlin/jvm/internal/DefaultConstructorMarker;)V + public synthetic fun (Lorg/dexpace/sdk/core/http/sse/ServerSentEventReader;Ljava/io/Closeable;Lorg/dexpace/sdk/core/instrumentation/ClientLogger;Lkotlin/jvm/internal/DefaultConstructorMarker;)V public fun close ()V public static final fun from (Lorg/dexpace/sdk/core/io/BufferedSource;Ljava/io/Closeable;)Lorg/dexpace/sdk/core/http/sse/SseStream; public static final fun fromReader (Lorg/dexpace/sdk/core/http/sse/ServerSentEventReader;Ljava/io/Closeable;)Lorg/dexpace/sdk/core/http/sse/SseStream; diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseStream.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseStream.kt index 7f2f9d08..71810429 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseStream.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/SseStream.kt @@ -7,6 +7,7 @@ package org.dexpace.sdk.core.http.sse +import org.dexpace.sdk.core.instrumentation.ClientLogger import org.dexpace.sdk.core.io.BufferedSource import java.io.Closeable import java.util.concurrent.atomic.AtomicBoolean @@ -40,9 +41,10 @@ import java.util.concurrent.atomic.AtomicBoolean * * `close()` is idempotent and propagates to [resource] exactly once. The underlying response * `close()` is itself expected to be idempotent, so a redundant [close] here is harmless. A - * release failure during **automatic** cleanup (clean end-of-stream) is dropped — the events - * were already delivered, so it must not turn a successful read into a thrown result; a release - * failure during an **explicit** [close] is propagated to the caller. + * release failure during **automatic** cleanup (clean end-of-stream) is logged at `WARN` and + * otherwise swallowed — the events were already delivered, so it must not turn a successful read + * into a thrown result; a release failure during an **explicit** [close] is propagated to the + * caller. * * **Threading**: not thread-safe for iteration — drive a single iterator from one thread, as * with the backing reader. [close] is safe to call from another thread (e.g. to cancel a @@ -55,10 +57,13 @@ import java.util.concurrent.atomic.AtomicBoolean * @property reader The WHATWG parser driving the byte stream. Owned by this stream. * @property resource The response (or body) whose lifecycle this stream governs; closed once * when the stream closes. + * @property logger Structured logger used to report a close failure during automatic + * end-of-stream cleanup (where the failure cannot otherwise surface to the caller). */ public class SseStream private constructor( private val reader: ServerSentEventReader, private val resource: Closeable, + private val logger: ClientLogger, ) : AutoCloseable, Iterable { private val closed = AtomicBoolean(false) private val iteratorTaken = AtomicBoolean(false) @@ -94,19 +99,28 @@ public class SseStream private constructor( } /** - * Releases [resource] exactly once on an iterator-driven terminal path (clean end-of-stream - * or a mid-stream reader failure), where the caller is not the one invoking [close] and a - * failure to release must not become the iteration's outcome. A close failure is attached to - * [primary] as a suppressed throwable when a reader error is in flight, and otherwise dropped - * (the events were already delivered). Mirrors the close-on-failure helper in + * Releases [resource] exactly once on an iterator-driven terminal path (clean end-of-stream, + * a [SseEventMapper.Result.Done] sentinel from a wrapping [TypedSseStream], or a mid-stream + * reader/mapper failure), where the caller is not the one invoking [close] and a failure to + * release must not become the iteration's outcome. A close failure is attached to [primary] as + * a suppressed throwable when an error is in flight; when there is no [primary] to carry it + * (a clean terminal path) the failure is logged at `WARN` and swallowed, since the events were + * already delivered. Mirrors the close-on-failure helper in * [org.dexpace.sdk.core.pipeline.ResponsePipeline]. */ - private fun releaseQuietly(primary: Throwable?) { + internal fun releaseQuietly(primary: Throwable?) { if (!closed.compareAndSet(false, true)) return try { resource.close() } catch (closeError: Throwable) { - primary?.addSuppressed(closeError) + if (primary != null) { + primary.addSuppressed(closeError) + } else { + logger.atWarning() + .event("sse.close.failed") + .cause(closeError) + .log("Failed to release the SSE response during end-of-stream cleanup") + } } } @@ -133,7 +147,7 @@ public class SseStream private constructor( if (event == null) { // Clean end-of-stream: release the response. The events were already delivered, so // a release failure must not turn a successful read into a thrown result — it is - // dropped here (an explicit close()/use{} still surfaces a release failure). + // logged at WARN and swallowed here (an explicit close()/use{} still surfaces one). done() releaseQuietly(primary = null) return @@ -166,7 +180,7 @@ public class SseStream private constructor( public fun from( source: BufferedSource, resource: Closeable, - ): SseStream = SseStream(ServerSentEventReader(source), resource) + ): SseStream = SseStream(ServerSentEventReader(source), resource, ClientLogger(SseStream::class)) /** * Opens an [SseStream] over a pre-built [reader], closing [resource] when the stream @@ -179,6 +193,17 @@ public class SseStream private constructor( public fun fromReader( reader: ServerSentEventReader, resource: Closeable, - ): SseStream = SseStream(reader, resource) + ): SseStream = SseStream(reader, resource, ClientLogger(SseStream::class)) + + /** + * Test-only seam: opens a stream over [reader]/[resource] with an injected [logger], so a + * test can assert on the `WARN` emitted when end-of-stream cleanup fails to release. + */ + @JvmSynthetic + internal fun fromReader( + reader: ServerSentEventReader, + resource: Closeable, + logger: ClientLogger, + ): SseStream = SseStream(reader, resource, logger) } } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStream.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStream.kt index 80c96836..762aebb2 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStream.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/sse/TypedSseStream.kt @@ -69,21 +69,6 @@ public class TypedSseStream( events.close() } - /** - * Releases the underlying [SseStream] on an iterator-driven terminal path (a - * [SseEventMapper.Result.Done] sentinel or a mapper failure), where a failure to release must - * not become the iteration's outcome. A close failure is attached to [primary] as a suppressed - * throwable when a mapper error is in flight, and otherwise dropped (the values were already - * delivered). Mirrors [SseStream]'s own end-of-stream cleanup. - */ - private fun releaseQuietly(primary: Throwable?) { - try { - events.close() - } catch (closeError: Throwable) { - primary?.addSuppressed(closeError) - } - } - private inner class MappingIterator( private val raw: Iterator, ) : AbstractIterator() { @@ -100,8 +85,9 @@ public class TypedSseStream( } catch (mapperError: Throwable) { // A mapper failure (decode error, mapped error-envelope) releases the // underlying stream before propagating, so a consumer iterating without - // use{} never strands the connection; a release failure is suppressed. - releaseQuietly(mapperError) + // use{} never strands the connection; a release failure is suppressed onto + // the mapper error. Delegated so close-failure handling lives in one place. + events.releaseQuietly(mapperError) throw mapperError } when (result) { @@ -112,10 +98,11 @@ public class TypedSseStream( SseEventMapper.Result.Skip -> continue SseEventMapper.Result.Done -> { // Clean end at the done-sentinel: the values before it were already - // delivered, so a release failure here is dropped rather than discarding - // them (an explicit close()/use{} still surfaces a release failure). + // delivered, so a release failure here is logged at WARN and swallowed by + // SseStream rather than discarding them (an explicit close()/use{} still + // surfaces a release failure). done() - releaseQuietly(primary = null) + events.releaseQuietly(primary = null) return } } diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/SseStreamTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/SseStreamTest.kt index c4526e2d..32f3d8a7 100644 --- a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/SseStreamTest.kt +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/sse/SseStreamTest.kt @@ -13,9 +13,12 @@ import org.dexpace.sdk.core.http.request.Request import org.dexpace.sdk.core.http.response.Response import org.dexpace.sdk.core.http.response.ResponseBody import org.dexpace.sdk.core.http.response.Status +import org.dexpace.sdk.core.instrumentation.ClientLogger +import org.dexpace.sdk.core.instrumentation.FakeSlf4jLogger import org.dexpace.sdk.core.io.BufferedSource import org.dexpace.sdk.core.io.Io import org.dexpace.sdk.io.OkioIoProvider +import org.slf4j.event.Level import java.io.Closeable import java.io.IOException import java.util.concurrent.CountDownLatch @@ -199,6 +202,26 @@ class SseStreamTest { assertFailsWith { stream.close() } } + @Test + fun `a close failure during end-of-stream cleanup is logged at warning`() { + val fakeSlf4j = FakeSlf4jLogger("test.sse") + val stream = + SseStream.fromReader( + ServerSentEventReader(source("data: a\n\n")), + Closeable { throw IOException("release failed") }, + ClientLogger.forTesting(fakeSlf4j), + ) + + val events = stream.toList() + + // Iteration still completes with the delivered events... + assertEquals(listOf("a"), events.single().data) + // ...and the otherwise-dropped close failure is surfaced as a WARN rather than vanishing. + val warning = fakeSlf4j.records.single { it.level == Level.WARN } + assertEquals("sse.close.failed", warning.keyValues.single { it.key == "event" }.value) + assertTrue(warning.cause is IOException) + } + @Test fun `close during a blocked read cancels the iteration via an IOException`() { val readStarted = CountDownLatch(1)