diff --git a/README.md b/README.md index 6cb6c3a..082cdf6 100644 --- a/README.md +++ b/README.md @@ -201,17 +201,17 @@ graph BT ## Performance -Throughput baseline (single instance, sync sequential delivery, MacBook M3 Max, JDK 25 LTS, April 2026): +Throughput on a single instance (MacBook M3 Max, JDK 25 LTS, May 2026): | Transport | batchSize=10 | batchSize=100 | |-----------|--------------|----------------| -| Kafka (`acks=all`, localhost broker) | ~110 msg/s | ~115 msg/s | -| HTTP @ webhook latency 20 ms | ~33 msg/s | ~36 msg/s | -| HTTP @ webhook latency 100 ms | ~9 msg/s | ~9 msg/s | +| Kafka (`acks=all`, localhost broker, async batch via `deliverBatch`) | **~1,470 msg/s** | **~4,720 msg/s** | +| HTTP @ webhook latency 20 ms (sync sequential — parallel `sendAsync` planned) | ~33 msg/s | ~36 msg/s | +| HTTP @ webhook latency 100 ms (sync sequential — parallel `sendAsync` planned) | ~9 msg/s | ~9 msg/s | -These numbers reflect the current sync-sequential delivery model. Throughput is bounded by per-message round-trip time × batch size. Performance work to lift these limits (async batch delivery, multi-threaded scheduler) is tracked under the [KOJAK-14 epic](https://softwaremill.atlassian.net/browse/KOJAK-14). +Kafka throughput jumped 13-41× over the original sync-sequential baseline thanks to the `deliverBatch` fire-flush-await pattern. HTTP parallel `sendAsync` is next; multi-threaded scheduler scaling is in the roadmap. -Full methodology, raw JMH results, and reproduction instructions: [`benchmarks/`](benchmarks/). +Full methodology, raw JMH results, before/after per change: [`benchmarks/`](benchmarks/). 
## Build diff --git a/benchmarks/kafka-deliverbatch.json b/benchmarks/kafka-deliverbatch.json new file mode 100644 index 0000000..907ced0 --- /dev/null +++ b/benchmarks/kafka-deliverbatch.json @@ -0,0 +1,157 @@ +[ + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 1, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/25.0.2-tem/bin/java", + "jvmArgs" : [ + ], + "jdkVersion" : "25.0.2", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "25.0.2+10-LTS", + "warmupIterations" : 1, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 2, + "measurementTime" : "15 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "10" + }, + "primaryMetric" : { + "score" : 0.680696006383041, + "scoreError" : "NaN", + "scoreConfidence" : [ + "NaN", + "NaN" + ], + "scorePercentiles" : { + "0.0" : 0.6561445592105263, + "50.0" : 0.680696006383041, + "90.0" : 0.7052474535555555, + "95.0" : 0.7052474535555555, + "99.0" : 0.7052474535555555, + "99.9" : 0.7052474535555555, + "99.99" : 0.7052474535555555, + "99.999" : 0.7052474535555555, + "99.9999" : 0.7052474535555555, + "100.0" : 0.7052474535555555 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 0.7052474535555555, + 0.6561445592105263 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 1, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/25.0.2-tem/bin/java", + "jvmArgs" : [ + ], + "jdkVersion" : "25.0.2", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "25.0.2+10-LTS", + "warmupIterations" : 1, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 2, + "measurementTime" : "15 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "50" + }, + "primaryMetric" : { + 
"score" : 0.26791908345521237, + "scoreError" : "NaN", + "scoreConfidence" : [ + "NaN", + "NaN" + ], + "scorePercentiles" : { + "0.0" : 0.2562269965675676, + "50.0" : 0.26791908345521237, + "90.0" : 0.27961117034285715, + "95.0" : 0.27961117034285715, + "99.0" : 0.27961117034285715, + "99.9" : 0.27961117034285715, + "99.99" : 0.27961117034285715, + "99.999" : 0.27961117034285715, + "99.9999" : 0.27961117034285715, + "100.0" : 0.27961117034285715 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 0.27961117034285715, + 0.2562269965675676 + ] + ] + }, + "secondaryMetrics" : { + } + }, + { + "jmhVersion" : "1.37", + "benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll", + "mode" : "avgt", + "threads" : 1, + "forks" : 1, + "jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/25.0.2-tem/bin/java", + "jvmArgs" : [ + ], + "jdkVersion" : "25.0.2", + "vmName" : "OpenJDK 64-Bit Server VM", + "vmVersion" : "25.0.2+10-LTS", + "warmupIterations" : 1, + "warmupTime" : "10 s", + "warmupBatchSize" : 1, + "measurementIterations" : 2, + "measurementTime" : "15 s", + "measurementBatchSize" : 1, + "params" : { + "batchSize" : "100" + }, + "primaryMetric" : { + "score" : 0.21151745586904763, + "scoreError" : "NaN", + "scoreConfidence" : [ + "NaN", + "NaN" + ], + "scorePercentiles" : { + "0.0" : 0.21086217661904763, + "50.0" : 0.21151745586904763, + "90.0" : 0.2121727351190476, + "95.0" : 0.2121727351190476, + "99.0" : 0.2121727351190476, + "99.9" : 0.2121727351190476, + "99.99" : 0.2121727351190476, + "99.999" : 0.2121727351190476, + "99.9999" : 0.2121727351190476, + "100.0" : 0.2121727351190476 + }, + "scoreUnit" : "ms/op", + "rawData" : [ + [ + 0.2121727351190476, + 0.21086217661904763 + ] + ] + }, + "secondaryMetrics" : { + } + } +] + + diff --git a/benchmarks/results-kafka-deliverbatch.md b/benchmarks/results-kafka-deliverbatch.md new file mode 100644 index 0000000..4c28ad1 --- /dev/null +++ b/benchmarks/results-kafka-deliverbatch.md @@ -0,0 
+1,73 @@ +# Kafka deliverBatch fire-flush-await — Results (KOJAK-73) + +Measured 2026-05-04 on the same hardware as the April 2026 baseline (MacBook M3 Max, +JDK 25 LTS, Postgres 16 + Kafka 3.8.1 via Testcontainers, smoke-run JMH config: +`fork=1, warmupIterations=1, measurementIterations=2, warmupTime=10s, measurementTime=15s`). + +> ⚠️ **Statistical caveat:** numbers below come from a smoke-run config (`n=2` samples; +> `scoreError` in the raw JSON is `NaN`). The order-of-magnitude claim (13–41×) is +> physically credible (sequential `N×RTT` → `1×RTT`) but the precise multipliers are +> not statistically defensible until a full-config rerun (`fork=2, warmup=3, iter=5`). + +## Headline numbers — Kafka throughput + +| batchSize | Baseline (ms/op) | Post-optimization (ms/op) | **Improvement** | +|-----------|------------------|---------------------------|-----------------| +| 10 | 9.168 | 0.681 | **13.5×** | +| 50 | 8.665 | 0.268 | **32.3×** | +| 100 | 8.701 | 0.212 | **41.0×** | + +Translated to msg/s: + +| batchSize | Baseline | Post-optimization | Improvement | +|-----------|----------|-------------------|-------------| +| 10 | ~109 | **~1,468** | 13.5× | +| 50 | ~115 | **~3,731** | 32.3× | +| 100 | ~115 | **~4,717** | 41.0× | + +Raw JSON: [`kafka-deliverbatch.json`](kafka-deliverbatch.json). + +## What changed + +`KafkaMessageDeliverer.deliverBatch` now uses fire-flush-await: +1. **Fire** — call `producer.send()` for every entry (non-blocking; records go to producer's internal buffer) +2. **Flush** — single `producer.flush()` call drives all queued records to the broker in one batched network round-trip (bypasses `linger.ms`) +3. **Await** — `Future.get()` per entry returns immediately because completion is settled by `flush()` + +Previously, each entry incurred a full `producer.send().get()` round-trip sequentially. With ~9 ms localhost Kafka RTT (`acks=all`), 1000 entries × 9 ms = ~9 s regardless of `batchSize`. 
+ +## Reading the table + +- **`batchSize` is now load-bearing.** Pre-optimization throughput was flat across `batchSize` + values (109 → 115 → 115 msg/s) — confirming the bottleneck was per-record blocking I/O. + Post-optimization throughput scales with `batchSize` (1,468 → 3,731 → 4,717), proving that + Kafka's internal record batching is now being exploited. +- **Sublinear scaling 50 → 100** (32× → 41× vs expected ~2× more). Indicates that DB UPDATE + overhead per entry is now significant relative to the (now-fast) Kafka path. This is exactly + what motivates the batch UPDATE optimization via JDBC `executeBatch` (KOJAK-75) — at small + batch sizes the per-message DB cost was hidden by 9 ms Kafka RTT; with Kafka latency removed, + the N individual UPDATE statements become the next bottleneck to attack. +- **batchSize=10 lowest gain (13.5×)** — at that batch size only 10 records can amortize + one RTT, so the per-batch overhead (claimPending, transaction begin/commit, 10 UPDATEs) is + proportionally larger. + +## Verification context + +- Unit tests: `KafkaMessageDelivererBatchTest` covers empty input, all-success ordering, + single flush call (verified via flush counter), synchronous send exception (Permanent + + Retriable variants), and future-based async exception (driven via `MockProducer` override + that completes/errors per-position inside flush). +- Integration tests in `okapi-integration-tests` continue to pass with real Postgres + Kafka. +- ktlint clean, configuration cache reuses across modules. + +## What's next + +1. **HTTP `deliverBatch`** (KOJAK-74) — analogous fire-all-await for HTTP via parallel + `httpClient.sendAsync`. Expected impact at realistic webhook latency + (`httpLatencyMs ∈ {20, 100}`): from ~33 / ~9 msg/s baseline to **~500-2,000 msg/s** range, + depending on host/connection pool reuse. +2. **Batch UPDATE via JDBC `executeBatch`** (KOJAK-75). 
Now load-bearing: at `batchSize=100` + the N individual UPDATE statements have become the dominant per-batch cost. Expected + to shift `batchSize=100` Kafka throughput from ~4,700 toward the ~10,000 msg/s range. +3. **Concurrent processor fan-out** (KOJAK-77) — multi-threaded scheduler. Multiplies all + of the above by N workers. diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/CompositeMessageDeliverer.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/CompositeMessageDeliverer.kt index 39eb076..1ba4ef6 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/CompositeMessageDeliverer.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/CompositeMessageDeliverer.kt @@ -26,7 +26,7 @@ class CompositeMessageDeliverer(deliverers: List) : MessageDel * Entries whose type has no registered deliverer are mapped to * [DeliveryResult.PermanentFailure] (consistent with [deliver]). */ - override fun deliverBatch(entries: List): List> { + override fun deliverBatch(entries: List): List { if (entries.isEmpty()) return emptyList() val resultByEntry: Map = entries @@ -36,11 +36,13 @@ class CompositeMessageDeliverer(deliverers: List) : MessageDel if (deliverer != null) { deliverer.deliverBatch(group) } else { - group.map { it to DeliveryResult.PermanentFailure("No deliverer registered for type '$type'") } + group.map { DeliveryOutcome(it, DeliveryResult.PermanentFailure("No deliverer registered for type '$type'")) } } } - .toMap() + .associate { it.entry to it.result } - return entries.map { entry -> entry to (resultByEntry[entry] ?: error("missing result for entry ${entry.outboxId}")) } + return entries.map { entry -> + DeliveryOutcome(entry, resultByEntry[entry] ?: error("missing result for entry ${entry.outboxId}")) + } } } diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryOutcome.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryOutcome.kt new file mode 100644 index 0000000..312a48f 
--- /dev/null +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryOutcome.kt @@ -0,0 +1,12 @@ +package com.softwaremill.okapi.core + +/** + * Per-entry result of one [MessageDeliverer.deliverBatch] invocation. + * + * Transient transport-layer report — consumed by [OutboxEntryProcessor] + * in the same batch cycle and never persisted. + */ +data class DeliveryOutcome( + val entry: OutboxEntry, + val result: DeliveryResult, +) diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/MessageDeliverer.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/MessageDeliverer.kt index 38f1bfe..43ea43e 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/MessageDeliverer.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/MessageDeliverer.kt @@ -9,6 +9,12 @@ package com.softwaremill.okapi.core interface MessageDeliverer { val type: String + /** + * Delivers a single entry. MUST NOT throw — transport-level errors surface + * as [DeliveryResult.RetriableFailure] (transient: network, timeout, interrupt) + * or [DeliveryResult.PermanentFailure] (won't fix itself: corrupt metadata, + * missing service, auth, payload too large). + */ fun deliver(entry: OutboxEntry): DeliveryResult /** @@ -20,8 +26,12 @@ interface MessageDeliverer { * be overlapped (e.g. Kafka's internal record batching, parallel HTTP * `sendAsync`) should override this method to exploit that. * - * Per-entry result classification (Success / RetriableFailure / PermanentFailure) - * is preserved — callers receive one [DeliveryResult] per input entry. + * Implementations MUST NOT abort the batch on individual entry failure; + * the returned list always has the same size as [entries], in input order, + * with each entry independently classified as Success / RetriableFailure / + * PermanentFailure. 
This method MUST NOT throw — transport-level errors + * surface as [DeliveryResult.RetriableFailure] or [DeliveryResult.PermanentFailure] + * on the affected entries. */ - fun deliverBatch(entries: List): List> = entries.map { entry -> entry to deliver(entry) } + fun deliverBatch(entries: List): List = entries.map { entry -> DeliveryOutcome(entry, deliver(entry)) } } diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/CompositeMessageDelivererTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/CompositeMessageDelivererTest.kt index 4716327..81eb719 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/CompositeMessageDelivererTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/CompositeMessageDelivererTest.kt @@ -37,11 +37,11 @@ class CompositeMessageDelivererTest : FunSpec({ val results = composite.deliverBatch(entries) results.size shouldBe 4 - results.map { it.first } shouldBe entries - results[0].second shouldBe DeliveryResult.Success - results[1].second shouldBe DeliveryResult.RetriableFailure("503") - results[2].second shouldBe DeliveryResult.Success - results[3].second shouldBe DeliveryResult.RetriableFailure("503") + results.map { it.entry } shouldBe entries + results[0].result shouldBe DeliveryResult.Success + results[1].result shouldBe DeliveryResult.RetriableFailure("503") + results[2].result shouldBe DeliveryResult.Success + results[3].result shouldBe DeliveryResult.RetriableFailure("503") } test("deliverBatch fails permanently for entries with no registered deliverer") { @@ -56,9 +56,9 @@ class CompositeMessageDelivererTest : FunSpec({ val results = composite.deliverBatch(entries) results.size shouldBe 2 - results[0].second shouldBe DeliveryResult.Success - results[1].second.shouldBeInstanceOf() - (results[1].second as DeliveryResult.PermanentFailure).error shouldContain "missing" + results[0].result shouldBe DeliveryResult.Success + results[1].result.shouldBeInstanceOf() + (results[1].result 
as DeliveryResult.PermanentFailure).error shouldContain "missing" } test("deliverBatch with empty input returns empty list") { @@ -72,17 +72,17 @@ class CompositeMessageDelivererTest : FunSpec({ val kafkaDeliverer = object : MessageDeliverer { override val type = "kafka" override fun deliver(entry: OutboxEntry): DeliveryResult = DeliveryResult.Success - override fun deliverBatch(entries: List): List> { + override fun deliverBatch(entries: List): List { batchCallsKafka++ - return entries.map { it to DeliveryResult.Success } + return entries.map { DeliveryOutcome(it, DeliveryResult.Success) } } } val httpDeliverer = object : MessageDeliverer { override val type = "http" override fun deliver(entry: OutboxEntry): DeliveryResult = DeliveryResult.Success - override fun deliverBatch(entries: List): List> { + override fun deliverBatch(entries: List): List { batchCallsHttp++ - return entries.map { it to DeliveryResult.Success } + return entries.map { DeliveryOutcome(it, DeliveryResult.Success) } } } val composite = CompositeMessageDeliverer(listOf(kafkaDeliverer, httpDeliverer)) diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/MessageDelivererTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/MessageDelivererTest.kt index 201900c..27c3753 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/MessageDelivererTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/MessageDelivererTest.kt @@ -29,12 +29,12 @@ class MessageDelivererTest : FunSpec({ val results = deliverer.deliverBatch(entries) results.size shouldBe 3 - results[0].first shouldBe entries[0] - results[0].second shouldBe DeliveryResult.Success - results[1].first shouldBe entries[1] - results[1].second shouldBe DeliveryResult.RetriableFailure("err1") - results[2].first shouldBe entries[2] - results[2].second shouldBe DeliveryResult.PermanentFailure("err2") + results[0].entry shouldBe entries[0] + results[0].result shouldBe DeliveryResult.Success + 
results[1].entry shouldBe entries[1] + results[1].result shouldBe DeliveryResult.RetriableFailure("err1") + results[2].entry shouldBe entries[2] + results[2].result shouldBe DeliveryResult.PermanentFailure("err2") } test("default deliverBatch on empty input returns empty list without calling deliver") { diff --git a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transport/KafkaTransportIntegrationTest.kt b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transport/KafkaTransportIntegrationTest.kt index d591cf9..7fd2dc5 100644 --- a/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transport/KafkaTransportIntegrationTest.kt +++ b/okapi-integration-tests/src/test/kotlin/com/softwaremill/okapi/test/transport/KafkaTransportIntegrationTest.kt @@ -17,17 +17,17 @@ import java.util.UUID class KafkaTransportIntegrationTest : FunSpec({ val kafka = KafkaTestSupport() - var producer: KafkaProducer? = null + lateinit var producer: KafkaProducer lateinit var deliverer: KafkaMessageDeliverer beforeSpec { kafka.start() producer = kafka.createProducer() - deliverer = KafkaMessageDeliverer(producer!!) 
+ deliverer = KafkaMessageDeliverer(producer) } afterSpec { - producer?.close() + producer.close() kafka.stop() } @@ -121,4 +121,35 @@ class KafkaTransportIntegrationTest : FunSpec({ result shouldBe DeliveryResult.Success } + + test("deliverBatch sends all entries to topic and returns Success in input order") { + val topic = "batch-topic-${UUID.randomUUID()}" + val entries = (0 until 25).map { i -> + entryWithInfo(topic = topic, payload = """{"seq":$i}""") + } + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe entries.size + results.forEachIndexed { i, (entry, result) -> + entry.outboxId shouldBe entries[i].outboxId + result shouldBe DeliveryResult.Success + } + + val consumer = kafka.createConsumer(groupId = "test-batch-${UUID.randomUUID()}") + consumer.subscribe(listOf(topic)) + val received = mutableListOf() + val deadline = Instant.now().plusSeconds(15) + while (received.size < entries.size && Instant.now().isBefore(deadline)) { + consumer.poll(Duration.ofSeconds(2)).forEach { received.add(it.value()) } + } + consumer.close() + + received.size shouldBe entries.size + } + + test("deliverBatch on empty input returns empty list without contacting broker") { + val results = deliverer.deliverBatch(emptyList()) + results.size shouldBe 0 + } }) diff --git a/okapi-kafka/build.gradle.kts b/okapi-kafka/build.gradle.kts index 3318514..9c035ad 100644 --- a/okapi-kafka/build.gradle.kts +++ b/okapi-kafka/build.gradle.kts @@ -9,6 +9,7 @@ dependencies { implementation(project(":okapi-core")) implementation(libs.jacksonModuleKotlin) implementation(libs.jacksonDatatypeJsr310) + implementation(libs.slf4jApi) compileOnly(libs.kafkaClients) testImplementation(libs.kafkaClients) diff --git a/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt b/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt index 31b99b2..52eaaf9 100644 --- 
a/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt +++ b/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt @@ -1,45 +1,118 @@ package com.softwaremill.okapi.kafka +import com.softwaremill.okapi.core.DeliveryOutcome import com.softwaremill.okapi.core.DeliveryResult import com.softwaremill.okapi.core.MessageDeliverer import com.softwaremill.okapi.core.OutboxEntry import org.apache.kafka.clients.producer.Producer import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.common.errors.InterruptException import org.apache.kafka.common.errors.RetriableException +import org.slf4j.LoggerFactory import java.util.concurrent.ExecutionException +import java.util.concurrent.Future /** * [MessageDeliverer] that publishes outbox entries to Kafka topics. * - * Uses the provided [Producer] to send records synchronously. - * Kafka [RetriableException]s map to [DeliveryResult.RetriableFailure]; - * all other errors map to [DeliveryResult.PermanentFailure]. 
+ * Exception classification: + * - Kafka [RetriableException] → [DeliveryResult.RetriableFailure] + * - Thread interrupts ([InterruptException], [InterruptedException]) → + * [DeliveryResult.RetriableFailure] (interrupt flag restored) + * - all other errors → [DeliveryResult.PermanentFailure] */ class KafkaMessageDeliverer( private val producer: Producer, ) : MessageDeliverer { override val type: String = KafkaDeliveryInfo.TYPE - override fun deliver(entry: OutboxEntry): DeliveryResult { - val info = KafkaDeliveryInfo.deserialize(entry.deliveryMetadata) - val record = - ProducerRecord(info.topic, info.partitionKey, entry.payload).apply { - info.headers.forEach { (k, v) -> headers().add(k, v.toByteArray()) } - } + override fun deliver(entry: OutboxEntry): DeliveryResult = try { + producer.send(buildRecord(entry)).get() + DeliveryResult.Success + } catch (e: ExecutionException) { + classifyExecutionException(e) + } catch (e: Exception) { + classifyException(e) + } + + /** + * Uses fire-flush-await: send all entries, then a single `flush()` (which + * bypasses `linger.ms`), then collect outcomes via non-blocking `Future.get()` + * since completion is already settled. A failing `send()` does not abort the + * batch; the result list mirrors input order. + * + * If `flush()` itself fails (interrupt, fatal producer state), per-entry + * futures still surface their own exception via `get()` and are classified + * individually — the batch as a whole is never abandoned. 
+ */ + override fun deliverBatch(entries: List): List { + if (entries.isEmpty()) return emptyList() + + val inflight: List> = entries.map { entry -> + entry to fireOne(entry) + } + + try { + producer.flush() + } catch (e: InterruptException) { + Thread.currentThread().interrupt() + logger.warn("Kafka producer.flush() interrupted; per-entry futures will surface the cause", e) + } catch (e: Exception) { + logger.warn("Kafka producer.flush() failed for batch of {}; classifying per-entry from future state", entries.size, e) + } + + return inflight.map { (entry, outcome) -> DeliveryOutcome(entry, awaitOne(outcome)) } + } + + private fun fireOne(entry: OutboxEntry): SendOutcome = try { + SendOutcome.Sent(producer.send(buildRecord(entry))) + } catch (e: Exception) { + val classified = classifyException(e) + logger.debug("Kafka send rejected synchronously for entry {}", entry.outboxId, e) + SendOutcome.ImmediateFailure(classified) + } - return try { - producer.send(record).get() + private fun awaitOne(outcome: SendOutcome): DeliveryResult = when (outcome) { + is SendOutcome.ImmediateFailure -> outcome.result + is SendOutcome.Sent -> try { + outcome.future.get() DeliveryResult.Success } catch (e: ExecutionException) { - classifyException(e.cause ?: e) + classifyExecutionException(e) } catch (e: Exception) { classifyException(e) } } - private fun classifyException(e: Throwable): DeliveryResult = if (e is RetriableException) { - DeliveryResult.RetriableFailure(e.message ?: "Retriable Kafka error") - } else { - DeliveryResult.PermanentFailure(e.message ?: "Permanent Kafka error") + private fun buildRecord(entry: OutboxEntry): ProducerRecord { + val info = KafkaDeliveryInfo.deserialize(entry.deliveryMetadata) + return ProducerRecord(info.topic, info.partitionKey, entry.payload).apply { + info.headers.forEach { (k, v) -> headers().add(k, v.toByteArray()) } + } + } + + private fun classifyException(e: Throwable): DeliveryResult { + val message = e.message ?: 
e.javaClass.simpleName + return when { + e is InterruptException || e is InterruptedException -> { + Thread.currentThread().interrupt() + DeliveryResult.RetriableFailure(message) + } + e is RetriableException -> DeliveryResult.RetriableFailure(message) + else -> DeliveryResult.PermanentFailure(message) + } + } + + private fun classifyExecutionException(e: ExecutionException): DeliveryResult = e.cause?.let { classifyException(it) } + ?: DeliveryResult.RetriableFailure("ExecutionException with no cause") + + private sealed interface SendOutcome { + data class Sent(val future: Future) : SendOutcome + data class ImmediateFailure(val result: DeliveryResult) : SendOutcome + } + + companion object { + private val logger = LoggerFactory.getLogger(KafkaMessageDeliverer::class.java) } } diff --git a/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt b/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt new file mode 100644 index 0000000..18a9d00 --- /dev/null +++ b/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt @@ -0,0 +1,205 @@ +package com.softwaremill.okapi.kafka + +import com.softwaremill.okapi.core.DeliveryOutcome +import com.softwaremill.okapi.core.DeliveryResult +import com.softwaremill.okapi.core.OutboxEntry +import com.softwaremill.okapi.core.OutboxMessage +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import io.kotest.matchers.string.shouldContain +import io.kotest.matchers.types.shouldBeInstanceOf +import org.apache.kafka.clients.producer.MockProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.errors.AuthenticationException +import org.apache.kafka.common.errors.InterruptException +import org.apache.kafka.common.errors.NetworkException +import org.apache.kafka.common.serialization.StringSerializer +import java.time.Instant +import java.util.concurrent.Future + 
+private fun entry(suffix: String, metadataOverride: String? = null): OutboxEntry { + val info = kafkaDeliveryInfo { topic = "topic-$suffix" } + val baseEntry = OutboxEntry.createPending(OutboxMessage("evt-$suffix", """{"k":"v-$suffix"}"""), info, Instant.now()) + return if (metadataOverride != null) baseEntry.copy(deliveryMetadata = metadataOverride) else baseEntry +} + +class KafkaMessageDelivererBatchTest : FunSpec({ + test("deliverBatch on empty input returns empty list and does not invoke producer") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + val deliverer = KafkaMessageDeliverer(producer) + + deliverer.deliverBatch(emptyList()) shouldBe emptyList() + producer.history().size shouldBe 0 + } + + test("deliverBatch with all-success preserves input order and reports all entries delivered") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b"), entry("c")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 3 + results.map { it.entry } shouldBe entries + results.forEach { (_, r) -> r shouldBe DeliveryResult.Success } + producer.history().size shouldBe 3 + } + + test("deliverBatch fires all sends before flushing — single flush call") { + // MockProducer with autoComplete=false: futures stay pending until completeNext/errorNext, + // so flush() is the only way settlement can happen — verifies the fire-flush-await sequence. 
+ var flushCount = 0 + val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + override fun flush() { + flushCount++ + while (completeNext()) Unit + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b"), entry("c")) + + val results = deliverer.deliverBatch(entries) + + flushCount shouldBe 1 + results.forEach { (_, r) -> r shouldBe DeliveryResult.Success } + } + + test("deliverBatch maps synchronous PermanentFailure for all entries when sendException is global") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + producer.sendException = AuthenticationException("bad creds") + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 2 + results.forEach { (_, r) -> + r.shouldBeInstanceOf() + (r as DeliveryResult.PermanentFailure).error shouldContain "bad creds" + } + } + + test("deliverBatch maps synchronous RetriableFailure when send throws RetriableException") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + producer.sendException = NetworkException("broker temporarily unreachable") + val deliverer = KafkaMessageDeliverer(producer) + + val results = deliverer.deliverBatch(listOf(entry("a"))) + + results[0].result.shouldBeInstanceOf() + } + + test("deliverBatch with future-based RetriableException classifies as RetriableFailure") { + // Drive mixed outcomes from inside flush(): entry 0 completes OK, entry 1 errors. 
+ val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + override fun flush() { + completeNext() + errorNext(NetworkException("transient")) + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 2 + results[0].result shouldBe DeliveryResult.Success + results[1].result.shouldBeInstanceOf() + (results[1].result as DeliveryResult.RetriableFailure).error shouldContain "transient" + } + + test("deliverBatch handles mixed sync-throw + async outcomes in one batch with positional integrity") { + // Throw synchronously on the 2nd send only; first goes async-success, third goes async-fail. + val producer = object : MockProducer(false, null, StringSerializer(), StringSerializer()) { + private var sendCount = 0 + + override fun send(record: ProducerRecord): Future { + sendCount++ + if (sendCount == 2) throw AuthenticationException("forbidden on send #$sendCount") + return super.send(record) + } + + override fun flush() { + completeNext() // entry 0 -> Success + errorNext(NetworkException("async fail")) // entry 2 -> Retriable + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b"), entry("c")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 3 + // Positional integrity: result[i] corresponds to entries[i] regardless of outcome variant + results.map { it.entry } shouldBe entries + + results[0].result shouldBe DeliveryResult.Success + results[1].result.shouldBeInstanceOf() + (results[1].result as DeliveryResult.PermanentFailure).error shouldContain "forbidden" + results[2].result.shouldBeInstanceOf() + (results[2].result as DeliveryResult.RetriableFailure).error shouldContain "async fail" + } + + test("deliverBatch poison-pill metadata yields PermanentFailure for bad entry, others unaffected") { + val producer = MockProducer(true, null, 
StringSerializer(), StringSerializer()) + val deliverer = KafkaMessageDeliverer(producer) + val good1 = entry("good1") + val poisoned = entry("bad", metadataOverride = "{not valid kafka info json}") + val good2 = entry("good2") + + val results = deliverer.deliverBatch(listOf(good1, poisoned, good2)) + + results.size shouldBe 3 + results.map { it.entry } shouldBe listOf(good1, poisoned, good2) + results[0].result shouldBe DeliveryResult.Success + results[1].result.shouldBeInstanceOf<DeliveryResult.PermanentFailure>() + results[2].result shouldBe DeliveryResult.Success + // Only the good entries actually reached the producer + producer.history().size shouldBe 2 + } + + test("deliverBatch survives flush throwing non-Interrupt exception by classifying per-entry futures") { + // Flush blows up; each per-entry future has been settled by completeNext/errorNext just before. + // Contract: deliverBatch never re-throws — it always returns one DeliveryResult per input entry. + val producer = object : MockProducer<String, String>(false, null, StringSerializer(), StringSerializer()) { + override fun flush() { + completeNext() + errorNext(NetworkException("via future")) + throw IllegalStateException("producer fatally borked") + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a"), entry("b")) + + val results = deliverer.deliverBatch(entries) + + results.size shouldBe 2 + results[0].result shouldBe DeliveryResult.Success + results[1].result.shouldBeInstanceOf<DeliveryResult.RetriableFailure>() + } + + test("deliverBatch interrupted during flush re-arms interrupt flag and classifies pending futures as Retriable") { + // flush() throws Kafka's InterruptException without settling the futures. Our awaitOne + // then encounters Future.get() on an interrupted thread; for incomplete futures this + // raises InterruptedException which we explicitly classify as RetriableFailure (so the + // outbox reschedules instead of marking PermanentFailure).
+ val producer = object : MockProducer<String, String>(false, null, StringSerializer(), StringSerializer()) { + override fun flush() { + throw InterruptException("interrupted") + } + } + val deliverer = KafkaMessageDeliverer(producer) + val entries = listOf(entry("a")) + + val results: List + try { + results = deliverer.deliverBatch(entries) + } finally { + // Drain the interrupt status so it doesn't leak to the next test (Thread.interrupted clears it). + Thread.interrupted() + } + + results.size shouldBe 1 + results[0].result.shouldBeInstanceOf<DeliveryResult.RetriableFailure>() + } +}) diff --git a/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererTest.kt b/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererTest.kt index 31f4cc4..26a3d6a 100644 --- a/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererTest.kt +++ b/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererTest.kt @@ -8,6 +8,7 @@ import io.kotest.matchers.shouldBe import io.kotest.matchers.types.shouldBeInstanceOf import org.apache.kafka.clients.producer.MockProducer import org.apache.kafka.common.errors.AuthenticationException +import org.apache.kafka.common.errors.InterruptException import org.apache.kafka.common.errors.NetworkException import org.apache.kafka.common.errors.RecordTooLargeException import org.apache.kafka.common.serialization.StringSerializer @@ -45,4 +46,33 @@ class KafkaMessageDelivererTest : FunSpec({ val deliverer = KafkaMessageDeliverer(producer) deliverer.deliver(entry()).shouldBeInstanceOf<DeliveryResult.PermanentFailure>() } + + test("Kafka InterruptException on send → RetriableFailure (interrupt flag restored)") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + producer.sendException = InterruptException("interrupted") + val deliverer = KafkaMessageDeliverer(producer) + try { + deliverer.deliver(entry()).shouldBeInstanceOf<DeliveryResult.RetriableFailure>() + Thread.currentThread().isInterrupted shouldBe true + } finally { + // Clear the interrupt
flag so it doesn't leak to subsequent tests. + Thread.interrupted() + } + } + + test("Kafka InterruptException on send in deliverBatch → RetriableFailure per entry") { + val producer = MockProducer(true, null, StringSerializer(), StringSerializer()) + producer.sendException = InterruptException("interrupted") + val deliverer = KafkaMessageDeliverer(producer) + try { + val results = deliverer.deliverBatch(listOf(entry(), entry())) + results.size shouldBe 2 + results.forEach { (_, result) -> + result.shouldBeInstanceOf<DeliveryResult.RetriableFailure>() + } + Thread.currentThread().isInterrupted shouldBe true + } finally { + Thread.interrupted() + } + } })