Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class CompositeMessageDeliverer(deliverers: List<MessageDeliverer>) : MessageDel
* Entries whose type has no registered deliverer are mapped to
* [DeliveryResult.PermanentFailure] (consistent with [deliver]).
*/
override fun deliverBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> {
override fun deliverBatch(entries: List<OutboxEntry>): List<DeliveryOutcome> {
if (entries.isEmpty()) return emptyList()

val resultByEntry: Map<OutboxEntry, DeliveryResult> = entries
Expand All @@ -36,11 +36,13 @@ class CompositeMessageDeliverer(deliverers: List<MessageDeliverer>) : MessageDel
if (deliverer != null) {
deliverer.deliverBatch(group)
} else {
group.map { it to DeliveryResult.PermanentFailure("No deliverer registered for type '$type'") }
group.map { DeliveryOutcome(it, DeliveryResult.PermanentFailure("No deliverer registered for type '$type'")) }
}
}
.toMap()
.associate { it.entry to it.result }

return entries.map { entry -> entry to (resultByEntry[entry] ?: error("missing result for entry ${entry.outboxId}")) }
return entries.map { entry ->
DeliveryOutcome(entry, resultByEntry[entry] ?: error("missing result for entry ${entry.outboxId}"))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.softwaremill.okapi.core

/**
* Per-entry result of one [MessageDeliverer.deliverBatch] invocation.
*
* Transient transport-layer report — consumed by [OutboxEntryProcessor]
* in the same batch cycle and never persisted.
*/
data class DeliveryOutcome(
val entry: OutboxEntry,
val result: DeliveryResult,
)
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ interface MessageDeliverer {
* surface as [DeliveryResult.RetriableFailure] or [DeliveryResult.PermanentFailure]
* on the affected entries.
*/
fun deliverBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> = entries.map { entry -> entry to deliver(entry) }
fun deliverBatch(entries: List<OutboxEntry>): List<DeliveryOutcome> = entries.map { entry -> DeliveryOutcome(entry, deliver(entry)) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ class CompositeMessageDelivererTest : FunSpec({
val results = composite.deliverBatch(entries)

results.size shouldBe 4
results.map { it.first } shouldBe entries
results[0].second shouldBe DeliveryResult.Success
results[1].second shouldBe DeliveryResult.RetriableFailure("503")
results[2].second shouldBe DeliveryResult.Success
results[3].second shouldBe DeliveryResult.RetriableFailure("503")
results.map { it.entry } shouldBe entries
results[0].result shouldBe DeliveryResult.Success
results[1].result shouldBe DeliveryResult.RetriableFailure("503")
results[2].result shouldBe DeliveryResult.Success
results[3].result shouldBe DeliveryResult.RetriableFailure("503")
}

test("deliverBatch fails permanently for entries with no registered deliverer") {
Expand All @@ -56,9 +56,9 @@ class CompositeMessageDelivererTest : FunSpec({
val results = composite.deliverBatch(entries)

results.size shouldBe 2
results[0].second shouldBe DeliveryResult.Success
results[1].second.shouldBeInstanceOf<DeliveryResult.PermanentFailure>()
(results[1].second as DeliveryResult.PermanentFailure).error shouldContain "missing"
results[0].result shouldBe DeliveryResult.Success
results[1].result.shouldBeInstanceOf<DeliveryResult.PermanentFailure>()
(results[1].result as DeliveryResult.PermanentFailure).error shouldContain "missing"
}

test("deliverBatch with empty input returns empty list") {
Expand All @@ -72,17 +72,17 @@ class CompositeMessageDelivererTest : FunSpec({
val kafkaDeliverer = object : MessageDeliverer {
override val type = "kafka"
override fun deliver(entry: OutboxEntry): DeliveryResult = DeliveryResult.Success
override fun deliverBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> {
override fun deliverBatch(entries: List<OutboxEntry>): List<DeliveryOutcome> {
batchCallsKafka++
return entries.map { it to DeliveryResult.Success }
return entries.map { DeliveryOutcome(it, DeliveryResult.Success) }
}
}
val httpDeliverer = object : MessageDeliverer {
override val type = "http"
override fun deliver(entry: OutboxEntry): DeliveryResult = DeliveryResult.Success
override fun deliverBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> {
override fun deliverBatch(entries: List<OutboxEntry>): List<DeliveryOutcome> {
batchCallsHttp++
return entries.map { it to DeliveryResult.Success }
return entries.map { DeliveryOutcome(it, DeliveryResult.Success) }
}
}
val composite = CompositeMessageDeliverer(listOf(kafkaDeliverer, httpDeliverer))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ class MessageDelivererTest : FunSpec({
val results = deliverer.deliverBatch(entries)

results.size shouldBe 3
results[0].first shouldBe entries[0]
results[0].second shouldBe DeliveryResult.Success
results[1].first shouldBe entries[1]
results[1].second shouldBe DeliveryResult.RetriableFailure("err1")
results[2].first shouldBe entries[2]
results[2].second shouldBe DeliveryResult.PermanentFailure("err2")
results[0].entry shouldBe entries[0]
results[0].result shouldBe DeliveryResult.Success
results[1].entry shouldBe entries[1]
results[1].result shouldBe DeliveryResult.RetriableFailure("err1")
results[2].entry shouldBe entries[2]
results[2].result shouldBe DeliveryResult.PermanentFailure("err2")
}

test("default deliverBatch on empty input returns empty list without calling deliver") {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.softwaremill.okapi.kafka

import com.softwaremill.okapi.core.DeliveryOutcome
import com.softwaremill.okapi.core.DeliveryResult
import com.softwaremill.okapi.core.MessageDeliverer
import com.softwaremill.okapi.core.OutboxEntry
Expand Down Expand Up @@ -45,7 +46,7 @@ class KafkaMessageDeliverer(
* futures still surface their own exception via `get()` and are classified
* individually — the batch as a whole is never abandoned.
*/
override fun deliverBatch(entries: List<OutboxEntry>): List<Pair<OutboxEntry, DeliveryResult>> {
override fun deliverBatch(entries: List<OutboxEntry>): List<DeliveryOutcome> {
if (entries.isEmpty()) return emptyList()

val inflight: List<Pair<OutboxEntry, SendOutcome>> = entries.map { entry ->
Expand All @@ -61,7 +62,7 @@ class KafkaMessageDeliverer(
logger.warn("Kafka producer.flush() failed for batch of {}; classifying per-entry from future state", entries.size, e)
}

return inflight.map { (entry, outcome) -> entry to awaitOne(outcome) }
return inflight.map { (entry, outcome) -> DeliveryOutcome(entry, awaitOne(outcome)) }
}

private fun fireOne(entry: OutboxEntry): SendOutcome = try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.softwaremill.okapi.kafka

import com.softwaremill.okapi.core.DeliveryOutcome
import com.softwaremill.okapi.core.DeliveryResult
import com.softwaremill.okapi.core.OutboxEntry
import com.softwaremill.okapi.core.OutboxMessage
Expand Down Expand Up @@ -39,7 +40,7 @@ class KafkaMessageDelivererBatchTest : FunSpec({
val results = deliverer.deliverBatch(entries)

results.size shouldBe 3
results.map { it.first } shouldBe entries
results.map { it.entry } shouldBe entries
results.forEach { (_, r) -> r shouldBe DeliveryResult.Success }
producer.history().size shouldBe 3
}
Expand Down Expand Up @@ -85,7 +86,7 @@ class KafkaMessageDelivererBatchTest : FunSpec({

val results = deliverer.deliverBatch(listOf(entry("a")))

results[0].second.shouldBeInstanceOf<DeliveryResult.RetriableFailure>()
results[0].result.shouldBeInstanceOf<DeliveryResult.RetriableFailure>()
}

test("deliverBatch with future-based RetriableException classifies as RetriableFailure") {
Expand All @@ -102,9 +103,9 @@ class KafkaMessageDelivererBatchTest : FunSpec({
val results = deliverer.deliverBatch(entries)

results.size shouldBe 2
results[0].second shouldBe DeliveryResult.Success
results[1].second.shouldBeInstanceOf<DeliveryResult.RetriableFailure>()
(results[1].second as DeliveryResult.RetriableFailure).error shouldContain "transient"
results[0].result shouldBe DeliveryResult.Success
results[1].result.shouldBeInstanceOf<DeliveryResult.RetriableFailure>()
(results[1].result as DeliveryResult.RetriableFailure).error shouldContain "transient"
}

test("deliverBatch handles mixed sync-throw + async outcomes in one batch with positional integrity") {
Expand All @@ -130,13 +131,13 @@ class KafkaMessageDelivererBatchTest : FunSpec({

results.size shouldBe 3
// Positional integrity: result[i] corresponds to entries[i] regardless of outcome variant
results.map { it.first } shouldBe entries
results.map { it.entry } shouldBe entries

results[0].second shouldBe DeliveryResult.Success
results[1].second.shouldBeInstanceOf<DeliveryResult.PermanentFailure>()
(results[1].second as DeliveryResult.PermanentFailure).error shouldContain "forbidden"
results[2].second.shouldBeInstanceOf<DeliveryResult.RetriableFailure>()
(results[2].second as DeliveryResult.RetriableFailure).error shouldContain "async fail"
results[0].result shouldBe DeliveryResult.Success
results[1].result.shouldBeInstanceOf<DeliveryResult.PermanentFailure>()
(results[1].result as DeliveryResult.PermanentFailure).error shouldContain "forbidden"
results[2].result.shouldBeInstanceOf<DeliveryResult.RetriableFailure>()
(results[2].result as DeliveryResult.RetriableFailure).error shouldContain "async fail"
}

test("deliverBatch poison-pill metadata yields PermanentFailure for bad entry, others unaffected") {
Expand All @@ -149,10 +150,10 @@ class KafkaMessageDelivererBatchTest : FunSpec({
val results = deliverer.deliverBatch(listOf(good1, poisoned, good2))

results.size shouldBe 3
results.map { it.first } shouldBe listOf(good1, poisoned, good2)
results[0].second shouldBe DeliveryResult.Success
results[1].second.shouldBeInstanceOf<DeliveryResult.PermanentFailure>()
results[2].second shouldBe DeliveryResult.Success
results.map { it.entry } shouldBe listOf(good1, poisoned, good2)
results[0].result shouldBe DeliveryResult.Success
results[1].result.shouldBeInstanceOf<DeliveryResult.PermanentFailure>()
results[2].result shouldBe DeliveryResult.Success
// Only the good entries actually reached the producer
producer.history().size shouldBe 2
}
Expand All @@ -173,8 +174,8 @@ class KafkaMessageDelivererBatchTest : FunSpec({
val results = deliverer.deliverBatch(entries)

results.size shouldBe 2
results[0].second shouldBe DeliveryResult.Success
results[1].second.shouldBeInstanceOf<DeliveryResult.RetriableFailure>()
results[0].result shouldBe DeliveryResult.Success
results[1].result.shouldBeInstanceOf<DeliveryResult.RetriableFailure>()
}

test("deliverBatch interrupted during flush re-arms interrupt flag and classifies pending futures as Retriable") {
Expand All @@ -190,7 +191,7 @@ class KafkaMessageDelivererBatchTest : FunSpec({
val deliverer = KafkaMessageDeliverer(producer)
val entries = listOf(entry("a"))

val results: List<Pair<OutboxEntry, DeliveryResult>>
val results: List<DeliveryOutcome>
try {
results = deliverer.deliverBatch(entries)
} finally {
Expand All @@ -199,6 +200,6 @@ class KafkaMessageDelivererBatchTest : FunSpec({
}

results.size shouldBe 1
results[0].second.shouldBeInstanceOf<DeliveryResult.RetriableFailure>()
results[0].result.shouldBeInstanceOf<DeliveryResult.RetriableFailure>()
}
})