feat: Kafka deliverBatch fire-flush-await — 13-41x throughput (KOJAK-73)#40

Open
endrju19 wants to merge 4 commits into main from feature/kojak-73-kafka-deliver-batch

endrju19 commented May 4, 2026

Summary

Overrides KafkaMessageDeliverer.deliverBatch with the fire-flush-await pattern. Replaces N sequential producer.send().get() round-trips with one batched network round-trip via producer.flush().

KOJAK-73 — Kafka deliverBatch
Builds on KOJAK-72 (deliverBatch interface).

Headline numbers

Smoke benchmark vs KOJAK-68 baseline (same hardware: MacBook M3 Max, JDK 25 LTS, Postgres 16 + Kafka 3.8.1 via Testcontainers; fork=1, warmup iterations=1, measurement iterations=2, warmup time=10s, measurement time=15s):

| batchSize | Baseline | This PR | Improvement |
|---|---|---|---|
| 10 | ~109 msg/s | ~1,468 msg/s | 13.5× |
| 50 | ~115 msg/s | ~3,731 msg/s | 32.3× |
| 100 | ~115 msg/s | ~4,717 msg/s | 41.0× |

batchSize is now load-bearing. Before KOJAK-73, throughput was flat across batchSize values because the bottleneck was the per-record blocking producer.send().get(). After KOJAK-73, throughput scales with batchSize, which indicates that Kafka's internal record batching is now being exploited.

Full results + interpretation: benchmarks/results-postopt-kojak-73.md.

Implementation

override fun deliverBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> {
    if (entries.isEmpty()) return emptyList()

    // Phase 1: fire all (non-blocking, records queue in producer buffer)
    val inflight = entries.map { it to fireOne(it) }

    // Phase 2: flush — bypasses linger.ms, one batched RTT for all in-flight
    try { producer.flush() } catch (e: InterruptException) {
        Thread.currentThread().interrupt()
    }

    // Phase 3: collect — Future.get() is non-blocking after flush
    return inflight.map { (entry, outcome) -> entry to awaitOne(outcome) }
}

SendOutcome sealed type distinguishes synchronous send exceptions (e.g., BufferExhaustedException, SerializationException) from in-flight futures — one failing send never aborts the batch.
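
A minimal sketch of how a sealed type like this might keep the two failure channels apart. It is not the PR's actual code: `Entry`, `Delivery`, and the `send` lambda are hypothetical stand-ins for `OutboxEntry`, `DeliveryResult`, and `producer.send()`, and the classification is deliberately simplified (every synchronous throw becomes `Permanent`, every failed future becomes `Retriable`).

```kotlin
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future

// Hypothetical stand-ins for OutboxEntry and DeliveryResult.
data class Entry(val id: String)

sealed interface Delivery {
    object Success : Delivery
    data class Retriable(val reason: String) : Delivery
    data class Permanent(val reason: String) : Delivery
}

// Either send() threw synchronously, or it returned a future that settles later.
sealed interface SendOutcome {
    data class ImmediateFailure(val result: Delivery) : SendOutcome
    class Sent(val future: Future<*>) : SendOutcome
}

// Fire phase: a synchronous throw is captured per entry instead of aborting the batch.
fun fireOne(entry: Entry, send: (Entry) -> Future<*>): SendOutcome =
    try {
        SendOutcome.Sent(send(entry))
    } catch (e: Exception) {
        SendOutcome.ImmediateFailure(Delivery.Permanent(e.message ?: e.javaClass.simpleName))
    }

// Await phase: read each future's settled state (simplified classification).
fun awaitOne(outcome: SendOutcome): Delivery = when (outcome) {
    is SendOutcome.ImmediateFailure -> outcome.result
    is SendOutcome.Sent -> try {
        outcome.future.get()
        Delivery.Success
    } catch (e: ExecutionException) {
        Delivery.Retriable(e.cause?.message ?: "unknown")
    }
}
```

Mapping a list of entries through `fireOne` and then through `awaitOne` yields one result per entry in input order, even when a send in the middle of the batch throws.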

deliver() refactored to share buildRecord / classifyException helpers with deliverBatch — no behavior change for single-entry path.

Test coverage

New unit tests in KafkaMessageDelivererBatchTest:

  • Empty input → empty output, producer untouched
  • All-success preserves input order
  • Single flush() call (verified via flush counter override)
  • Synchronous send exception → per-entry classification (Permanent + Retriable variants)
  • Future-based async exception (via MockProducer override that drives completeNext/errorNext from inside flush)

Integration tests in okapi-integration-tests continue to pass with real Kafka.

Sublinear scaling: KOJAK-75 motivation

Going from batchSize=50 → 100 gives 32× → 41× (only ~26% more, not 2×). At larger batches, the N individual updateAfterProcessing calls become significant relative to the now-fast Kafka path. This is exactly what KOJAK-75 (batch UPDATE via executeBatch) addresses — it'll be the next big lever once HTTP deliverBatch (KOJAK-74) lands.
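
The ~26% figure can be rechecked from the headline table. The sketch below only copies the rounded throughput numbers from this description; because the baselines are approximate, recomputed speed-ups can differ slightly from the reported multipliers.

```kotlin
// Throughput figures (msg/s) copied from the headline table in this PR description.
val baseline = mapOf(10 to 109.0, 50 to 115.0, 100 to 115.0)
val thisPr = mapOf(10 to 1468.0, 50 to 3731.0, 100 to 4717.0)

// Speed-up over the baseline at a given batch size.
fun speedup(batchSize: Int): Double =
    thisPr.getValue(batchSize) / baseline.getValue(batchSize)

// Relative throughput gain when moving from one batch size to another.
fun gain(from: Int, to: Int): Double =
    thisPr.getValue(to) / thisPr.getValue(from) - 1.0
```

`gain(50, 100)` comes out at roughly 0.26, while doubling the batch size under perfectly linear scaling would give 1.0.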

Test plan

  • ./gradlew test ktlintCheck -x :okapi-integration-tests:test — all unit tests + ktlint pass
  • ./gradlew :okapi-integration-tests:test — full Testcontainers suite (Postgres + MySQL + Kafka + WireMock) passes
  • ./gradlew :okapi-benchmarks:jmhJar + smoke Kafka throughput run — captured numbers above

endrju19 added 3 commits May 4, 2026 11:45
Overrides KafkaMessageDeliverer.deliverBatch with the fire-flush-await pattern.
Replaces N sequential producer.send().get() round-trips per batch with one
batched network round-trip:

  Phase 1 (fire)   producer.send() per entry — non-blocking, records queue
                   in producer's internal buffer
  Phase 2 (flush)  one producer.flush() — bypasses linger.ms, drains all
                   in-flight records to broker in one batched RTT
  Phase 3 (await)  Future.get() per entry — non-blocking after flush, just
                   reads settled completion

Per-entry classification preserved via SendOutcome sealed type — synchronous
exceptions in send() (BufferExhaustedException, SerializationException) are
caught individually so one failing send never aborts the batch. flush()
InterruptException restores the interrupt flag and lets remaining futures
surface their own outcome.

deliver() refactored to share buildRecord/classifyException helpers with
deliverBatch — no behavior change for single-entry path.

Benchmark results vs KOJAK-68 baseline (smoke run, MacBook M3 Max,
Postgres 16 + Kafka 3.8.1 in Testcontainers):

  batchSize  baseline    KOJAK-73    improvement
  10         ~109 msg/s  ~1,468 msg/s  13.5x
  50         ~115 msg/s  ~3,731 msg/s  32.3x
  100        ~115 msg/s  ~4,717 msg/s  41.0x

Throughput now SCALES with batchSize (was flat ~115 msg/s pre-KOJAK-73),
proving Kafka's internal record batching is being exploited. Sublinear
scaling 50 -> 100 indicates DB UPDATE per entry is becoming the next
bottleneck — exactly what KOJAK-75 (batch UPDATE via executeBatch)
addresses next.

Tests:
- KafkaMessageDelivererBatchTest covers empty input, all-success ordering,
  flush-counted-once verification, synchronous Permanent + Retriable send
  exceptions, and future-based async exception (driven via MockProducer
  override that completeNext/errorNext per position inside flush)
- Existing KafkaMessageDelivererTest unchanged — single-entry path still
  works identically
- Integration tests (real Kafka via Testcontainers) pass

Documents results in benchmarks/results-postopt-kojak-73.md and bumps
README headline numbers to reflect new Kafka throughput.
…s (KOJAK-73)

Addresses cross-cutting findings from PR #40 review (5 agents converged on
the interrupt/flush error handling).

Critical fix — flush() / interrupt path:
- Broaden flush() exception handling: previously only InterruptException was
  caught, so a fatal KafkaException or IllegalStateException (e.g. closed
  producer) would propagate out of deliverBatch and abandon all in-flight
  futures uncollected, contradicting the documented per-entry classification
  contract. Now any exception from flush() is logged and swallowed; awaitOne
  classifies each entry from its own future state.
- Add explicit InterruptedException handling in awaitOne and deliver. Without
  this, an interrupt during flush() would translate into PermanentFailure on
  pending futures (since java.lang.InterruptedException is NOT a Kafka
  RetriableException), incorrectly marking transient interrupts as terminal.
  Now classified as RetriableFailure with the interrupt flag re-armed.
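
A minimal sketch of the interrupt handling described above, under the assumption that the await step boils down to a `Future.get()` call. `Delivery` is a hypothetical stand-in for `DeliveryResult`, and the `ExecutionException` branch is simplified to always return `Permanent`.

```kotlin
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future

// Hypothetical stand-in for DeliveryResult.
sealed interface Delivery {
    object Success : Delivery
    data class Retriable(val reason: String) : Delivery
    data class Permanent(val reason: String) : Delivery
}

fun awaitOne(future: Future<*>): Delivery =
    try {
        future.get()
        Delivery.Success
    } catch (e: InterruptedException) {
        // get() clears the interrupt flag when it throws; re-arm it so callers
        // further up the stack can still observe the interrupt.
        Thread.currentThread().interrupt()
        // An interrupt says nothing about the record itself, so retry later.
        Delivery.Retriable("interrupted")
    } catch (e: ExecutionException) {
        // Simplified: the real classifier inspects the cause.
        Delivery.Permanent(e.cause?.message ?: e.javaClass.simpleName)
    }
```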

Important — observability:
- Add SLF4J logger (implementation dep on slf4j-api added to okapi-kafka,
  consistent with okapi-core's pattern). Logs warn on flush failure (with
  batch size) and debug on synchronous send rejection (with outboxId).
- classifyException fallback for null exception messages now uses
  e.javaClass.simpleName instead of generic "Permanent/Retriable Kafka error",
  preserving debuggability when an exception's message is null.

Type design polish:
- SendOutcome.Sent is now @JvmInline value class instead of data class —
  Future has reference-only equals which a data class would falsely advertise.
  Zero runtime cost, removes the misleading promise.

Comments:
- Condensed class-level KDoc, removed unverified "since Kafka 3.0" claim
  about exception hierarchy.
- Reworded deliverBatch KDoc to drop numbered-step restatement and clarify
  that flush() failures never abandon the batch.
- Trimmed redundant test inline comments per "no redundant comments" rule.

New tests (cover gaps the reviews flagged):
- mixed sync-throw + async outcomes in one batch with positional integrity
- poison-pill malformed deliveryMetadata in fire phase
- flush() throws non-Interrupt exception → per-entry classification preserved
- flush() throws InterruptException → pending futures classified as Retriable

Verification:
- All unit tests pass (4 new + existing batch + single-entry tests)
- All integration tests pass (real Postgres + Kafka via Testcontainers)
- ktlint clean
- Smoke benchmark Kafka batchSize=50: 0.260 ms/op vs 0.268 pre-review
  (within noise; logger overhead is on failure paths only, happy path
  unaffected). 32x improvement vs KOJAK-68 baseline preserved.
…xception classification, file cleanup (KOJAK-73)

- Tighten MessageDeliverer.deliverBatch KDoc: normative contract for
  never-throw + per-entry classification + same-size guarantee
- Add real-Kafka integration tests for deliverBatch (25-entry batch
  ordering + empty-batch short-circuit)
- Classify bare ExecutionException (null cause) as Retriable rather
  than Permanent (extract classifyExecutionException helper)
- Fix logger.debug stack-trace loss in fireOne (pass exception, not
  toString())
- Drop @JvmInline from SendOutcome.Sent — inline opt is defeated when
  used inside sealed-interface when/is branches
- Add Kafka InterruptException tests covering deliver() and
  deliverBatch sync send paths
- Rename benchmark files from kojak-73-* to descriptive technical
  names; strip KOJAK refs from README
endrju19 added a commit that referenced this pull request May 14, 2026
…e failures (#44)

## Summary

`HttpMessageDeliverer.deliver()` previously caught all exceptions as
`RetriableFailure`. This meant **corrupt delivery metadata or an unknown
service triggered an infinite retry loop** instead of moving the entry
to `FAILED`.

This PR classifies exceptions explicitly:

| Exception | Classification | Why |
|---|---|---|
| `JsonProcessingException` | `PermanentFailure` | Corrupt metadata won't fix itself (caught before `IOException`, of which it is a subtype) |
| `IOException` | `RetriableFailure` | Connection / timeout (transient) |
| `InterruptedException` | `RetriableFailure` | Interrupt flag restored; consistent with `KafkaMessageDeliverer` |
| other `Exception` | `PermanentFailure` | Malformed URI, unknown service, `IllegalStateException` from `ServiceUrlResolver`, etc. |
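
The ordering constraint in the table can be sketched as follows. This is an illustration rather than the actual `HttpMessageDeliverer` code: `MetadataParseException` is a hypothetical stand-in for `JsonProcessingException` (so the example needs no Jackson dependency), and `Delivery` stands in for `DeliveryResult`.

```kotlin
import java.io.IOException

// Hypothetical stand-in for DeliveryResult.
sealed interface Delivery {
    object Success : Delivery
    data class Retriable(val reason: String) : Delivery
    data class Permanent(val reason: String) : Delivery
}

// Hypothetical stand-in for JsonProcessingException; like it, an IOException subtype.
class MetadataParseException(message: String) : IOException(message)

fun classify(attempt: () -> Unit): Delivery =
    try {
        attempt()
        Delivery.Success
    } catch (e: MetadataParseException) {
        // Must come before the IOException branch, or corrupt metadata would be retried.
        Delivery.Permanent(e.message ?: e.javaClass.simpleName)
    } catch (e: IOException) {
        // Connection problems and timeouts are transient.
        Delivery.Retriable(e.message ?: e.javaClass.simpleName)
    } catch (e: InterruptedException) {
        Thread.currentThread().interrupt()
        Delivery.Retriable("interrupted")
    } catch (e: Exception) {
        // Anything else (malformed URI, unknown service, ...) will not fix itself.
        Delivery.Permanent(e.message ?: e.javaClass.simpleName)
    }
```

Swapping the first two `catch` blocks would silently route corrupt metadata into the retriable branch, which is the infinite-retry behavior this change removes.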

## Why this matters

Before: `okapi-http` consumer with malformed JSON in `deliveryMetadata`
→ infinite retries → retry storm in logs, entry never marked `FAILED`,
operator alert fatigue.

After: corrupt input → `PermanentFailure` → entry moves to `FAILED` per
`RetryPolicy`, ops visibility preserved.

## Scope

- `HttpMessageDeliverer.kt`: refactor to expression-body, granular
`catch` blocks, updated KDoc
- `HttpMessageDelivererTest.kt`: 2 new tests
  - corrupt metadata → `PermanentFailure` (does not throw)
  - `urlResolver` throwing → `PermanentFailure` (does not throw)

Independent of the Kafka `deliverBatch` work in PR #40.

## Test plan
- [x] `./gradlew :okapi-http:test ktlintCheck` — all 13 unit tests pass
(11 existing + 2 new)
private fun awaitOne(outcome: SendOutcome): DeliveryResult = when (outcome) {
    is SendOutcome.ImmediateFailure -> outcome.result
    is SendOutcome.Sent -> try {
        outcome.future.get()

Maybe better to call get() with a timeout?
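
One way the suggestion could look, as a sketch only. `awaitWithTimeout`, `timeoutMs`, and `Delivery` are hypothetical names, and treating a timeout as retriable is an assumption rather than something this PR decides.

```kotlin
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical stand-in for DeliveryResult.
sealed interface Delivery {
    object Success : Delivery
    data class Retriable(val reason: String) : Delivery
    data class Permanent(val reason: String) : Delivery
}

fun awaitWithTimeout(future: Future<*>, timeoutMs: Long): Delivery =
    try {
        // Bounded wait: never block longer than timeoutMs on a single future.
        future.get(timeoutMs, TimeUnit.MILLISECONDS)
        Delivery.Success
    } catch (e: TimeoutException) {
        // Assumption: an unresolved send is worth retrying rather than failing for good.
        Delivery.Retriable("no result within $timeoutMs ms")
    } catch (e: InterruptedException) {
        Thread.currentThread().interrupt()
        Delivery.Retriable("interrupted")
    } catch (e: ExecutionException) {
        // Simplified: the real classifier inspects the cause.
        Delivery.Permanent(e.cause?.message ?: e.javaClass.simpleName)
    }
```

The producer's own `delivery.timeout.ms` setting already limits how long a send may remain unresolved, so an explicit timeout here would mostly serve as a second safety net, for example when `flush()` itself failed and some futures are still pending.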

…s Pair<OutboxEntry, DeliveryResult> (#45)

## Summary

Replaces the anonymous tuple `List<Pair<OutboxEntry, DeliveryResult>>`
on `MessageDeliverer.deliverBatch` with a named domain type
`List<DeliveryOutcome>`. Same shape (two fields), better contract.

## Why

The batch-delivery result is a domain concept, not a utility tuple, so it
deserves a name. Concrete benefits:

| Aspect | `Pair<OutboxEntry, DeliveryResult>` | `DeliveryOutcome` |
|---|---|---|
| Kotlin access | `pair.first` / `pair.second` | `outcome.entry` / `outcome.result` |
| Java access | `pair.getFirst()` / `pair.getSecond()` | `outcome.getEntry()` / `outcome.getResult()` |
| Extensibility | Locked at 2 anonymous fields | Optional `attemptNumber`, `latencyMs`, ... addable later |
| Domain modeling | Tuple | Named concept consistent with `DeliveryResult` / `DeliveryInfo` / `MessageDeliverer` |

## Why pre-release

Currently `0.2.0-SNAPSHOT`, no external consumers. Renaming after
release would be a breaking change in a public API. The cost now is ~30
minutes; the cost later is a major version bump.

## Scope

- **New type:** `okapi-core/.../DeliveryOutcome.kt` — `data class
DeliveryOutcome(val entry: OutboxEntry, val result: DeliveryResult)`
- **Interface:** `MessageDeliverer.deliverBatch` returns
`List<DeliveryOutcome>`
- **`CompositeMessageDeliverer`:** builds `DeliveryOutcome` instances;
lookup map via `.associate { it.entry to it.result }`
- **`KafkaMessageDeliverer`:** the public override constructs
`DeliveryOutcome` on emission; the private `SendOutcome` sealed type is
untouched
- **Tests:** `.first`/`.second` → `.entry`/`.result`; destructuring
`(entry, result) -> ...` patterns unchanged (data-class `componentN`
keeps working)
- **`OutboxEntryProcessor`:** consumes via destructuring — zero changes

## Base branch

Based on `feature/kojak-73-kafka-deliver-batch` (PR #40) — cumulative
with that work. Merge order: PR #40 first, then this one.

## Test plan
- [x] `./gradlew clean ktlintFormat ktlintCheck build -x
:okapi-integration-tests:test` — all modules build, full unit suite
passes
- [x] `:okapi-integration-tests:compileTestKotlin` — integration test
compiles against new signature