diff --git a/README.md b/README.md index 2b55bd8e..536306d7 100644 --- a/README.md +++ b/README.md @@ -247,7 +247,7 @@ See [docs/pipelines.md](docs/pipelines.md) for the step-author walkthrough. |---|---| | `client` | `HttpClient`, `AsyncHttpClient` — the two transport SPIs (sync and async). | | `http.request` | `Request`, `RequestBody`, `FileRequestBody`, `LoggableRequestBody`, `Method`. | -| `http.response` | `Response`, `ResponseBody`, `LoggableResponseBody`, `Status` (a value-carrying class with a total `fromCode`), `HttpResponseException`. | +| `http.response` | `Response`, `ResponseBody`, `LoggableResponseBody`, `Status` (a value-carrying class with a total `fromCode`), `HttpResponseException`, plus the raw-vs-parsed seam: `ResponseHandler` (with dep-free `string()`/`empty()` handlers) and a lazy, parse-once `ParsedResponse`. | | `http.response.exception` | Typed `HttpException` hierarchy (`BadRequestException`, `RequestTimeoutException`, `TooManyRequestsException`, `ServiceUnavailableException`, …) with `retryable` derived from `RetryUtils.isRetryable`, plus `NetworkException` and `HttpExceptionFactory`. | | `http.common` | `Headers`, `HttpHeaderName` (interned), `MediaType`, `Protocol`, `HttpRange`, `ETag`, `RequestConditions`. | | `http.context` | `CallContext` → `DispatchContext` → `RequestContext` → `ExchangeContext` chain, `ContextStore`. | @@ -258,7 +258,7 @@ See [docs/pipelines.md](docs/pipelines.md) for the step-author walkthrough. | `http.paging` | `PagedIterable`, `PagedResponse`, `PagingOptions` with `byPage()` and `stream()` accessors. | | `pagination` | `Paginator` (with a `maxPages` safety cap) over cursor / page-number / link-header `PaginationStrategy` implementations, plus `Page` / `SimplePage`. | | `pipeline` | Recovery-aware primitives: `RequestPipeline`, `ResponsePipeline`, `ExecutionPipeline` over a sealed `ResponseOutcome`, with steps (`pipeline.step`, `pipeline.step.retry`) like `RetryStep`, `ResponseRecoveryStep`, `IdempotencyKeyStep`, `ClientIdentityStep`. | -| `serde` | `Serde`, `Serializer`, `Deserializer` abstractions and `Tristate` (absent / null / present). | +| `serde` | `Serde`, `Serializer`, `Deserializer` abstractions, `Tristate` (absent / null / present), and `SerdeException` (the unchecked failure adapters translate codec errors into). | | `io` | `Source`, `Sink`, `Buffer`, `BufferedSource`, `BufferedSink`, `IoProvider`, `Io`, `TeeSink`. | | `instrumentation` | `ClientLogger` (zero-alloc disabled path), `LoggingEvent`, `UrlRedactor`, `Tracer` / `NoopTracer`, `Span` / `NoopSpan`, `InstrumentationContext`. | | `instrumentation.metrics` | `Meter`, `LongCounter`, `DoubleHistogram`, `NoopMeter`. | diff --git a/sdk-core/api/sdk-core.api b/sdk-core/api/sdk-core.api index cc094608..93270501 100644 --- a/sdk-core/api/sdk-core.api +++ b/sdk-core/api/sdk-core.api @@ -1153,6 +1153,27 @@ public final class org/dexpace/sdk/core/http/response/LoggableResponseBody : org public fun source ()Lorg/dexpace/sdk/core/io/BufferedSource; } +public final class org/dexpace/sdk/core/http/response/ParsedResponse : java/io/Closeable { + public static final field Companion Lorg/dexpace/sdk/core/http/response/ParsedResponse$Companion; + public fun close ()V + public final fun getHeaders ()Lorg/dexpace/sdk/core/http/common/Headers; + public final fun getMessage ()Ljava/lang/String; + public final fun getProtocol ()Lorg/dexpace/sdk/core/http/common/Protocol; + public final fun getRaw ()Lorg/dexpace/sdk/core/http/response/Response; + public final fun getRequest ()Lorg/dexpace/sdk/core/http/request/Request; + public final fun getStatus ()Lorg/dexpace/sdk/core/http/response/Status; + public static final fun of (Lorg/dexpace/sdk/core/http/response/Response;Lorg/dexpace/sdk/core/http/response/ResponseHandler;)Lorg/dexpace/sdk/core/http/response/ParsedResponse; + public final fun value ()Ljava/lang/Object; +} + +public final class org/dexpace/sdk/core/http/response/ParsedResponse$Companion { + public final fun of (Lorg/dexpace/sdk/core/http/response/Response;Lorg/dexpace/sdk/core/http/response/ResponseHandler;)Lorg/dexpace/sdk/core/http/response/ParsedResponse; +} + +public final class org/dexpace/sdk/core/http/response/ParsedResponseKt { + public static final fun parsedWith (Lorg/dexpace/sdk/core/http/response/Response;Lorg/dexpace/sdk/core/http/response/ResponseHandler;)Lorg/dexpace/sdk/core/http/response/ParsedResponse; +} + public final class org/dexpace/sdk/core/http/response/Response : java/io/Closeable { public static final field Companion Lorg/dexpace/sdk/core/http/response/Response$Companion; public synthetic fun (Lorg/dexpace/sdk/core/http/request/Request;Lorg/dexpace/sdk/core/http/common/Protocol;Lorg/dexpace/sdk/core/http/response/Status;Ljava/lang/String;Lorg/dexpace/sdk/core/http/common/Headers;Lorg/dexpace/sdk/core/http/response/ResponseBody;Lkotlin/jvm/internal/DefaultConstructorMarker;)V @@ -1218,6 +1239,18 @@ public final class org/dexpace/sdk/core/http/response/ResponseBody$Companion { public static synthetic fun create$default (Lorg/dexpace/sdk/core/http/response/ResponseBody$Companion;Lorg/dexpace/sdk/core/io/BufferedSource;Lorg/dexpace/sdk/core/http/common/MediaType;JILjava/lang/Object;)Lorg/dexpace/sdk/core/http/response/ResponseBody; } +public abstract interface class org/dexpace/sdk/core/http/response/ResponseHandler { + public static final field Companion Lorg/dexpace/sdk/core/http/response/ResponseHandler$Companion; + public static fun empty ()Lorg/dexpace/sdk/core/http/response/ResponseHandler; + public abstract fun handle (Lorg/dexpace/sdk/core/http/response/Response;)Ljava/lang/Object; + public static fun string ()Lorg/dexpace/sdk/core/http/response/ResponseHandler; +} + +public final class org/dexpace/sdk/core/http/response/ResponseHandler$Companion { + public final fun empty ()Lorg/dexpace/sdk/core/http/response/ResponseHandler; + public final fun string ()Lorg/dexpace/sdk/core/http/response/ResponseHandler; +} + public final class org/dexpace/sdk/core/http/response/Status { public static final field ACCEPTED Lorg/dexpace/sdk/core/http/response/Status; public static final field ALREADY_REPORTED Lorg/dexpace/sdk/core/http/response/Status; diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/response/ParsedResponse.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/response/ParsedResponse.kt new file mode 100644 index 00000000..b2097ec9 --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/response/ParsedResponse.kt @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.response + +import org.dexpace.sdk.core.http.common.Headers +import org.dexpace.sdk.core.http.common.Protocol +import org.dexpace.sdk.core.http.request.Request +import java.io.Closeable +import java.io.IOException +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + +/** + * Pairs a raw [Response] with a [ResponseHandler] so the typed value can be parsed **lazily and + * exactly once**, while the raw status / headers / metadata stay readable without forcing + * deserialization. + * + * This is the raw-vs-parsed seam: header and status access (e.g. reading `ETag`, branching on a + * `404`) goes straight to the underlying response and never touches the body, whereas the typed + * value is produced on demand by the handler. Because the body is single-use, [value] memoizes + * the handler's outcome — the first call runs the handler and every subsequent call returns the + * same result (or re-throws the same failure) without re-invoking the handler or re-reading the + * body. + * + * ## Body consumption + * + * The handler owns the body. Calling [value] runs the handler, which typically consumes and + * closes the body (the built-in [ResponseHandler.string] / [ResponseHandler.empty] and adapter + * JSON handlers do). **Read any raw headers / status before the first [value] call**, since the + * body is gone afterwards. [close] is available for the path where the typed value is never + * needed and the body must still be released. + * + * ## Thread-safety + * + * Raw accessors are immutable and safe to share. [value] is guarded by a [ReentrantLock] + * (`synchronized` would pin a carrier thread under virtual threads): concurrent first calls block + * until the single parse completes and then all observe the same memoized result. A `null` result + * and a thrown failure are both memoized, so neither triggers a re-parse. + * + * @param T The typed value the handler produces. + * @param raw The underlying raw response. Header / status / metadata access reads from here. + * @param handler Strategy that maps [raw] to the typed value on first [value] access. + */ +public class ParsedResponse internal constructor( + public val raw: Response, + private val handler: ResponseHandler, +) : Closeable { + private val lock = ReentrantLock() + + // Holds the memoized outcome once the handler has run. A non-null holder means "parsed" + // (success or failure); the wrapped value distinguishes the two. A holder (rather than a + // bare value) lets a legitimately-null success memoize without being mistaken for "unparsed". + @Volatile + private var outcome: Outcome? = null + + /** The request that produced [raw]. Does not parse. */ + public val request: Request get() = raw.request + + /** The negotiated wire protocol. Does not parse. */ + public val protocol: Protocol get() = raw.protocol + + /** The HTTP status. Does not parse. */ + public val status: Status get() = raw.status + + /** The status-line reason phrase, or `null` if absent. Does not parse. */ + public val message: String? get() = raw.message + + /** The response headers. Does not parse. */ + public val headers: Headers get() = raw.headers + + /** + * Returns the typed value, parsing it on the first call and memoizing the outcome. + * + * The handler runs at most once: the first call invokes [ResponseHandler.handle] (which + * typically consumes and closes the body); subsequent calls return the same value, or + * re-throw the same failure, without re-running the handler. + * + * Any failure the handler throws is memoized and re-thrown verbatim on every later call — not + * just [IOException]. Handlers commonly throw **unchecked** exceptions (the Jackson `jsonHandler` + * throws `SerdeException`), so callers should not assume the only escape is [IOException]. + * + * @return The parsed value (which may be `null` if the handler is typed `ResponseHandler` + * and produces `null`). + * @throws IOException If the handler failed with an [IOException] — cached and re-thrown. The + * `@Throws` declaration covers only the checked surface for Java callers; the handler may also + * propagate **unchecked** exceptions (e.g. `SerdeException` from the Jackson `jsonHandler`), + * which are memoized and re-thrown the same way. + */ + @Throws(IOException::class) + public fun value(): T { + outcome?.let { return it.get() } + return lock.withLock { + outcome?.let { return it.get() } + // Memoize the handler's outcome — success or failure — so neither re-runs the handler + // nor re-reads the (now consumed) body on a subsequent call. + val resolved: Outcome = + try { + Outcome.Success(handler.handle(raw)) + } catch (t: Throwable) { + // Catch Throwable, not Exception, on purpose: once the handler has touched the + // single-use body, re-running it would read an already-consumed stream. Even an + // Error (e.g. an OOM mid-parse) is memoized so a later call re-throws it rather + // than re-reading the body and masking the original failure. + Outcome.Failure(t) + } + outcome = resolved + resolved.get() + } + } + + /** + * Releases the raw response body. Idempotent (forwards to [Response.close], which is itself + * idempotent). Safe to call whether or not [value] has run. + * + * @throws IOException If the underlying close fails. + */ + @Throws(IOException::class) + override fun close() { + raw.close() + } + + private sealed class Outcome { + abstract fun get(): T + + class Success(private val value: T) : Outcome() { + override fun get(): T = value + } + + class Failure(private val error: Throwable) : Outcome() { + override fun get(): Nothing = throw error + } + } + + public companion object { + /** + * Creates a [ParsedResponse] that parses [response] with [handler] on first access. + * Java-friendly factory mirroring the Kotlin [Response.parsedWith] extension. + * + * @param response The raw response. + * @param handler Strategy that maps the response to the typed value. + * @return A lazily-parsing [ParsedResponse]. + */ + @JvmStatic + public fun of( + response: Response, + handler: ResponseHandler, + ): ParsedResponse = ParsedResponse(response, handler) + } +} + +/** + * Wraps this response in a [ParsedResponse] bound to [handler], so the typed value parses lazily + * and exactly once while raw status / headers stay accessible. Kotlin-ergonomic mirror of + * [ParsedResponse.of]. + * + * @param handler Strategy that maps this response to the typed value. + * @return A lazily-parsing [ParsedResponse]. + */ +public fun Response.parsedWith(handler: ResponseHandler): ParsedResponse = ParsedResponse(this, handler) diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/response/ResponseHandler.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/response/ResponseHandler.kt new file mode 100644 index 00000000..2e897ec0 --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/response/ResponseHandler.kt @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.response + +import org.dexpace.sdk.core.io.Io +import java.io.IOException +import java.nio.charset.StandardCharsets + +/** + * Maps a raw [Response] to a typed result of type [T]. + * + * A `ResponseHandler` is the seam generated service code dispatches against: the transport + * produces a raw [Response], and the handler decides how to turn it into a domain value — decode + * a JSON body into a DTO, read a body as text, or simply discard the body for an empty result. + * + * ## Body ownership + * + * A handler that reads the body **owns consuming and closing it**. [handle] is expected to leave + * the response closed when it returns (whether it read the body or not), so callers do not need a + * surrounding `use {}` block once they have delegated to a handler. The built-in [string] and + * [empty] handlers honor this. Because the body is single-use, a handler must read it at most + * once; pair a handler with [ParsedResponse] when the typed value must be exposed lazily and + * memoized so the body is consumed exactly once. + * + * ## Raw access first + * + * Reading the body is destructive, so any header / status inspection that must happen alongside + * parsing should read from the raw [Response] (or [ParsedResponse]'s raw accessors) **before** + * invoking the handler. + * + * ## Thread-safety + * + * Handlers are typically stateless and shared across requests; the built-in factories return + * stateless instances. A stateful handler must guard its own state. + * + * ## Nullability + * + * A handler that may legitimately produce `null` (e.g. an absent-but-valid payload) should be typed + * `ResponseHandler` so the nullability is visible to Kotlin and Java callers alike; otherwise a + * `null` slips through a non-null `T` as a platform value. [ParsedResponse.value] memoizes a `null` + * result correctly either way. + * + * @param T The typed result this handler produces. + */ +public fun interface ResponseHandler { + /** + * Consumes [response] and produces the typed result. Implementations that read the body must + * also close [response] before returning. + * + * @param response The raw response to map. + * @return The typed result. + * @throws IOException If reading the body fails. + */ + @Throws(IOException::class) + public fun handle(response: Response): T + + public companion object { + /** + * A handler that reads the entire response body as a UTF-8 [String] and closes the + * response. A bodyless response (e.g. `204 No Content`) yields an empty string. + * + * **Unbounded.** This reads the whole body into a single in-memory [String] with no size + * cap, so it is an unbounded-allocation vector against a hostile or misbehaving server. + * Unlike the bounded body-logging path elsewhere in the SDK, it applies no limit — use it + * only for trusted endpoints with bounded payloads, not for untrusted or large bodies. + * + * @return A stateless [String] handler. + */ + @JvmStatic + public fun string(): ResponseHandler = + ResponseHandler { response -> + response.use { + val body = it.body ?: return@use "" + body.source().readString(StandardCharsets.UTF_8) + } + } + + /** + * A handler that fully drains and closes the body, discarding its bytes, and returns + * [Unit]. Use for endpoints whose payload is irrelevant (e.g. a `DELETE` returning a + * status only) but whose connection must still be released. + * + * @return A stateless discarding handler. + */ + @JvmStatic + public fun empty(): ResponseHandler = + ResponseHandler { response -> + response.use { + val body = it.body ?: return@use + val source = body.source() + // Pump into a throwaway scratch buffer (cleared each round) so the connection + // is released without materializing the whole body in memory. The buffer is + // closed deterministically so its segments are recycled even if the drain + // throws mid-stream, rather than leaning on the GC. + Io.provider.buffer().use { scratch -> + while (source.read(scratch, DRAIN_CHUNK_BYTES) != -1L) { + scratch.clear() + } + } + } + } + + /** + * Per-read pump size for the discarding drain — a reasonable chunk size. `read` treats it + * as an upper bound, so the exact value is not load-bearing for correctness. + */ + private const val DRAIN_CHUNK_BYTES: Long = 8 * 1024 + } +} diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/response/ResponseHandlerTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/response/ResponseHandlerTest.kt new file mode 100644 index 00000000..7ac206f2 --- /dev/null +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/response/ResponseHandlerTest.kt @@ -0,0 +1,267 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.response + +import org.dexpace.sdk.core.http.common.MediaType +import org.dexpace.sdk.core.http.common.Protocol +import org.dexpace.sdk.core.http.request.Method +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.io.BufferedSource +import org.dexpace.sdk.core.io.Io +import org.dexpace.sdk.io.OkioIoProvider +import java.io.IOException +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertNull +import kotlin.test.assertSame +import kotlin.test.assertTrue + +class ResponseHandlerTest { + @BeforeTest + fun installProvider() { + Io.installProvider(OkioIoProvider) + } + + private fun request(): Request = + Request.builder() + .url("https://api.example.test/p") + .method(Method.GET) + .build() + + private fun response( + payload: String = "abc", + status: Status = Status.OK, + mediaType: MediaType? = null, + body: ResponseBody? = stringBody(payload, mediaType), + ): Response = + Response.builder() + .request(request()) + .protocol(Protocol.HTTP_1_1) + .status(status) + .message("OK") + .addHeader("X-Trace", "t-1") + .let { if (body != null) it.body(body) else it } + .build() + + private fun stringBody( + payload: String, + mediaType: MediaType? = null, + ): ResponseBody { + val source = Io.provider.source(payload.toByteArray()) + return ResponseBody.create(source, mediaType, payload.length.toLong()) + } + + // A body that records how many times it was closed and exposes a single-use source. + private class CountingBody(payload: String) : ResponseBody() { + val closeCount = AtomicInteger(0) + private val source = Io.provider.source(payload.toByteArray()) + + override fun mediaType(): MediaType? = null + + override fun contentLength(): Long = -1L + + override fun source(): BufferedSource = source + + override fun close() { + closeCount.incrementAndGet() + source.close() + } + } + + // ---- ResponseHandler.string() ---- + + @Test + fun `string handler reads the body as UTF-8`() { + val handled = ResponseHandler.string().handle(response("hello world")) + assertEquals("hello world", handled) + } + + @Test + fun `string handler closes the response after reading`() { + val body = CountingBody("payload") + val resp = response(body = body) + ResponseHandler.string().handle(resp) + assertEquals(1, body.closeCount.get()) + } + + @Test + fun `string handler returns empty string for a bodyless response`() { + val resp = response(body = null) + assertEquals("", ResponseHandler.string().handle(resp)) + } + + // ---- ResponseHandler.empty() ---- + + @Test + fun `empty handler drains and closes the body returning Unit`() { + val body = CountingBody("ignored payload") + val resp = response(body = body) + val result: Unit = ResponseHandler.empty().handle(resp) + assertEquals(Unit, result) + assertEquals(1, body.closeCount.get()) + } + + @Test + fun `empty handler tolerates a bodyless response`() { + ResponseHandler.empty().handle(response(body = null)) + } + + // ---- ParsedResponse: raw access without parsing ---- + + @Test + fun `raw status headers and metadata are readable without invoking the handler`() { + val calls = AtomicInteger(0) + val handler = + ResponseHandler { + calls.incrementAndGet() + "v" + } + val parsed = ParsedResponse.of(response("body", status = Status.CREATED), handler) + + assertEquals(Status.CREATED, parsed.status) + assertEquals(Protocol.HTTP_1_1, parsed.protocol) + assertEquals("OK", parsed.message) + assertEquals(listOf("t-1"), parsed.headers.values("X-Trace")) + assertSame(parsed.raw.request, parsed.request) + // The handler must not have run merely from reading headers/status. + assertEquals(0, calls.get()) + } + + // ---- ParsedResponse: lazy + parse-once ---- + + @Test + fun `value is not parsed until first access`() { + val calls = AtomicInteger(0) + val handler = + ResponseHandler { + calls.incrementAndGet() + "parsed" + } + ParsedResponse.of(response(), handler) + assertEquals(0, calls.get()) + } + + @Test + fun `value parses exactly once across repeated access`() { + val calls = AtomicInteger(0) + val handler = + ResponseHandler { + calls.incrementAndGet() + "parsed" + } + val parsed = ParsedResponse.of(response(), handler) + + assertEquals("parsed", parsed.value()) + assertEquals("parsed", parsed.value()) + assertEquals("parsed", parsed.value()) + assertEquals(1, calls.get()) + } + + @Test + fun `value memoizes a null result without re-parsing`() { + val calls = AtomicInteger(0) + val handler = + ResponseHandler { + calls.incrementAndGet() + null + } + val parsed = ParsedResponse.of(response(), handler) + + assertNull(parsed.value()) + assertNull(parsed.value()) + assertEquals(1, calls.get()) + } + + @Test + fun `parse-once consumes the underlying body exactly once`() { + val body = CountingBody("once") + val parsed = ParsedResponse.of(response(body = body), ResponseHandler.string()) + + assertEquals("once", parsed.value()) + assertEquals("once", parsed.value()) + // The string handler closes the body when it parses; parse-once means only one close. + assertEquals(1, body.closeCount.get()) + } + + // ---- ParsedResponse: error semantics ---- + + @Test + fun `a handler failure is cached and rethrown without re-parsing`() { + val calls = AtomicInteger(0) + val handler = + ResponseHandler { + calls.incrementAndGet() + throw IOException("boom") + } + val parsed = ParsedResponse.of(response(), handler) + + val first = assertFailsWith { parsed.value() } + val second = assertFailsWith { parsed.value() } + assertEquals("boom", first.message) + assertSame(first, second) + assertEquals(1, calls.get()) + } + + // ---- ParsedResponse: Closeable ---- + + @Test + fun `close forwards to the raw response body`() { + val body = CountingBody("x") + val parsed = ParsedResponse.of(response(body = body), ResponseHandler.empty()) + parsed.close() + assertEquals(1, body.closeCount.get()) + } + + @Test + fun `parsedWith extension builds a ParsedResponse bound to the handler`() { + val parsed = response("ext").parsedWith(ResponseHandler.string()) + assertEquals("ext", parsed.value()) + } + + // ---- ParsedResponse: concurrency ---- + + @Test + fun `concurrent first access parses exactly once`() { + val calls = AtomicInteger(0) + val handler = + ResponseHandler { + calls.incrementAndGet() + Thread.sleep(20) + "parsed" + } + val parsed = ParsedResponse.of(response(), handler) + + val threads = 16 + val pool = Executors.newFixedThreadPool(threads) + val start = CountDownLatch(1) + val done = CountDownLatch(threads) + try { + repeat(threads) { + pool.execute { + start.await() + try { + assertEquals("parsed", parsed.value()) + } finally { + done.countDown() + } + } + } + start.countDown() + assertTrue(done.await(10, TimeUnit.SECONDS)) + } finally { + pool.shutdownNow() + } + assertEquals(1, calls.get()) + } +} diff --git a/sdk-serde-jackson/api/sdk-serde-jackson.api b/sdk-serde-jackson/api/sdk-serde-jackson.api index deeebbc3..f14795d4 100644 --- a/sdk-serde-jackson/api/sdk-serde-jackson.api +++ b/sdk-serde-jackson/api/sdk-serde-jackson.api @@ -21,6 +21,11 @@ public final class org/dexpace/sdk/serde/jackson/JacksonSerde$Companion { public final fun withDefaults ()Lorg/dexpace/sdk/serde/jackson/JacksonSerde; } +public final class org/dexpace/sdk/serde/jackson/JsonResponseHandlerKt { + public static final fun jsonHandler (Lorg/dexpace/sdk/core/serde/Serde;Ljava/lang/Class;)Lorg/dexpace/sdk/core/http/response/ResponseHandler; + public static final fun jsonHandler (Lorg/dexpace/sdk/serde/jackson/JacksonSerde;Lcom/fasterxml/jackson/core/type/TypeReference;)Lorg/dexpace/sdk/core/http/response/ResponseHandler; +} + public final class org/dexpace/sdk/serde/jackson/TristateModule : com/fasterxml/jackson/databind/module/SimpleModule { public static final field Companion Lorg/dexpace/sdk/serde/jackson/TristateModule$Companion; public fun ()V diff --git a/sdk-serde-jackson/build.gradle.kts b/sdk-serde-jackson/build.gradle.kts index 2974fffc..9d328e27 100644 --- a/sdk-serde-jackson/build.gradle.kts +++ b/sdk-serde-jackson/build.gradle.kts @@ -27,6 +27,9 @@ dependencies { testImplementation(kotlin("test")) testImplementation(libs.junit.jupiter) + // The response-handler tests build real response bodies over the I/O seam, so they need an + // IoProvider implementation (OkioIoProvider) on the test classpath. + testImplementation(project(":sdk-io-okio3")) testRuntimeOnly(libs.slf4j.nop) } diff --git a/sdk-serde-jackson/src/main/kotlin/org/dexpace/sdk/serde/jackson/JsonResponseHandler.kt b/sdk-serde-jackson/src/main/kotlin/org/dexpace/sdk/serde/jackson/JsonResponseHandler.kt new file mode 100644 index 00000000..4cf922a8 --- /dev/null +++ b/sdk-serde-jackson/src/main/kotlin/org/dexpace/sdk/serde/jackson/JsonResponseHandler.kt @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.serde.jackson + +import com.fasterxml.jackson.core.type.TypeReference +import org.dexpace.sdk.core.http.response.Response +import org.dexpace.sdk.core.http.response.ResponseHandler +import org.dexpace.sdk.core.serde.Serde +import org.dexpace.sdk.core.serde.SerdeException + +/** + * A [ResponseHandler] that streams the response body through [serde]'s deserializer into a value + * of [type], then closes the response. + * + * Deserialization runs against the body's `InputStream`, so the payload is never first + * materialized as a `String` or `ByteArray`. The handler **consumes and closes the body**, so any + * raw header / status inspection must happen before the handler runs — pair it with a + * `ParsedResponse` to keep raw access available and to parse lazily exactly once. + * + * A missing body (e.g. a `204 No Content`) or any failure from the deserializer is surfaced as a + * [SerdeException], with the original codec exception preserved as the cause; the response is + * still closed on the failure path. + * + * This overload takes a raw [Class] token, which is sufficient for non-parametric targets. For + * parametric targets (`List`, `Map`) the raw class erases the element type — use + * the [TypeReference] overload backed by [JacksonSerde]. + * + * @param serde The serde whose deserializer decodes the body. + * @param type The non-parametric target type. + * @return A [ResponseHandler] that yields a value of [type]. + */ +public fun jsonHandler( + serde: Serde, + type: Class, +): ResponseHandler = + ResponseHandler { response -> + decode(response, type.typeName) { stream -> serde.deserializer.deserialize(stream, type) } + } + +/** + * A [ResponseHandler] that streams the response body through [serde] into the parametric type + * captured by [type], then closes the response. + * + * Use this overload for generic targets (`List`, `Map`) where a raw [Class] + * token would lose the element type. Behaves like the [Class] overload otherwise: it consumes and + * closes the body and surfaces deserialization failures (and a missing body) as a [SerdeException]. + * + * @param serde The Jackson serde whose mapper decodes the body. + * @param type The parametric target type. + * @return A [ResponseHandler] that yields a value of the captured type. + */ +public fun jsonHandler( + serde: JacksonSerde, + type: TypeReference, +): ResponseHandler = + ResponseHandler { response -> + decode(response, type.type.typeName) { stream -> serde.deserializeAs(stream, type) } + } + +/** + * Shared body-streaming + error-translation core for the [jsonHandler] overloads. Reads the + * response body as an `InputStream`, hands it to [decoder], and closes the response in all cases. + * A missing body or any decoder failure is translated into a [SerdeException]. + */ +private inline fun decode( + response: Response, + targetType: String, + decoder: (java.io.InputStream) -> T, +): T = + response.use { resp -> + val body = + resp.body + ?: throw SerdeException( + "Cannot deserialize a ${resp.status.code} response with no body into $targetType.", + ) + try { + decoder(body.source().inputStream()) + } catch (e: SerdeException) { + throw e + } catch (e: Exception) { + throw SerdeException("Failed to deserialize response body: ${e.message}", e) + } + } diff --git a/sdk-serde-jackson/src/test/kotlin/org/dexpace/sdk/serde/jackson/JsonResponseHandlerTest.kt b/sdk-serde-jackson/src/test/kotlin/org/dexpace/sdk/serde/jackson/JsonResponseHandlerTest.kt new file mode 100644 index 00000000..be9c404f --- /dev/null +++ b/sdk-serde-jackson/src/test/kotlin/org/dexpace/sdk/serde/jackson/JsonResponseHandlerTest.kt @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.serde.jackson + +import com.fasterxml.jackson.core.type.TypeReference +import org.dexpace.sdk.core.http.common.MediaType +import org.dexpace.sdk.core.http.common.Protocol +import org.dexpace.sdk.core.http.request.Method +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.http.response.ParsedResponse +import org.dexpace.sdk.core.http.response.Response +import org.dexpace.sdk.core.http.response.ResponseBody +import org.dexpace.sdk.core.http.response.Status +import org.dexpace.sdk.core.io.BufferedSource +import org.dexpace.sdk.core.io.Io +import org.dexpace.sdk.core.serde.SerdeException +import org.dexpace.sdk.io.OkioIoProvider +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +class JsonResponseHandlerTest { + data class Dto(val name: String, val score: Int) + + @BeforeTest + fun installProvider() { + Io.installProvider(OkioIoProvider) + } + + private fun jsonResponse(payload: String): Response { + val source = Io.provider.source(payload.toByteArray()) + val body = ResponseBody.create(source, MediaType.parse("application/json"), payload.length.toLong()) + return Response.builder() + .request(Request.builder().url("https://api.example.test/p").method(Method.GET).build()) + .protocol(Protocol.HTTP_1_1) + .status(Status.OK) + .body(body) + .build() + } + + // A body that tracks close() so consume-once can be asserted. + private class CountingBody(payload: String) : ResponseBody() { + val closeCount = AtomicInteger(0) + private val source = Io.provider.source(payload.toByteArray()) + + override fun mediaType(): MediaType = MediaType.parse("application/json") + + override fun contentLength(): Long = -1L + + override fun source(): BufferedSource = source + + override fun close() { + closeCount.incrementAndGet() + source.close() + } + } + + @Test + fun `jsonHandler deserializes a JSON body into the target type`() { + val serde = JacksonSerde.withDefaults() + val handler = jsonHandler(serde, Dto::class.java) + val dto = handler.handle(jsonResponse("""{"name":"a","score":7}""")) + assertEquals(Dto("a", 7), dto) + } + + @Test + fun `jsonHandler closes the response after parsing`() { + val serde = JacksonSerde.withDefaults() + val body = CountingBody("""{"name":"a","score":7}""") + val resp = + Response.builder() + .request(Request.builder().url("https://api.example.test/p").method(Method.GET).build()) + .protocol(Protocol.HTTP_1_1) + .status(Status.OK) + .body(body) + .build() + jsonHandler(serde, Dto::class.java).handle(resp) + assertEquals(1, body.closeCount.get()) + } + + @Test + fun `jsonHandler surfaces malformed JSON as a SerdeException`() { + val serde = JacksonSerde.withDefaults() + val handler = jsonHandler(serde, Dto::class.java) + val ex = assertFailsWith { handler.handle(jsonResponse("not json at all")) } + // The original Jackson failure is preserved as the cause. + assertEquals(true, ex.cause != null) + } + + @Test + fun `jsonHandler reports a missing body as a SerdeException`() { + val serde = JacksonSerde.withDefaults() + val resp = + Response.builder() + .request(Request.builder().url("https://api.example.test/p").method(Method.GET).build()) + .protocol(Protocol.HTTP_1_1) + .status(Status.NO_CONTENT) + .build() + val ex = assertFailsWith { jsonHandler(serde, Dto::class.java).handle(resp) } + // The message names the target type so a failure log says what the decode was aiming for. + assertEquals(true, ex.message?.contains(Dto::class.java.typeName) == true) + } + + @Test + fun `jsonHandler with a TypeReference parses parametric types`() { + val serde = JacksonSerde.withDefaults() + val handler = jsonHandler(serde, object : TypeReference>() {}) + val list = handler.handle(jsonResponse("""[{"name":"a","score":1},{"name":"b","score":2}]""")) + assertEquals(listOf(Dto("a", 1), Dto("b", 2)), list) + } + + @Test + fun `jsonHandler composes with ParsedResponse for lazy parse-once`() { + val serde = JacksonSerde.withDefaults() + val body = CountingBody("""{"name":"a","score":7}""") + val resp = + Response.builder() + .request(Request.builder().url("https://api.example.test/p").method(Method.GET).build()) + .protocol(Protocol.HTTP_1_1) + .status(Status.OK) + .body(body) + .build() + val parsed = ParsedResponse.of(resp, jsonHandler(serde, Dto::class.java)) + + // Header access does not parse / consume. + assertEquals(Status.OK, parsed.status) + assertEquals(0, body.closeCount.get()) + + assertEquals(Dto("a", 7), parsed.value()) + assertEquals(Dto("a", 7), parsed.value()) + assertEquals(1, body.closeCount.get()) + } +}