From 3ffe29b1ab280a6a710564a0e9e3b64e7b76bce8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 14 May 2026 11:49:35 +0200 Subject: [PATCH 1/2] refactor: replace List> with named DeliveryOutcome on MessageDeliverer.deliverBatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Public API cleanup: the per-entry batch-delivery report is now a named domain type rather than an anonymous tuple. The result is the same shape (two fields: entry + result), but with named accessors, Java-friendly ergonomics, and room to grow. Why now (pre-release): - Named domain object > anonymous Pair in a public API — accessors (.entry / .result) carry meaning; Pair.first/Pair.second do not - Java consumers: outcome.getEntry() / outcome.getResult() vs. pair.getFirst() / pair.getSecond() — consistent with the @JvmName effort already invested elsewhere - Forward-compatible: optional fields (e.g. attemptNumber, latencyMs) can be added later without breaking callers; impossible with Pair - Same shape across transports (Kafka, planned HTTP, planned RabbitMQ etc.) — extracting the abstraction now is cheaper than after release Changes: - New okapi-core type: DeliveryOutcome(entry, result) - MessageDeliverer.deliverBatch signature: List - CompositeMessageDeliverer routing: builds DeliveryOutcome instances, uses .associate { it.entry to it.result } for lookup map - KafkaMessageDeliverer override: constructs DeliveryOutcome on emission - Tests updated: .first/.second → .entry/.result; destructuring (entry, result) patterns unchanged (data-class componentN works) OutboxEntryProcessor consumes via destructuring — no change needed. --- .../okapi/core/CompositeMessageDeliverer.kt | 10 +++-- .../okapi/core/DeliveryOutcome.kt | 15 +++++++ .../okapi/core/MessageDeliverer.kt | 2 +- .../core/CompositeMessageDelivererTest.kt | 24 ++++++------ .../okapi/core/MessageDelivererTest.kt | 12 +++--- .../okapi/kafka/KafkaMessageDeliverer.kt | 5 ++- .../kafka/KafkaMessageDelivererBatchTest.kt | 39 ++++++++++--------- 7 files changed, 63 insertions(+), 44 deletions(-) create mode 100644 okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryOutcome.kt diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/CompositeMessageDeliverer.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/CompositeMessageDeliverer.kt index 39eb076..1ba4ef6 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/CompositeMessageDeliverer.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/CompositeMessageDeliverer.kt @@ -26,7 +26,7 @@ class CompositeMessageDeliverer(deliverers: List) : MessageDel * Entries whose type has no registered deliverer are mapped to * [DeliveryResult.PermanentFailure] (consistent with [deliver]). */ - override fun deliverBatch(entries: List): List> { + override fun deliverBatch(entries: List): List { if (entries.isEmpty()) return emptyList() val resultByEntry: Map = entries @@ -36,11 +36,13 @@ class CompositeMessageDeliverer(deliverers: List) : MessageDel if (deliverer != null) { deliverer.deliverBatch(group) } else { - group.map { it to DeliveryResult.PermanentFailure("No deliverer registered for type '$type'") } + group.map { DeliveryOutcome(it, DeliveryResult.PermanentFailure("No deliverer registered for type '$type'")) } } } - .toMap() + .associate { it.entry to it.result } - return entries.map { entry -> entry to (resultByEntry[entry] ?: error("missing result for entry ${entry.outboxId}")) } + return entries.map { entry -> + DeliveryOutcome(entry, resultByEntry[entry] ?: error("missing result for entry ${entry.outboxId}")) + } } } diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryOutcome.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryOutcome.kt new file mode 100644 index 0000000..ac3d1e7 --- /dev/null +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryOutcome.kt @@ -0,0 +1,15 @@ +package com.softwaremill.okapi.core + +/** + * Per-entry result of one [MessageDeliverer.deliverBatch] invocation: + * the original [entry] paired with the transport's classification of its + * delivery attempt as [DeliveryResult.Success], [DeliveryResult.RetriableFailure], + * or [DeliveryResult.PermanentFailure]. + * + * This is a transient transport-layer report — it is consumed by + * [OutboxEntryProcessor] in the same batch cycle and never persisted. + */ +data class DeliveryOutcome( + val entry: OutboxEntry, + val result: DeliveryResult, +) diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/MessageDeliverer.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/MessageDeliverer.kt index 14691b7..43ea43e 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/MessageDeliverer.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/MessageDeliverer.kt @@ -33,5 +33,5 @@ interface MessageDeliverer { * surface as [DeliveryResult.RetriableFailure] or [DeliveryResult.PermanentFailure] * on the affected entries. */ - fun deliverBatch(entries: List): List> = entries.map { entry -> entry to deliver(entry) } + fun deliverBatch(entries: List): List = entries.map { entry -> DeliveryOutcome(entry, deliver(entry)) } } diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/CompositeMessageDelivererTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/CompositeMessageDelivererTest.kt index 4716327..81eb719 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/CompositeMessageDelivererTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/CompositeMessageDelivererTest.kt @@ -37,11 +37,11 @@ class CompositeMessageDelivererTest : FunSpec({ val results = composite.deliverBatch(entries) results.size shouldBe 4 - results.map { it.first } shouldBe entries - results[0].second shouldBe DeliveryResult.Success - results[1].second shouldBe DeliveryResult.RetriableFailure("503") - results[2].second shouldBe DeliveryResult.Success - results[3].second shouldBe DeliveryResult.RetriableFailure("503") + results.map { it.entry } shouldBe entries + results[0].result shouldBe DeliveryResult.Success + results[1].result shouldBe DeliveryResult.RetriableFailure("503") + results[2].result shouldBe DeliveryResult.Success + results[3].result shouldBe DeliveryResult.RetriableFailure("503") } test("deliverBatch fails permanently for entries with no registered deliverer") { @@ -56,9 +56,9 @@ class CompositeMessageDelivererTest : FunSpec({ val results = composite.deliverBatch(entries) results.size shouldBe 2 - results[0].second shouldBe DeliveryResult.Success - results[1].second.shouldBeInstanceOf() - (results[1].second as DeliveryResult.PermanentFailure).error shouldContain "missing" + results[0].result shouldBe DeliveryResult.Success + results[1].result.shouldBeInstanceOf() + (results[1].result as DeliveryResult.PermanentFailure).error shouldContain "missing" } test("deliverBatch with empty input returns empty list") { @@ -72,17 +72,17 @@ class CompositeMessageDelivererTest : FunSpec({ val kafkaDeliverer = object : MessageDeliverer { override val type = "kafka" override fun deliver(entry: OutboxEntry): DeliveryResult = DeliveryResult.Success - override fun deliverBatch(entries: List): List> { + override fun deliverBatch(entries: List): List { batchCallsKafka++ - return entries.map { it to DeliveryResult.Success } + return entries.map { DeliveryOutcome(it, DeliveryResult.Success) } } } val httpDeliverer = object : MessageDeliverer { override val type = "http" override fun deliver(entry: OutboxEntry): DeliveryResult = DeliveryResult.Success - override fun deliverBatch(entries: List): List> { + override fun deliverBatch(entries: List): List { batchCallsHttp++ - return entries.map { it to DeliveryResult.Success } + return entries.map { DeliveryOutcome(it, DeliveryResult.Success) } } } val composite = CompositeMessageDeliverer(listOf(kafkaDeliverer, httpDeliverer)) diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/MessageDelivererTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/MessageDelivererTest.kt index 201900c..27c3753 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/MessageDelivererTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/MessageDelivererTest.kt @@ -29,12 +29,12 @@ class MessageDelivererTest : FunSpec({ val results = deliverer.deliverBatch(entries) results.size shouldBe 3 - results[0].first shouldBe entries[0] - results[0].second shouldBe DeliveryResult.Success - results[1].first shouldBe entries[1] - results[1].second shouldBe DeliveryResult.RetriableFailure("err1") - results[2].first shouldBe entries[2] - results[2].second shouldBe DeliveryResult.PermanentFailure("err2") + results[0].entry shouldBe entries[0] + results[0].result shouldBe DeliveryResult.Success + results[1].entry shouldBe entries[1] + results[1].result shouldBe DeliveryResult.RetriableFailure("err1") + results[2].entry shouldBe entries[2] + results[2].result shouldBe DeliveryResult.PermanentFailure("err2") } test("default deliverBatch on empty input returns empty list without calling deliver") { diff --git a/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt b/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt index 6c763b3..52eaaf9 100644 --- a/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt +++ b/okapi-kafka/src/main/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDeliverer.kt @@ -1,5 +1,6 @@ package com.softwaremill.okapi.kafka +import com.softwaremill.okapi.core.DeliveryOutcome import com.softwaremill.okapi.core.DeliveryResult import com.softwaremill.okapi.core.MessageDeliverer import com.softwaremill.okapi.core.OutboxEntry @@ -45,7 +46,7 @@ class KafkaMessageDeliverer( * futures still surface their own exception via `get()` and are classified * individually — the batch as a whole is never abandoned. */ - override fun deliverBatch(entries: List): List> { + override fun deliverBatch(entries: List): List { if (entries.isEmpty()) return emptyList() val inflight: List> = entries.map { entry -> @@ -61,7 +62,7 @@ class KafkaMessageDeliverer( logger.warn("Kafka producer.flush() failed for batch of {}; classifying per-entry from future state", entries.size, e) } - return inflight.map { (entry, outcome) -> entry to awaitOne(outcome) } + return inflight.map { (entry, outcome) -> DeliveryOutcome(entry, awaitOne(outcome)) } } private fun fireOne(entry: OutboxEntry): SendOutcome = try { diff --git a/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt b/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt index 3c5178c..18a9d00 100644 --- a/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt +++ b/okapi-kafka/src/test/kotlin/com/softwaremill/okapi/kafka/KafkaMessageDelivererBatchTest.kt @@ -1,5 +1,6 @@ package com.softwaremill.okapi.kafka +import com.softwaremill.okapi.core.DeliveryOutcome import com.softwaremill.okapi.core.DeliveryResult import com.softwaremill.okapi.core.OutboxEntry import com.softwaremill.okapi.core.OutboxMessage @@ -39,7 +40,7 @@ class KafkaMessageDelivererBatchTest : FunSpec({ val results = deliverer.deliverBatch(entries) results.size shouldBe 3 - results.map { it.first } shouldBe entries + results.map { it.entry } shouldBe entries results.forEach { (_, r) -> r shouldBe DeliveryResult.Success } producer.history().size shouldBe 3 } @@ -85,7 +86,7 @@ class KafkaMessageDelivererBatchTest : FunSpec({ val results = deliverer.deliverBatch(listOf(entry("a"))) - results[0].second.shouldBeInstanceOf() + results[0].result.shouldBeInstanceOf() } test("deliverBatch with future-based RetriableException classifies as RetriableFailure") { @@ -102,9 +103,9 @@ class KafkaMessageDelivererBatchTest : FunSpec({ val results = deliverer.deliverBatch(entries) results.size shouldBe 2 - results[0].second shouldBe DeliveryResult.Success - results[1].second.shouldBeInstanceOf() - (results[1].second as DeliveryResult.RetriableFailure).error shouldContain "transient" + results[0].result shouldBe DeliveryResult.Success + results[1].result.shouldBeInstanceOf() + (results[1].result as DeliveryResult.RetriableFailure).error shouldContain "transient" } test("deliverBatch handles mixed sync-throw + async outcomes in one batch with positional integrity") { @@ -130,13 +131,13 @@ class KafkaMessageDelivererBatchTest : FunSpec({ results.size shouldBe 3 // Positional integrity: result[i] corresponds to entries[i] regardless of outcome variant - results.map { it.first } shouldBe entries + results.map { it.entry } shouldBe entries - results[0].second shouldBe DeliveryResult.Success - results[1].second.shouldBeInstanceOf() - (results[1].second as DeliveryResult.PermanentFailure).error shouldContain "forbidden" - results[2].second.shouldBeInstanceOf() - (results[2].second as DeliveryResult.RetriableFailure).error shouldContain "async fail" + results[0].result shouldBe DeliveryResult.Success + results[1].result.shouldBeInstanceOf() + (results[1].result as DeliveryResult.PermanentFailure).error shouldContain "forbidden" + results[2].result.shouldBeInstanceOf() + (results[2].result as DeliveryResult.RetriableFailure).error shouldContain "async fail" } test("deliverBatch poison-pill metadata yields PermanentFailure for bad entry, others unaffected") { @@ -149,10 +150,10 @@ class KafkaMessageDelivererBatchTest : FunSpec({ val results = deliverer.deliverBatch(listOf(good1, poisoned, good2)) results.size shouldBe 3 - results.map { it.first } shouldBe listOf(good1, poisoned, good2) - results[0].second shouldBe DeliveryResult.Success - results[1].second.shouldBeInstanceOf() - results[2].second shouldBe DeliveryResult.Success + results.map { it.entry } shouldBe listOf(good1, poisoned, good2) + results[0].result shouldBe DeliveryResult.Success + results[1].result.shouldBeInstanceOf() + results[2].result shouldBe DeliveryResult.Success // Only the good entries actually reached the producer producer.history().size shouldBe 2 } @@ -173,8 +174,8 @@ class KafkaMessageDelivererBatchTest : FunSpec({ val results = deliverer.deliverBatch(entries) results.size shouldBe 2 - results[0].second shouldBe DeliveryResult.Success - results[1].second.shouldBeInstanceOf() + results[0].result shouldBe DeliveryResult.Success + results[1].result.shouldBeInstanceOf() } test("deliverBatch interrupted during flush re-arms interrupt flag and classifies pending futures as Retriable") { @@ -190,7 +191,7 @@ class KafkaMessageDelivererBatchTest : FunSpec({ val deliverer = KafkaMessageDeliverer(producer) val entries = listOf(entry("a")) - val results: List> + val results: List try { results = deliverer.deliverBatch(entries) } finally { @@ -199,6 +200,6 @@ class KafkaMessageDelivererBatchTest : FunSpec({ } results.size shouldBe 1 - results[0].second.shouldBeInstanceOf() + results[0].result.shouldBeInstanceOf() } }) From cfc3d003605bff765baa30652006b795c7426c18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Thu, 14 May 2026 12:11:53 +0200 Subject: [PATCH 2/2] =?UTF-8?q?docs:=20trim=20DeliveryOutcome=20KDoc=20?= =?UTF-8?q?=E2=80=94=20drop=20signature=20restatement,=20keep=20lifecycle?= =?UTF-8?q?=20intent?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per CLAUDE.md 'no redundant comments': first paragraph restated what the signature and DeliveryResult subtypes already express. Second paragraph (transient, never persisted) is the load-bearing insight. --- .../com/softwaremill/okapi/core/DeliveryOutcome.kt | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryOutcome.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryOutcome.kt index ac3d1e7..312a48f 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryOutcome.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/DeliveryOutcome.kt @@ -1,13 +1,10 @@ package com.softwaremill.okapi.core /** - * Per-entry result of one [MessageDeliverer.deliverBatch] invocation: - * the original [entry] paired with the transport's classification of its - * delivery attempt as [DeliveryResult.Success], [DeliveryResult.RetriableFailure], - * or [DeliveryResult.PermanentFailure]. + * Per-entry result of one [MessageDeliverer.deliverBatch] invocation. * - * This is a transient transport-layer report — it is consumed by - * [OutboxEntryProcessor] in the same batch cycle and never persisted. + * Transient transport-layer report — consumed by [OutboxEntryProcessor] + * in the same batch cycle and never persisted. */ data class DeliveryOutcome( val entry: OutboxEntry,