Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,17 +201,17 @@ graph BT

## Performance

Throughput baseline (single instance, sync sequential delivery, MacBook M3 Max, JDK 25 LTS, April 2026):
Throughput on a single instance (MacBook M3 Max, JDK 25 LTS, May 2026):

| Transport | batchSize=10 | batchSize=100 |
|-----------|--------------|----------------|
| Kafka (`acks=all`, localhost broker) | ~110 msg/s | ~115 msg/s |
| HTTP @ webhook latency 20 ms | ~33 msg/s | ~36 msg/s |
| HTTP @ webhook latency 100 ms | ~9 msg/s | ~9 msg/s |
| Kafka (`acks=all`, localhost broker, async batch via `deliverBatch`) | **~1,470 msg/s** | **~4,720 msg/s** |
| HTTP @ webhook latency 20 ms (sync sequential — parallel `sendAsync` planned) | ~33 msg/s | ~36 msg/s |
| HTTP @ webhook latency 100 ms (sync sequential — parallel `sendAsync` planned) | ~9 msg/s | ~9 msg/s |

These numbers reflect the current sync-sequential delivery model. Throughput is bounded by per-message round-trip time × batch size. Performance work to lift these limits (async batch delivery, multi-threaded scheduler) is tracked under the [KOJAK-14 epic](https://softwaremill.atlassian.net/browse/KOJAK-14).
Kafka throughput improved 13–41× over the original sync-sequential baseline thanks to the `deliverBatch` fire-flush-await pattern (smoke-run measurements — see the statistical caveat in `benchmarks/`). HTTP parallel `sendAsync` is next; multi-threaded scheduler scaling is in the roadmap.

Full methodology, raw JMH results, and reproduction instructions: [`benchmarks/`](benchmarks/).
Full methodology, raw JMH results, and before/after comparisons for each change: [`benchmarks/`](benchmarks/).

## Build

Expand Down
157 changes: 157 additions & 0 deletions benchmarks/kafka-deliverbatch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
[
{
"jmhVersion" : "1.37",
"benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll",
"mode" : "avgt",
"threads" : 1,
"forks" : 1,
"jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/25.0.2-tem/bin/java",
"jvmArgs" : [
],
"jdkVersion" : "25.0.2",
"vmName" : "OpenJDK 64-Bit Server VM",
"vmVersion" : "25.0.2+10-LTS",
"warmupIterations" : 1,
"warmupTime" : "10 s",
"warmupBatchSize" : 1,
"measurementIterations" : 2,
"measurementTime" : "15 s",
"measurementBatchSize" : 1,
"params" : {
"batchSize" : "10"
},
"primaryMetric" : {
"score" : 0.680696006383041,
"scoreError" : "NaN",
"scoreConfidence" : [
"NaN",
"NaN"
],
"scorePercentiles" : {
"0.0" : 0.6561445592105263,
"50.0" : 0.680696006383041,
"90.0" : 0.7052474535555555,
"95.0" : 0.7052474535555555,
"99.0" : 0.7052474535555555,
"99.9" : 0.7052474535555555,
"99.99" : 0.7052474535555555,
"99.999" : 0.7052474535555555,
"99.9999" : 0.7052474535555555,
"100.0" : 0.7052474535555555
},
"scoreUnit" : "ms/op",
"rawData" : [
[
0.7052474535555555,
0.6561445592105263
]
]
},
"secondaryMetrics" : {
}
},
{
"jmhVersion" : "1.37",
"benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll",
"mode" : "avgt",
"threads" : 1,
"forks" : 1,
"jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/25.0.2-tem/bin/java",
"jvmArgs" : [
],
"jdkVersion" : "25.0.2",
"vmName" : "OpenJDK 64-Bit Server VM",
"vmVersion" : "25.0.2+10-LTS",
"warmupIterations" : 1,
"warmupTime" : "10 s",
"warmupBatchSize" : 1,
"measurementIterations" : 2,
"measurementTime" : "15 s",
"measurementBatchSize" : 1,
"params" : {
"batchSize" : "50"
},
"primaryMetric" : {
"score" : 0.26791908345521237,
"scoreError" : "NaN",
"scoreConfidence" : [
"NaN",
"NaN"
],
"scorePercentiles" : {
"0.0" : 0.2562269965675676,
"50.0" : 0.26791908345521237,
"90.0" : 0.27961117034285715,
"95.0" : 0.27961117034285715,
"99.0" : 0.27961117034285715,
"99.9" : 0.27961117034285715,
"99.99" : 0.27961117034285715,
"99.999" : 0.27961117034285715,
"99.9999" : 0.27961117034285715,
"100.0" : 0.27961117034285715
},
"scoreUnit" : "ms/op",
"rawData" : [
[
0.27961117034285715,
0.2562269965675676
]
]
},
"secondaryMetrics" : {
}
},
{
"jmhVersion" : "1.37",
"benchmark" : "com.softwaremill.okapi.benchmarks.KafkaThroughputBenchmark.drainAll",
"mode" : "avgt",
"threads" : 1,
"forks" : 1,
"jvm" : "/Users/andrzej.kobylinski/.sdkman/candidates/java/25.0.2-tem/bin/java",
"jvmArgs" : [
],
"jdkVersion" : "25.0.2",
"vmName" : "OpenJDK 64-Bit Server VM",
"vmVersion" : "25.0.2+10-LTS",
"warmupIterations" : 1,
"warmupTime" : "10 s",
"warmupBatchSize" : 1,
"measurementIterations" : 2,
"measurementTime" : "15 s",
"measurementBatchSize" : 1,
"params" : {
"batchSize" : "100"
},
"primaryMetric" : {
"score" : 0.21151745586904763,
"scoreError" : "NaN",
"scoreConfidence" : [
"NaN",
"NaN"
],
"scorePercentiles" : {
"0.0" : 0.21086217661904763,
"50.0" : 0.21151745586904763,
"90.0" : 0.2121727351190476,
"95.0" : 0.2121727351190476,
"99.0" : 0.2121727351190476,
"99.9" : 0.2121727351190476,
"99.99" : 0.2121727351190476,
"99.999" : 0.2121727351190476,
"99.9999" : 0.2121727351190476,
"100.0" : 0.2121727351190476
},
"scoreUnit" : "ms/op",
"rawData" : [
[
0.2121727351190476,
0.21086217661904763
]
]
},
"secondaryMetrics" : {
}
}
]


73 changes: 73 additions & 0 deletions benchmarks/results-kafka-deliverbatch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Kafka deliverBatch fire-flush-await — Results (KOJAK-73)

Measured 2026-05-04 on the same hardware as the April 2026 baseline (MacBook M3 Max,
JDK 25 LTS, Postgres 16 + Kafka 3.8.1 via Testcontainers, smoke-run JMH config:
`fork=1, warmup=1, iter=2, warmup=10s, measurement=15s`).

> ⚠️ **Statistical caveat:** numbers below come from a smoke-run config (`n=2` samples;
> `scoreError` in the raw JSON is `NaN`). The order-of-magnitude claim (13–41×) is
> physically credible (sequential `N×RTT` → `1×RTT`) but the precise multipliers are
> not statistically defensible until a full-config rerun (`fork=2, warmup=3, iter=5`).

## Headline numbers — Kafka throughput

| batchSize | Baseline (ms/op) | Post-optimization (ms/op) | **Improvement** |
|-----------|------------------|---------------------------|-----------------|
| 10 | 9.168 | 0.681 | **13.5×** |
| 50 | 8.665 | 0.268 | **32.3×** |
| 100 | 8.701 | 0.212 | **41.0×** |

Translated to msg/s:

| batchSize | Baseline | Post-optimization | Improvement |
|-----------|----------|-------------------|-------------|
| 10 | ~109 | **~1,468** | 13.5× |
| 50 | ~115 | **~3,731** | 32.3× |
| 100 | ~115 | **~4,717** | 41.0× |

Raw JSON: [`kafka-deliverbatch.json`](kafka-deliverbatch.json).

## What changed

`KafkaMessageDeliverer.deliverBatch` now uses fire-flush-await:
1. **Fire** — call `producer.send()` for every entry (non-blocking; records go to producer's internal buffer)
2. **Flush** — single `producer.flush()` call drives all queued records to the broker in one batched network round-trip (bypasses `linger.ms`)
3. **Await** — `Future.get()` per entry returns immediately because completion is settled by `flush()`

Previously, each entry incurred a full `producer.send().get()` round-trip sequentially. With ~9 ms localhost Kafka RTT (`acks=all`), draining e.g. 1,000 entries cost 1,000 × 9 ms ≈ 9 s of wall-clock time — regardless of `batchSize`, since every message paid the full round-trip on its own.

## Reading the table

- **`batchSize` is now load-bearing.** Pre-optimization throughput was flat across `batchSize`
values (109 → 115 → 115 msg/s) — confirming the bottleneck was per-record blocking I/O.
Post-optimization throughput scales with `batchSize` (1,468 → 3,731 → 4,717), proving that
Kafka's internal record batching is now being exploited.
- **Sublinear scaling 50 → 100** — doubling the batch raised throughput only ~1.26×
  (3,731 → 4,717 msg/s) where ideal scaling would approach ~2×. This indicates that DB UPDATE
  overhead per entry is now significant relative to the (now-fast) Kafka path. It is exactly
  what motivates the batch UPDATE optimization via JDBC `executeBatch` (KOJAK-75) — at small
  batch sizes the per-message DB cost was hidden by the 9 ms Kafka RTT; with Kafka latency removed,
  the N individual UPDATE statements become the next bottleneck to attack.
- **batchSize=10 lowest gain (13.5×)** — at that batch size only 10 records can amortize
one RTT, so the per-batch overhead (claimPending, transaction begin/commit, 10 UPDATEs) is
proportionally larger.

## Verification context

- Unit tests: `KafkaMessageDelivererBatchTest` covers empty input, all-success ordering,
single flush call (verified via flush counter), synchronous send exception (Permanent +
Retriable variants), and future-based async exception (driven via `MockProducer` override
that completes/errors per-position inside flush).
- Integration tests in `okapi-integration-tests` continue to pass with real Postgres + Kafka.
- ktlint clean, configuration cache reuses across modules.

## What's next

1. **HTTP `deliverBatch`** (KOJAK-74) — analogous fire-all-await for HTTP via parallel
`httpClient.sendAsync`. Expected impact at realistic webhook latency
(`httpLatencyMs ∈ {20, 100}`): from ~33 / ~9 msg/s baseline to **~500-2,000 msg/s** range,
depending on host/connection pool reuse.
2. **Batch UPDATE via JDBC `executeBatch`** (KOJAK-75). Now load-bearing: at `batchSize=100`
the N individual UPDATE statements have become the dominant per-batch cost. Expected
to shift `batchSize=100` Kafka throughput from ~4,700 toward the ~10,000 msg/s range.
3. **Concurrent processor fan-out** (KOJAK-77) — multi-threaded scheduler. Multiplies all
of the above by N workers.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class CompositeMessageDeliverer(deliverers: List<MessageDeliverer>) : MessageDel
* Entries whose type has no registered deliverer are mapped to
* [DeliveryResult.PermanentFailure] (consistent with [deliver]).
*/
override fun deliverBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> {
override fun deliverBatch(entries: List<OutboxEntry>): List<DeliveryOutcome> {
if (entries.isEmpty()) return emptyList()

val resultByEntry: Map<OutboxEntry, DeliveryResult> = entries
Expand All @@ -36,11 +36,13 @@ class CompositeMessageDeliverer(deliverers: List<MessageDeliverer>) : MessageDel
if (deliverer != null) {
deliverer.deliverBatch(group)
} else {
group.map { it to DeliveryResult.PermanentFailure("No deliverer registered for type '$type'") }
group.map { DeliveryOutcome(it, DeliveryResult.PermanentFailure("No deliverer registered for type '$type'")) }
}
}
.toMap()
.associate { it.entry to it.result }

return entries.map { entry -> entry to (resultByEntry[entry] ?: error("missing result for entry ${entry.outboxId}")) }
return entries.map { entry ->
DeliveryOutcome(entry, resultByEntry[entry] ?: error("missing result for entry ${entry.outboxId}"))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.softwaremill.okapi.core

/**
 * Per-entry result of one [MessageDeliverer.deliverBatch] invocation.
 *
 * Transient transport-layer report — consumed by [OutboxEntryProcessor]
 * in the same batch cycle and never persisted.
 *
 * @property entry the outbox entry that was handed to the transport.
 * @property result the per-entry classification produced by the deliverer
 *   (Success / RetriableFailure / PermanentFailure); see [DeliveryResult].
 */
data class DeliveryOutcome(
    val entry: OutboxEntry,
    val result: DeliveryResult,
)
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ package com.softwaremill.okapi.core
interface MessageDeliverer {
val type: String

/**
* Delivers a single entry. MUST NOT throw — transport-level errors surface
* as [DeliveryResult.RetriableFailure] (transient: network, timeout, interrupt)
* or [DeliveryResult.PermanentFailure] (won't fix itself: corrupt metadata,
* missing service, auth, payload too large).
*/
fun deliver(entry: OutboxEntry): DeliveryResult

/**
Expand All @@ -20,8 +26,12 @@ interface MessageDeliverer {
* be overlapped (e.g. Kafka's internal record batching, parallel HTTP
* `sendAsync`) should override this method to exploit that.
*
* Per-entry result classification (Success / RetriableFailure / PermanentFailure)
* is preserved — callers receive one [DeliveryResult] per input entry.
* Implementations MUST NOT abort the batch on individual entry failure;
* the returned list always has the same size as [entries], in input order,
* with each entry independently classified as Success / RetriableFailure /
* PermanentFailure. This method MUST NOT throw — transport-level errors
* surface as [DeliveryResult.RetriableFailure] or [DeliveryResult.PermanentFailure]
* on the affected entries.
*/
fun deliverBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> = entries.map { entry -> entry to deliver(entry) }
fun deliverBatch(entries: List<OutboxEntry>): List<DeliveryOutcome> = entries.map { entry -> DeliveryOutcome(entry, deliver(entry)) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ class CompositeMessageDelivererTest : FunSpec({
val results = composite.deliverBatch(entries)

results.size shouldBe 4
results.map { it.first } shouldBe entries
results[0].second shouldBe DeliveryResult.Success
results[1].second shouldBe DeliveryResult.RetriableFailure("503")
results[2].second shouldBe DeliveryResult.Success
results[3].second shouldBe DeliveryResult.RetriableFailure("503")
results.map { it.entry } shouldBe entries
results[0].result shouldBe DeliveryResult.Success
results[1].result shouldBe DeliveryResult.RetriableFailure("503")
results[2].result shouldBe DeliveryResult.Success
results[3].result shouldBe DeliveryResult.RetriableFailure("503")
}

test("deliverBatch fails permanently for entries with no registered deliverer") {
Expand All @@ -56,9 +56,9 @@ class CompositeMessageDelivererTest : FunSpec({
val results = composite.deliverBatch(entries)

results.size shouldBe 2
results[0].second shouldBe DeliveryResult.Success
results[1].second.shouldBeInstanceOf<DeliveryResult.PermanentFailure>()
(results[1].second as DeliveryResult.PermanentFailure).error shouldContain "missing"
results[0].result shouldBe DeliveryResult.Success
results[1].result.shouldBeInstanceOf<DeliveryResult.PermanentFailure>()
(results[1].result as DeliveryResult.PermanentFailure).error shouldContain "missing"
}

test("deliverBatch with empty input returns empty list") {
Expand All @@ -72,17 +72,17 @@ class CompositeMessageDelivererTest : FunSpec({
val kafkaDeliverer = object : MessageDeliverer {
override val type = "kafka"
override fun deliver(entry: OutboxEntry): DeliveryResult = DeliveryResult.Success
override fun deliverBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> {
override fun deliverBatch(entries: List<OutboxEntry>): List<DeliveryOutcome> {
batchCallsKafka++
return entries.map { it to DeliveryResult.Success }
return entries.map { DeliveryOutcome(it, DeliveryResult.Success) }
}
}
val httpDeliverer = object : MessageDeliverer {
override val type = "http"
override fun deliver(entry: OutboxEntry): DeliveryResult = DeliveryResult.Success
override fun deliverBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> {
override fun deliverBatch(entries: List<OutboxEntry>): List<DeliveryOutcome> {
batchCallsHttp++
return entries.map { it to DeliveryResult.Success }
return entries.map { DeliveryOutcome(it, DeliveryResult.Success) }
}
}
val composite = CompositeMessageDeliverer(listOf(kafkaDeliverer, httpDeliverer))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ class MessageDelivererTest : FunSpec({
val results = deliverer.deliverBatch(entries)

results.size shouldBe 3
results[0].first shouldBe entries[0]
results[0].second shouldBe DeliveryResult.Success
results[1].first shouldBe entries[1]
results[1].second shouldBe DeliveryResult.RetriableFailure("err1")
results[2].first shouldBe entries[2]
results[2].second shouldBe DeliveryResult.PermanentFailure("err2")
results[0].entry shouldBe entries[0]
results[0].result shouldBe DeliveryResult.Success
results[1].entry shouldBe entries[1]
results[1].result shouldBe DeliveryResult.RetriableFailure("err1")
results[2].entry shouldBe entries[2]
results[2].result shouldBe DeliveryResult.PermanentFailure("err2")
}

test("default deliverBatch on empty input returns empty list without calling deliver") {
Expand Down
Loading