Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,41 @@ public class JdkHttpTransport private constructor(
* completions to release the body's connection back to the pool.
*/
override fun executeAsync(request: Request): CompletableFuture<Response> {
val jdkRequest =
try {
requestAdapter.adapt(request, responseTimeout)
} catch (e: Exception) {
// The async contract is that errors arrive through the returned future. Request
// adaptation runs on the caller's thread and can throw (e.g. a CONNECT request the
// JDK client rejects), so route the failure into a failed future instead of
// throwing synchronously where a future-composing caller's .exceptionally/.handle
// would never observe it. Errors (OOM and other JVM-fatal conditions) are left to
// propagate up the caller's stack rather than be packaged into a future that may
// never be awaited.
return CompletableFuture.failedFuture<Response>(e)
}
val inFlight = client.sendAsync(jdkRequest, HttpResponse.BodyHandlers.ofInputStream())
return bridgeAsyncResponse(inFlight) { jdkResponse -> responseAdapter.adapt(request, jdkResponse) }
// Held outside the try so the catch can release the JDK exchange if dispatch succeeded but a
// later step threw (see the catch). Null until `sendAsync` returns: a throw at or before
// dispatch leaves nothing to clean up.
var inFlight: CompletableFuture<HttpResponse<InputStream>>? = null
return try {
val jdkRequest = requestAdapter.adapt(request, responseTimeout)
// `sendAsync` is inside the guard too. Its contract does not promise that every failure
// is delivered through the returned future: the JDK's own Javadoc permits a synchronous
// `IllegalArgumentException` for a request it rejects, and a custom or future
// `HttpClient` is free to throw on the caller's thread instead of returning a failed
// future. Guarding the dispatch keeps the error-delivery contract documented above
// intact whichever way a failure surfaces. (Today's stock JDK client packages such
// failures into an already-failed future — e.g. on a closed client — which the bridge
// propagates and so never reaches this catch; the guard is for the throwing case.)
inFlight = client.sendAsync(jdkRequest, HttpResponse.BodyHandlers.ofInputStream())
bridgeAsyncResponse(inFlight) { jdkResponse -> responseAdapter.adapt(request, jdkResponse) }
} catch (e: Exception) {
// The async contract is that errors arrive through the returned future. The dispatch
// path above runs on the caller's thread and can throw — request adaptation rejecting a
// CONNECT request, a synchronous `sendAsync` rejection, or an unexpected adapter bug
// such as an NPE — so route any of these into a failed future instead of throwing
// synchronously where a future-composing caller's .exceptionally/.handle would never
// observe it. The breadth is intentional: catching `Exception` (not `RuntimeException`)
// keeps a future adapter step's checked exception on the future too. Only `Error` (OOM
// and other JVM-fatal conditions) is left to propagate up the caller's stack rather
// than be packaged into a future that may never be awaited.
//
// If `sendAsync` already returned an in-flight exchange, the only way control reaches
// here is `bridgeAsyncResponse` throwing before it wired that future's cancellation
// propagation — its result is never returned, so cancel the exchange directly to release
// its connection rather than leak it on a future nothing will await. The cancel is a
// no-op when `inFlight` is null (the throw happened at or before dispatch).
inFlight?.cancel(true)
CompletableFuture.failedFuture<Response>(e)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,31 @@ import org.dexpace.sdk.io.OkioIoProvider
import org.dexpace.sdk.transport.jdkhttp.internal.BodyPublishers
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.net.Authenticator
import java.net.CookieHandler
import java.net.InetSocketAddress
import java.net.ProxySelector
import java.net.ServerSocket
import java.net.URL
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.nio.ByteBuffer
import java.security.MessageDigest
import java.time.Duration
import java.util.Optional
import java.util.concurrent.CancellationException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executor
import java.util.concurrent.Flow
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLParameters
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertContentEquals
Expand Down Expand Up @@ -162,11 +171,53 @@ class JdkHttpTransportTest {
.build()
// Must return a future rather than throwing on the caller's thread.
val future = transport.executeAsync(request)
// Completion is synchronous, not merely eventual: the future is already completed
// exceptionally on return, before anything is awaited.
assertTrue(
future.isCompletedExceptionally,
"adaptation failure must complete the future exceptionally synchronously on return",
)
val ex = assertFailsWith<ExecutionException> { future.get(5, TimeUnit.SECONDS) }
assertTrue(
ex.cause is IllegalArgumentException,
"adaptation failure must surface as the future's cause, was: ${ex.cause?.let { it::class }}",
)
// Assert the message so an unrelated IllegalArgumentException cannot satisfy the test.
// RequestAdapter rejects CONNECT with "java.net.http.HttpClient does not support
// user-issued CONNECT requests."
assertTrue(
ex.cause?.message?.contains("does not support user-issued CONNECT requests") == true,
"expected the JDK CONNECT-rejection message, was: ${ex.cause?.message}",
)
}

@Test
fun `executeAsyncRoutesSynchronousDispatchThrowThroughFuture`() {
// `sendAsync`'s contract does not promise that every failure is delivered through the
// returned future — a client may throw synchronously on the caller's thread. The dispatch
// path runs after a successful adaptation, so this injects a client whose `sendAsync`
// throws to exercise the post-adapt guard deterministically on every JDK. (The stock JDK
// client instead returns an already-failed future for, e.g., a closed client; the bridge
// propagates that and it never reaches the guard, so a real closed client cannot prove
// this path. The test double does.) The failure must arrive through the returned future,
// never escape executeAsync on the caller's thread.
val boom = IllegalStateException("synchronous dispatch failure")
val throwingTransport = JdkHttpTransport.create(SyncThrowingDispatchClient(boom))

// Must return a future rather than throwing on the caller's thread.
val future = throwingTransport.executeAsync(simpleGet("/async-dispatch-throw"))
// Completion is synchronous, not merely eventual: the future is already completed
// exceptionally on return, before anything is awaited.
assertTrue(
future.isCompletedExceptionally,
"a synchronous dispatch throw must complete the future exceptionally synchronously on return",
)
val ex = assertFailsWith<ExecutionException> { future.get(5, TimeUnit.SECONDS) }
assertEquals(
boom,
ex.cause,
"the dispatch throw must surface verbatim as the future's cause, was: ${ex.cause}",
)
}

// -------- headers round-trip --------
Expand Down Expand Up @@ -832,4 +883,52 @@ class JdkHttpTransportTest {

override fun canHandle(challenges: List<org.dexpace.sdk.core.http.auth.AuthenticateChallenge>): Boolean = false
}

/**
* A minimal [HttpClient] whose async dispatch throws synchronously on the caller's thread,
* modelling a client (or a future JDK) that does not package every `sendAsync` failure into the
* returned future. Only [sendAsync] is exercised by [JdkHttpTransport.executeAsync]; the rest
* are inert stubs that are never invoked on the dispatch path under test. In particular
* [sslContext] returns `SSLContext.getDefault()` only to satisfy the abstract member — its
* checked `NoSuchAlgorithmException` and the cost of materialising the JVM default SSL context
* never apply here because the path under test never reads the context; likewise [send] throws
* rather than returning a stub response, as the synchronous path is never reached.
*/
private class SyncThrowingDispatchClient(
private val failure: RuntimeException,
) : HttpClient() {
override fun cookieHandler(): Optional<CookieHandler> = Optional.empty()

override fun connectTimeout(): Optional<Duration> = Optional.empty()

override fun followRedirects(): HttpClient.Redirect = HttpClient.Redirect.NEVER

override fun proxy(): Optional<ProxySelector> = Optional.empty()

override fun sslContext(): SSLContext = SSLContext.getDefault()

override fun sslParameters(): SSLParameters = SSLParameters()

override fun authenticator(): Optional<Authenticator> = Optional.empty()

override fun version(): HttpClient.Version = HttpClient.Version.HTTP_1_1

override fun executor(): Optional<Executor> = Optional.empty()

override fun <T> send(
request: HttpRequest,
responseBodyHandler: HttpResponse.BodyHandler<T>,
): HttpResponse<T> = throw UnsupportedOperationException("synchronous send is not used by this test double")

override fun <T> sendAsync(
request: HttpRequest,
responseBodyHandler: HttpResponse.BodyHandler<T>,
): CompletableFuture<HttpResponse<T>> = throw failure

override fun <T> sendAsync(
request: HttpRequest,
responseBodyHandler: HttpResponse.BodyHandler<T>,
pushPromiseHandler: HttpResponse.PushPromiseHandler<T>?,
): CompletableFuture<HttpResponse<T>> = throw failure
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,16 @@ public class OkHttpTransport private constructor(
// adaptation runs on the caller's thread and can throw (e.g. a method/body
// mismatch OkHttp rejects), so route the failure into a completed-exceptionally
// future instead of throwing synchronously where a future-composing caller's
// .exceptionally/.handle would never observe it. Errors (OOM and other JVM-fatal
// conditions) are left to propagate up the caller's stack rather than be packaged
// into a future that may never be awaited.
// .exceptionally/.handle would never observe it. The breadth is intentional:
// catching `Exception` (not `RuntimeException`) also funnels an unexpected adapter
// bug such as an NPE through the future so the caller can observe it. Only `Error`
// (OOM and other JVM-fatal conditions) is left to propagate up the caller's stack
// rather than be packaged into a future that may never be awaited.
return failedFuture(e)
}
// The post-adaptation dispatch needs no guard: `newCall(...)` does no throwing work, and a
// dispatch failure (including a `RejectedExecutionException` from a shut-down dispatcher)
// is delivered through `Callback.onFailure` below, so it already reaches the future.
val call = client.newCall(okRequest)
val future = CompletableFuture<Response>()
call.enqueue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,24 @@ class OkHttpTransportTest {
.build()
// Must return a future rather than throwing on the caller's thread.
val future = transport.executeAsync(request)
// Completion is synchronous, not merely eventual: the future is already completed
// exceptionally on return, before anything is awaited.
assertTrue(
future.isCompletedExceptionally,
"adaptation failure must complete the future exceptionally synchronously on return",
)
val ex = assertFailsWith<ExecutionException> { future.get(5, TimeUnit.SECONDS) }
assertTrue(
ex.cause is IllegalArgumentException,
"adaptation failure must surface as the future's cause, was: ${ex.cause?.let { it::class }}",
)
// Assert the message so an unrelated IllegalArgumentException cannot satisfy the test.
// OkHttp's Request.Builder.method rejects a body on GET with "<method> must not have a
// request body."
assertTrue(
ex.cause?.message?.contains("must not have a request body") == true,
"expected OkHttp's body-on-GET rejection message, was: ${ex.cause?.message}",
)
}

// -------- headers round-trip --------
Expand Down
Loading