diff --git a/sdk-transport-jdkhttp/src/main/kotlin/org/dexpace/sdk/transport/jdkhttp/JdkHttpTransport.kt b/sdk-transport-jdkhttp/src/main/kotlin/org/dexpace/sdk/transport/jdkhttp/JdkHttpTransport.kt index 283ff902..a80a4b0e 100644 --- a/sdk-transport-jdkhttp/src/main/kotlin/org/dexpace/sdk/transport/jdkhttp/JdkHttpTransport.kt +++ b/sdk-transport-jdkhttp/src/main/kotlin/org/dexpace/sdk/transport/jdkhttp/JdkHttpTransport.kt @@ -138,21 +138,41 @@ public class JdkHttpTransport private constructor( * completions to release the body's connection back to the pool. */ override fun executeAsync(request: Request): CompletableFuture { - val jdkRequest = - try { - requestAdapter.adapt(request, responseTimeout) - } catch (e: Exception) { - // The async contract is that errors arrive through the returned future. Request - // adaptation runs on the caller's thread and can throw (e.g. a CONNECT request the - // JDK client rejects), so route the failure into a failed future instead of - // throwing synchronously where a future-composing caller's .exceptionally/.handle - // would never observe it. Errors (OOM and other JVM-fatal conditions) are left to - // propagate up the caller's stack rather than be packaged into a future that may - // never be awaited. - return CompletableFuture.failedFuture(e) - } - val inFlight = client.sendAsync(jdkRequest, HttpResponse.BodyHandlers.ofInputStream()) - return bridgeAsyncResponse(inFlight) { jdkResponse -> responseAdapter.adapt(request, jdkResponse) } + // Held outside the try so the catch can release the JDK exchange if dispatch succeeded but a + // later step threw (see the catch). Null until `sendAsync` returns: a throw at or before + // dispatch leaves nothing to clean up. + var inFlight: CompletableFuture>? = null + return try { + val jdkRequest = requestAdapter.adapt(request, responseTimeout) + // `sendAsync` is inside the guard too. Its contract does not promise that every failure + // is delivered through the returned future: the JDK's own Javadoc permits a synchronous + // `IllegalArgumentException` for a request it rejects, and a custom or future + // `HttpClient` is free to throw on the caller's thread instead of returning a failed + // future. Guarding the dispatch keeps the error-delivery contract documented above + // intact whichever way a failure surfaces. (Today's stock JDK client packages such + // failures into an already-failed future — e.g. on a closed client — which the bridge + // propagates and so never reaches this catch; the guard is for the throwing case.) + inFlight = client.sendAsync(jdkRequest, HttpResponse.BodyHandlers.ofInputStream()) + bridgeAsyncResponse(inFlight) { jdkResponse -> responseAdapter.adapt(request, jdkResponse) } + } catch (e: Exception) { + // The async contract is that errors arrive through the returned future. The dispatch + // path above runs on the caller's thread and can throw — request adaptation rejecting a + // CONNECT request, a synchronous `sendAsync` rejection, or an unexpected adapter bug + // such as an NPE — so route any of these into a failed future instead of throwing + // synchronously where a future-composing caller's .exceptionally/.handle would never + // observe it. The breadth is intentional: catching `Exception` (not `RuntimeException`) + // keeps a future adapter step's checked exception on the future too. Only `Error` (OOM + // and other JVM-fatal conditions) is left to propagate up the caller's stack rather + // than be packaged into a future that may never be awaited. + // + // If `sendAsync` already returned an in-flight exchange, the only way control reaches + // here is `bridgeAsyncResponse` throwing before it wired that future's cancellation + // propagation — its result is never returned, so cancel the exchange directly to release + // its connection rather than leak it on a future nothing will await. The cancel is a + // no-op when `inFlight` is null (the throw happened at or before dispatch). + inFlight?.cancel(true) + CompletableFuture.failedFuture(e) + } } /** diff --git a/sdk-transport-jdkhttp/src/test/kotlin/org/dexpace/sdk/transport/jdkhttp/JdkHttpTransportTest.kt b/sdk-transport-jdkhttp/src/test/kotlin/org/dexpace/sdk/transport/jdkhttp/JdkHttpTransportTest.kt index 3e5ec734..b78be3d0 100644 --- a/sdk-transport-jdkhttp/src/test/kotlin/org/dexpace/sdk/transport/jdkhttp/JdkHttpTransportTest.kt +++ b/sdk-transport-jdkhttp/src/test/kotlin/org/dexpace/sdk/transport/jdkhttp/JdkHttpTransportTest.kt @@ -24,22 +24,31 @@ import org.dexpace.sdk.io.OkioIoProvider import org.dexpace.sdk.transport.jdkhttp.internal.BodyPublishers import java.io.ByteArrayOutputStream import java.io.IOException +import java.net.Authenticator +import java.net.CookieHandler import java.net.InetSocketAddress +import java.net.ProxySelector import java.net.ServerSocket import java.net.URL import java.net.http.HttpClient import java.net.http.HttpRequest +import java.net.http.HttpResponse import java.nio.ByteBuffer import java.security.MessageDigest import java.time.Duration +import java.util.Optional import java.util.concurrent.CancellationException +import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletionException import java.util.concurrent.CountDownLatch import java.util.concurrent.ExecutionException +import java.util.concurrent.Executor import java.util.concurrent.Flow import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference +import javax.net.ssl.SSLContext +import javax.net.ssl.SSLParameters import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertContentEquals @@ -162,11 +171,53 @@ class JdkHttpTransportTest { .build() // Must return a future rather than throwing on the caller's thread. val future = transport.executeAsync(request) + // Completion is synchronous, not merely eventual: the future is already completed + // exceptionally on return, before anything is awaited. + assertTrue( + future.isCompletedExceptionally, + "adaptation failure must complete the future exceptionally synchronously on return", + ) val ex = assertFailsWith { future.get(5, TimeUnit.SECONDS) } assertTrue( ex.cause is IllegalArgumentException, "adaptation failure must surface as the future's cause, was: ${ex.cause?.let { it::class }}", ) + // Assert the message so an unrelated IllegalArgumentException cannot satisfy the test. + // RequestAdapter rejects CONNECT with "java.net.http.HttpClient does not support + // user-issued CONNECT requests." + assertTrue( + ex.cause?.message?.contains("does not support user-issued CONNECT requests") == true, + "expected the JDK CONNECT-rejection message, was: ${ex.cause?.message}", + ) + } + + @Test + fun `executeAsyncRoutesSynchronousDispatchThrowThroughFuture`() { + // `sendAsync`'s contract does not promise that every failure is delivered through the + // returned future — a client may throw synchronously on the caller's thread. The dispatch + // path runs after a successful adaptation, so this injects a client whose `sendAsync` + // throws to exercise the post-adapt guard deterministically on every JDK. (The stock JDK + // client instead returns an already-failed future for, e.g., a closed client; the bridge + // propagates that and it never reaches the guard, so a real closed client cannot prove + // this path. The test double does.) The failure must arrive through the returned future, + // never escape executeAsync on the caller's thread. + val boom = IllegalStateException("synchronous dispatch failure") + val throwingTransport = JdkHttpTransport.create(SyncThrowingDispatchClient(boom)) + + // Must return a future rather than throwing on the caller's thread. + val future = throwingTransport.executeAsync(simpleGet("/async-dispatch-throw")) + // Completion is synchronous, not merely eventual: the future is already completed + // exceptionally on return, before anything is awaited. + assertTrue( + future.isCompletedExceptionally, + "a synchronous dispatch throw must complete the future exceptionally synchronously on return", + ) + val ex = assertFailsWith { future.get(5, TimeUnit.SECONDS) } + assertEquals( + boom, + ex.cause, + "the dispatch throw must surface verbatim as the future's cause, was: ${ex.cause}", + ) } // -------- headers round-trip -------- @@ -832,4 +883,52 @@ class JdkHttpTransportTest { override fun canHandle(challenges: List): Boolean = false } + + /** + * A minimal [HttpClient] whose async dispatch throws synchronously on the caller's thread, + * modelling a client (or a future JDK) that does not package every `sendAsync` failure into the + * returned future. Only [sendAsync] is exercised by [JdkHttpTransport.executeAsync]; the rest + * are inert stubs that are never invoked on the dispatch path under test. In particular + * [sslContext] returns `SSLContext.getDefault()` only to satisfy the abstract member — its + * checked `NoSuchAlgorithmException` and the cost of materialising the JVM default SSL context + * never apply here because the path under test never reads the context; likewise [send] throws + * rather than returning a stub response, as the synchronous path is never reached. + */ + private class SyncThrowingDispatchClient( + private val failure: RuntimeException, + ) : HttpClient() { + override fun cookieHandler(): Optional = Optional.empty() + + override fun connectTimeout(): Optional = Optional.empty() + + override fun followRedirects(): HttpClient.Redirect = HttpClient.Redirect.NEVER + + override fun proxy(): Optional = Optional.empty() + + override fun sslContext(): SSLContext = SSLContext.getDefault() + + override fun sslParameters(): SSLParameters = SSLParameters() + + override fun authenticator(): Optional = Optional.empty() + + override fun version(): HttpClient.Version = HttpClient.Version.HTTP_1_1 + + override fun executor(): Optional = Optional.empty() + + override fun send( + request: HttpRequest, + responseBodyHandler: HttpResponse.BodyHandler, + ): HttpResponse = throw UnsupportedOperationException("synchronous send is not used by this test double") + + override fun sendAsync( + request: HttpRequest, + responseBodyHandler: HttpResponse.BodyHandler, + ): CompletableFuture> = throw failure + + override fun sendAsync( + request: HttpRequest, + responseBodyHandler: HttpResponse.BodyHandler, + pushPromiseHandler: HttpResponse.PushPromiseHandler?, + ): CompletableFuture> = throw failure + } } diff --git a/sdk-transport-okhttp/src/main/kotlin/org/dexpace/sdk/transport/okhttp/OkHttpTransport.kt b/sdk-transport-okhttp/src/main/kotlin/org/dexpace/sdk/transport/okhttp/OkHttpTransport.kt index d43c6373..61223827 100644 --- a/sdk-transport-okhttp/src/main/kotlin/org/dexpace/sdk/transport/okhttp/OkHttpTransport.kt +++ b/sdk-transport-okhttp/src/main/kotlin/org/dexpace/sdk/transport/okhttp/OkHttpTransport.kt @@ -116,11 +116,16 @@ public class OkHttpTransport private constructor( // adaptation runs on the caller's thread and can throw (e.g. a method/body // mismatch OkHttp rejects), so route the failure into a completed-exceptionally // future instead of throwing synchronously where a future-composing caller's - // .exceptionally/.handle would never observe it. Errors (OOM and other JVM-fatal - // conditions) are left to propagate up the caller's stack rather than be packaged - // into a future that may never be awaited. + // .exceptionally/.handle would never observe it. The breadth is intentional: + // catching `Exception` (not `RuntimeException`) also funnels an unexpected adapter + // bug such as an NPE through the future so the caller can observe it. Only `Error` + // (OOM and other JVM-fatal conditions) is left to propagate up the caller's stack + // rather than be packaged into a future that may never be awaited. return failedFuture(e) } + // The post-adaptation dispatch needs no guard: `newCall(...)` does no throwing work, and a + // dispatch failure (including a `RejectedExecutionException` from a shut-down dispatcher) + // is delivered through `Callback.onFailure` below, so it already reaches the future. val call = client.newCall(okRequest) val future = CompletableFuture() call.enqueue( diff --git a/sdk-transport-okhttp/src/test/kotlin/org/dexpace/sdk/transport/okhttp/OkHttpTransportTest.kt b/sdk-transport-okhttp/src/test/kotlin/org/dexpace/sdk/transport/okhttp/OkHttpTransportTest.kt index c4cc9dbb..77b88bd3 100644 --- a/sdk-transport-okhttp/src/test/kotlin/org/dexpace/sdk/transport/okhttp/OkHttpTransportTest.kt +++ b/sdk-transport-okhttp/src/test/kotlin/org/dexpace/sdk/transport/okhttp/OkHttpTransportTest.kt @@ -207,11 +207,24 @@ class OkHttpTransportTest { .build() // Must return a future rather than throwing on the caller's thread. val future = transport.executeAsync(request) + // Completion is synchronous, not merely eventual: the future is already completed + // exceptionally on return, before anything is awaited. + assertTrue( + future.isCompletedExceptionally, + "adaptation failure must complete the future exceptionally synchronously on return", + ) val ex = assertFailsWith { future.get(5, TimeUnit.SECONDS) } assertTrue( ex.cause is IllegalArgumentException, "adaptation failure must surface as the future's cause, was: ${ex.cause?.let { it::class }}", ) + // Assert the message so an unrelated IllegalArgumentException cannot satisfy the test. + // OkHttp's Request.Builder.method rejects a body on GET with " must not have a + // request body." + assertTrue( + ex.cause?.message?.contains("must not have a request body") == true, + "expected OkHttp's body-on-GET rejection message, was: ${ex.cause?.message}", + ) } // -------- headers round-trip --------