Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,12 @@ Layered, from the bottom up:

- **Java 8 bytecode everywhere except** `sdk-transport-jdkhttp` (11) and `sdk-async-virtualthreads` (21).
Avoid `InputStream.transferTo` (9+), `Thread.threadId()` (19+), records, sealed `permits` in Java-8
modules. A module that genuinely needs a newer JDK must override **both** `jvmToolchain(N)` **and**
modules. A module that genuinely needs a newer JDK must override **all three** of `jvmToolchain(N)`,
the `java { sourceCompatibility / targetCompatibility = VERSION_N + toolchain }` block, and
`compilerOptions { jvmTarget.set(JvmTarget.JVM_N) }` in its own build script — overriding only the
toolchain produces Java-8-format bytecode referencing newer stdlib symbols (`NoSuchMethodError` on JDK 8).
toolchain produces Java-8-format bytecode referencing newer stdlib symbols (`NoSuchMethodError` on JDK 8),
and omitting the `java {}` block trips Gradle's `compileJava`/`compileKotlin` JVM-target validation. See
`docs/architecture.md` (Cross-Compile Toolchain Discipline).
- **MIT license header in every source file.** Each `.kt`, `.java`, and `.kts` file starts with the 6-line
`Copyright (c) 2026 dexpace and Omar Aljarrah` / `SPDX-License-Identifier: MIT` block — copy it from any
existing file when creating new ones. Nothing enforces this automatically; it is a review convention.
Expand Down
53 changes: 53 additions & 0 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ concerns.
- [Cross-Cutting Design Decisions](#cross-cutting-design-decisions)
- [Zero Dependencies](#zero-dependencies)
- [JDK 8 Compatibility](#jdk-8-compatibility)
- [Cross-Compile Toolchain Discipline](#cross-compile-toolchain-discipline)
- [Immutability and Builders](#immutability-and-builders)
- [Virtual Thread Safety](#virtual-thread-safety)
- [Internal Visibility](#internal-visibility)
Expand Down Expand Up @@ -505,6 +506,58 @@ All code targets Java 8 bytecode (`jvmTarget = "1.8"`). Specific implications:
- `ReentrantLock` (Java 5+) replaces `synchronized` for future-proofing with virtual threads
- No `java.net.http.HttpClient` (Java 11+); the `HttpClient` interface is transport-agnostic

### Cross-Compile Toolchain Discipline

Most modules compile against Java 8 bytecode, but two need a newer JDK: `sdk-transport-jdkhttp`
targets 11 (`java.net.http.HttpClient` was finalised in JEP 321) and `sdk-async-virtualthreads`
targets 21 (virtual threads). Each of those modules raises its target by overriding **three**
things in its own build script:

```kotlin
kotlin {
jvmToolchain(21) // which JDK compiles the module
}

java {
sourceCompatibility = JavaVersion.VERSION_21 // Java-source level
targetCompatibility = JavaVersion.VERSION_21 // bytecode version `compileJava` emits
toolchain {
languageVersion.set(JavaLanguageVersion.of(21))
}
}

tasks.withType<KotlinCompile>().configureEach {
compilerOptions {
jvmTarget.set(JvmTarget.JVM_21) // bytecode version `compileKotlin` emits
}
}
```

(`sdk-transport-jdkhttp` does the same with `11`/`VERSION_11`/`JVM_11`.) **All three** overrides
are mandatory. The `java {}` block governs `compileJava` and keeps Gradle's JVM-target validation
between `compileJava` and `compileKotlin` happy; a module that sets only the Kotlin toolchain and
`jvmTarget` but omits the `java {}` block will trip that validation or compile its Java sources at
the wrong level.

The root build registers a `plugins.withId("org.jetbrains.kotlin.jvm")` callback that sets
`jvmTarget` to `JVM_1_8` for every Kotlin module by default. A module that bumps only the
toolchain — say to JDK 21 — but leaves `jvmTarget` at the inherited `1.8` will compile *against*
the JDK 21 standard library while *emitting* Java-8-format class files. The result links fine on
the build machine but references methods that do not exist on a Java 8 runtime, so a downstream
Java 8 consumer fails at call time with `NoSuchMethodError`. Setting `jvmTarget` to match the
toolchain makes the Kotlin compiler reject newer-than-target stdlib references at compile time
instead, turning that runtime failure into a build error.

This per-module override is the current, deliberately safe arrangement. The discipline matters
under a hypothetical future consolidation onto a single newer toolchain (for build speed, or to
sidestep the detekt-1.23.x crash on JDK 25+). If every module were compiled by, say, JDK 17 while
the Java-8-target modules kept `jvmTarget = JVM_1_8`, those modules would again be compiling
against a newer stdlib than they emit bytecode for. Guarding that arrangement requires a
`--release 8` / `-Xjdk-release=8` flag on the Java-8-target modules so the compiler bounds the
*visible* API to Java 8, not just the bytecode version. As long as each module that needs a newer
runtime carries its own matched `jvmToolchain` + `jvmTarget` pair, no `--release` guard is needed;
adopt one only if the toolchain is ever unified.

### Immutability and Builders

All HTTP model classes follow the same pattern:
Expand Down
5 changes: 5 additions & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
kotlin.code.style=official

# Reuse task outputs across builds and run independent module tasks concurrently to cut build time.
# (org.gradle.configuration-cache is intentionally left out and tracked separately.)
org.gradle.caching=true
org.gradle.parallel=true
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ import kotlin.test.assertEquals
import kotlin.test.assertFails
import kotlin.test.assertTrue

// Failsafe deadline for awaiting an operation that is expected to complete promptly. It exists only
// to stop a genuinely-stuck test from hanging forever, so it is kept generous: a healthy test
// returns the instant the awaited work finishes and never approaches this bound, even on a loaded
// CI host running modules in parallel.
private const val FAILSAFE_TIMEOUT_SECONDS = 30L
private const val FAILSAFE_TIMEOUT_MILLIS = FAILSAFE_TIMEOUT_SECONDS * 1000L

class CoroutinesTest {
@Test
fun `suspend execute awaits the underlying future`() =
Expand Down Expand Up @@ -105,7 +112,7 @@ class CoroutinesTest {
job.cancel()
// `cancelled` is completed by the future's listener after kotlinx-coroutines-jdk8's
// `await()` calls `cancel(true)` on the underlying CompletableFuture.
cancelled.get(2, TimeUnit.SECONDS)
cancelled.get(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
scope.cancel()
}

Expand All @@ -121,7 +128,7 @@ class CoroutinesTest {
delay(5)
42
}
assertEquals(42, future.get(2, TimeUnit.SECONDS))
assertEquals(42, future.get(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS))
} finally {
scope.cancel()
}
Expand All @@ -134,7 +141,7 @@ class CoroutinesTest {
try {
val syncClient = HttpClient { request -> mockResponse(request, 201) }
val asyncClient = syncClient.asAsyncCoroutines(scope)
val response = withTimeout(2000) { asyncClient.execute(getRequest()) }
val response = withTimeout(FAILSAFE_TIMEOUT_MILLIS) { asyncClient.execute(getRequest()) }
assertEquals(201, response.status.code)
} finally {
scope.cancel()
Expand Down Expand Up @@ -171,7 +178,7 @@ class CoroutinesTest {
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
try {
val async = sync.asAsyncCoroutines(scope)
async.executeAsync(getRequest()).get(2, TimeUnit.SECONDS)
async.executeAsync(getRequest()).get(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
assertEquals("coroutines-test", seenTraceId.get())
} finally {
scope.cancel()
Expand All @@ -195,7 +202,7 @@ class CoroutinesTest {
delay(5)
MDC.get("trace.id") ?: "<missing>"
}
assertEquals("cf-of-test", future.get(2, TimeUnit.SECONDS))
assertEquals("cf-of-test", future.get(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS))
} finally {
scope.cancel()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,18 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue

// Failsafe deadline for awaiting an operation that is expected to complete promptly. It exists only
// to stop a genuinely-stuck test from hanging forever, so it is kept generous: a healthy test
// returns the instant the awaited work finishes and never approaches this bound, even on a loaded
// CI host running modules in parallel.
private const val FAILSAFE_TIMEOUT_SECONDS = 30L

class NettyTest {
private val executor = DefaultEventExecutor()

@AfterTest
fun shutdown() {
executor.shutdownGracefully(0, 0, TimeUnit.SECONDS).await(2, TimeUnit.SECONDS)
executor.shutdownGracefully(0, 0, TimeUnit.SECONDS).await(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
}

@Test
Expand All @@ -43,7 +49,7 @@ class NettyTest {
CompletableFuture.completedFuture(mockResponse(request, 200))
}
val future = client.executeNetty(getRequest(), executor)
assertTrue(future.await(2, TimeUnit.SECONDS))
assertTrue(future.await(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS))
assertTrue(future.isSuccess)
assertEquals(200, future.now.status.code)
}
Expand All @@ -56,7 +62,7 @@ class NettyTest {
CompletableFuture<Response>().apply { completeExceptionally(sentinel) }
}
val future = client.executeNetty(getRequest(), executor)
assertTrue(future.await(2, TimeUnit.SECONDS))
assertTrue(future.await(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS))
assertEquals(false, future.isSuccess)
// Netty's `cause()` returns the failure passed to `setFailure(...)` — should be the
// original IOException, not a CompletionException wrapper.
Expand All @@ -70,7 +76,7 @@ class NettyTest {
AsyncHttpClient { request -> CompletableFuture.completedFuture(mockResponse(request, 201)) },
).build()
val future = pipeline.sendNetty(getRequest(), executor)
assertTrue(future.await(2, TimeUnit.SECONDS))
assertTrue(future.await(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS))
assertEquals(201, future.now.status.code)
}

Expand All @@ -86,7 +92,7 @@ class NettyTest {
// Cancel via Netty's API.
nettyFuture.cancel(true)
// Wait deterministically for the cancel to propagate to the source future.
cancelLatch.get(2, TimeUnit.SECONDS)
cancelLatch.get(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
assertTrue(sourceFuture.isCancelled, "cancelling the Netty promise should cancel the source CompletableFuture")
}

Expand Down Expand Up @@ -117,7 +123,7 @@ class NettyTest {
it.start()
}

completionLatch.get(2, TimeUnit.SECONDS)
completionLatch.get(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
val err = completionErrorRef.get()
if (err != null) throw AssertionError("trySuccess threw unexpectedly after cancel: $err", err)
// Promise remains cancelled.
Expand All @@ -137,7 +143,7 @@ class NettyTest {
CompletableFuture.completedFuture(mockResponse(request, 200))
}
val nettyFuture = async.executeNetty(getRequest(), executor)
nettyFuture.get(2, TimeUnit.SECONDS)
nettyFuture.get(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
assertEquals("netty-transport-test", seenTraceId.get())
} finally {
MDC.clear()
Expand All @@ -158,7 +164,7 @@ class NettyTest {
val f = CompletableFuture<Response>()
// Complete on a separate thread to ensure whenComplete fires off-caller-thread.
Thread {
gate.get(2, TimeUnit.SECONDS)
gate.get(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
f.complete(mockResponse(request, 200))
}.also {
it.isDaemon = true
Expand All @@ -174,7 +180,7 @@ class NettyTest {
mdcLatch.complete(Unit)
}
gate.complete(Unit)
mdcLatch.get(2, TimeUnit.SECONDS)
mdcLatch.get(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
assertEquals("netty-whencomplete-mdc", seenTraceId.get())
} finally {
MDC.clear()
Expand All @@ -193,7 +199,7 @@ class NettyTest {
CompletableFuture.completedFuture(mockResponse(request, 200))
}
async.executeNetty(getRequest(), executor)
.get(2, TimeUnit.SECONDS)
.get(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
assertEquals("netty-caller-preserve", MDC.get("trace.id"))
} finally {
MDC.clear()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue

// Failsafe deadline for awaiting an operation that is expected to complete promptly. It exists only
// to stop a genuinely-stuck test from blocking forever, so it is kept generous: a healthy test
// returns the instant the awaited work finishes and never approaches this bound, even on a loaded
// CI host running modules in parallel.
private const val FAILSAFE_TIMEOUT_SECONDS = 30L
private val FAILSAFE_TIMEOUT: Duration = Duration.ofSeconds(FAILSAFE_TIMEOUT_SECONDS)

class ReactorTest {
@BeforeTest
fun installIo() {
Expand Down Expand Up @@ -134,7 +141,7 @@ class ReactorTest {
seenTraceId.set(MDC.get("trace.id")?.hashCode() ?: -1)
CompletableFuture.completedFuture(mockResponse(request, 200))
}
client.executeMono(getRequest()).block(java.time.Duration.ofSeconds(2))
client.executeMono(getRequest()).block(FAILSAFE_TIMEOUT)
// The supplier is called synchronously on the subscribe thread, so MDC is already present.
// We just verify that the supplier sees it (baseline regression).
assertTrue(seenTraceId.get() != 0, "Supplier should see the trace.id")
Expand All @@ -154,7 +161,7 @@ class ReactorTest {
AsyncHttpClient { request ->
CompletableFuture.completedFuture(mockResponse(request, 200))
}
client.executeMono(getRequest()).block(java.time.Duration.ofSeconds(2))
client.executeMono(getRequest()).block(FAILSAFE_TIMEOUT)
// After block() returns, the caller's MDC should still be intact — withMdc inside the
// adapter's hooks restores the previous (= caller's) MDC on exit.
assertEquals("reactor-caller-preserve", MDC.get("trace.id"))
Expand All @@ -177,9 +184,9 @@ class ReactorTest {
// StepVerifier creates subscription then cancels it.
StepVerifier.create(mono)
.thenCancel()
.verify(Duration.ofSeconds(2))
.verify(FAILSAFE_TIMEOUT)
// Retrieve the underlying future that was created during subscription.
val underlying = futureLatch.get(2, TimeUnit.SECONDS)
val underlying = futureLatch.get(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
assertTrue(
underlying.isCancelled,
"Disposing the Mono subscription should cancel the underlying CompletableFuture",
Expand All @@ -202,7 +209,7 @@ class ReactorTest {
// Reactor scheduler thread — the mdc.withMdc { ... } wrap must re-apply MDC there.
client.executeMono(getRequest())
.subscribeOn(Schedulers.boundedElastic())
.block(Duration.ofSeconds(2))
.block(FAILSAFE_TIMEOUT)
assertEquals(
"reactor-supplier-mdc",
seenTraceId.get(),
Expand Down Expand Up @@ -234,7 +241,7 @@ class ReactorTest {
// supplier would observe "assembly-time"; with Mono.defer the capture happens per
// subscription, so it must observe the subscriber's value instead.
MDC.put("trace.id", "subscribe-time")
mono.block(Duration.ofSeconds(2))
mono.block(FAILSAFE_TIMEOUT)

assertEquals(
"subscribe-time",
Expand All @@ -261,11 +268,11 @@ class ReactorTest {

MDC.put("trace.id", "first")
val mono = client.executeMono(getRequest())
mono.block(Duration.ofSeconds(2))
mono.block(FAILSAFE_TIMEOUT)

// Re-subscribe under a different MDC; deferred capture must reflect the new value.
MDC.put("trace.id", "second")
mono.block(Duration.ofSeconds(2))
mono.block(FAILSAFE_TIMEOUT)

assertEquals(listOf("first", "second"), seenTraceIds.toList())
} finally {
Expand All @@ -292,7 +299,7 @@ class ReactorTest {
val mono = pipeline.sendMono(getRequest())

MDC.put("trace.id", "subscribe-time")
mono.block(Duration.ofSeconds(2))
mono.block(FAILSAFE_TIMEOUT)

assertEquals(
"subscribe-time",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue

// Failsafe deadline for awaiting an operation that is expected to complete promptly. It exists only
// to stop a genuinely-stuck test from hanging forever, so it is kept generous: a healthy test
// returns the instant the awaited work finishes and never approaches this bound, even on a loaded
// CI host running modules in parallel.
private const val FAILSAFE_TIMEOUT_SECONDS = 30L

class VirtualThreadsTest {
@Test
fun `asAsyncVirtualThreads runs the call on a virtual thread`() {
Expand All @@ -42,7 +48,7 @@ class VirtualThreadsTest {
}

syncClient.asAsyncVirtualThreads().use { vtClient ->
val response = vtClient.executeAsync(getRequest()).get(2, TimeUnit.SECONDS)
val response = vtClient.executeAsync(getRequest()).get(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
assertEquals(200, response.status.code)
}
assertTrue(
Expand All @@ -55,10 +61,13 @@ class VirtualThreadsTest {
fun `close shuts the virtual-thread executor down`() {
val vt = HttpClient { request -> mockResponse(request, 200) }.asAsyncVirtualThreads()
// Drive one request to ensure the executor is live.
vt.executeAsync(getRequest()).get(2, TimeUnit.SECONDS)
vt.executeAsync(getRequest()).get(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
vt.close()
// After close, the executor service has terminated; submitting again would throw.
val thrown = runCatching { vt.executeAsync(getRequest()).get(2, TimeUnit.SECONDS) }.exceptionOrNull()
val thrown =
runCatching {
vt.executeAsync(getRequest()).get(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
}.exceptionOrNull()
assertTrue(thrown != null, "expected closed virtual-thread executor to reject new tasks")
}

Expand Down Expand Up @@ -99,7 +108,7 @@ class VirtualThreadsTest {
}
syncClient.asAsyncVirtualThreads().use { vt ->
val futures = (1..100).map { vt.executeAsync(getRequest()) }
futures.forEach { it.get(5, TimeUnit.SECONDS) }
futures.forEach { it.get(FAILSAFE_TIMEOUT_SECONDS, TimeUnit.SECONDS) }
}
assertEquals(100, executions.get())
}
Expand Down Expand Up @@ -128,7 +137,7 @@ class VirtualThreadsTest {
mockResponse(request, 200)
}
sync.asAsyncVirtualThreads().use { async ->
async.executeAsync(getRequest()).get(2, java.util.concurrent.TimeUnit.SECONDS)
async.executeAsync(getRequest()).get(FAILSAFE_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS)
}
assertEquals("vt-transport-test", seenTraceId.get())
} finally {
Expand Down
Loading