From 5b2605f673b0a1ab1c2a519c969aa062a26abe47 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 17 Jun 2026 20:55:49 +0800 Subject: [PATCH 1/3] idempotent is false still keep order by the mute --- async_producer.go | 67 ++++++++++++++++++++----- async_producer_test.go | 110 +++++++++++++++++++++++++++++++++++++++++ produce_set.go | 36 ++++++++++++++ 3 files changed, 202 insertions(+), 11 deletions(-) diff --git a/async_producer.go b/async_producer.go index a9af20b1b..8eb319ed3 100644 --- a/async_producer.go +++ b/async_producer.go @@ -184,6 +184,13 @@ func (m *partitionMuter) tryMutePartition(topic string, partition int32) bool { return true } +func (m *partitionMuter) isMutedPartition(topic string, partition int32) bool { + m.mu.Lock() + defer m.mu.Unlock() + + return m.isMuted(topic, partition) +} + // waitUntilMuted blocks until all partitions in the set can be muted, then mutes them. // Returns false if the muter was closed before all partitions could be muted. func (m *partitionMuter) waitUntilMuted(set *produceSet) bool { @@ -370,12 +377,17 @@ type ProducerMessage struct { // successfully delivered and RequiredAcks is not NoResponse. Timestamp time.Time - retries int - flags flagSet - expectation chan *ProducerError - sequenceNumber int32 - producerEpoch int16 - hasSequence bool + retries int + flags flagSet + // reusePartitionMute is a one-shot marker for non-idempotent retries. The + // first retried message from a failed, still-muted batch may reuse that + // existing in-flight reservation so later same-partition messages cannot + // slip in between the failed send and its retry. + reusePartitionMute bool + expectation chan *ProducerError + sequenceNumber int32 + producerEpoch int16 + hasSequence bool } const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. @@ -402,6 +414,7 @@ func (m *ProducerMessage) ByteSize(version int) int { func (m *ProducerMessage) clear() { m.flags = 0 m.retries = 0 + m.reusePartitionMute = false m.sequenceNumber = 0 m.producerEpoch = 0 m.hasSequence = false @@ -1083,7 +1096,8 @@ func (bp *brokerProducer) run() { var output chan<- *produceSet for { - if bp.flushingBatch == nil && (bp.timerFired || bp.accumulatingBatch.readyToFlush()) { + readyToFlush := bp.timerFired || bp.accumulatingBatch.readyToFlush() + if bp.flushingBatch == nil && readyToFlush { bp.tryBuildFlushingBatch() } @@ -1098,6 +1112,13 @@ func (bp *brokerProducer) run() { output = nil } + var unmuteCh <-chan struct{} + if bp.flushingBatch == nil && readyToFlush { + if ch, blocked := bp.parent.muter.awaitUnmuteChan(bp.accumulatingBatch); blocked { + unmuteCh = ch + } + } + select { case msg, ok := <-bp.input: if !ok { @@ -1169,6 +1190,7 @@ func (bp *brokerProducer) run() { if ok { bp.handleResponse(response) } + case <-unmuteCh: } } } @@ -1186,10 +1208,29 @@ func (bp *brokerProducer) tryBuildFlushingBatch() bool { partial := bp.accumulatingBatch.takePartitions(func(topic string, partition int32) bool { return bp.parent.muter.tryMutePartition(topic, partition) }) - if partial == nil { + if partial != nil { + bp.flushingBatch = partial + if bp.accumulatingBatch.empty() { + bp.rollOver() + } + return true + } + + reused := bp.accumulatingBatch.takePartitions(func(topic string, partition int32) bool { + partitions := bp.accumulatingBatch.msgs[topic] + if partitions == nil { + return false + } + set := partitions[partition] + return set.canReusePartitionMute() && bp.parent.muter.isMutedPartition(topic, partition) + }) + if reused == nil { return false } - bp.flushingBatch = partial + reused.eachPartition(func(_ string, _ int32, pSet *partitionSet) { + pSet.clearPartitionMuteReuse() + }) + bp.flushingBatch = reused if bp.accumulatingBatch.empty() { bp.rollOver() } @@ -1354,7 +1395,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo bp.parent.returnErrors(pSet.msgs, block.Err) } else { retryTopics = append(retryTopics, topic) - if bp.parent.conf.Producer.Idempotent { + if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) { if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } @@ -1397,6 +1438,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo if bp.parent.conf.Producer.Idempotent { go bp.parent.retryBatch(topic, partition, pSet, block.Err, true) } else { + pSet.markPartitionMuteReusable() bp.parent.retryMessages(pSet.msgs, block.Err) } // dropping the following messages has the side effect of incrementing their retry count @@ -1492,13 +1534,16 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { bp.currentRetries[topic] = make(map[int32]error) } bp.currentRetries[topic][partition] = err - if bp.parent.conf.Producer.Idempotent { + if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) { if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } keepMuted[topic][partition] = struct{}{} + } + if bp.parent.conf.Producer.Idempotent { go bp.parent.retryBatch(topic, partition, pSet, err, true) } else { + pSet.markPartitionMuteReusable() bp.parent.retryMessages(pSet.msgs, err) } }) diff --git a/async_producer_test.go b/async_producer_test.go index c32ac3ef5..ed5e6beeb 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1724,6 +1724,53 @@ func TestBrokerProducerFlushSkipsMutedPartitions(t *testing.T) { } } +func TestBrokerProducerRunFlushesAfterExternalUnmute(t *testing.T) { + config := NewTestConfig() + config.Producer.Flush.Messages = 1 + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + txnmgr: &transactionManager{}, + } + + blockedSet := newProduceSet(parent) + safeAddMessage(t, blockedSet, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("held")}) + if !parent.muter.tryMute(blockedSet) { + t.Fatal("expected to mute partition externally") + } + + input := make(chan *ProducerMessage) + output := make(chan *produceSet, 1) + responses := make(chan *brokerProducerResponse) + bp := &brokerProducer{ + parent: parent, + broker: &Broker{id: 1}, + input: input, + output: output, + responses: responses, + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + retryMsg := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("waiting")} + safeAddMessage(t, bp.accumulatingBatch, retryMsg) + + done := make(chan struct{}) + go func() { + bp.run() + close(done) + }() + + assertNotDone(t, output, 50*time.Millisecond) + parent.muter.unmute(blockedSet) + + flushed := assertDoneWithin(t, output, 2*time.Second) + require.Equal(t, retryMsg, flushed.msgs["topic"][0].msgs[0]) + + close(input) + close(responses) + assertDoneWithin(t, done, 2*time.Second) +} + // TestBrokerProducerWaitForSpaceAllPartitionsMuted verifies that waitForSpace unblocks // when all partitions in the accumulating batch are externally muted and later unmuted. func TestBrokerProducerWaitForSpaceAllPartitionsMuted(t *testing.T) { @@ -1910,6 +1957,69 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) { } } +func TestHandleSuccessNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + + txnMgr := &transactionManager{ + producerID: 0, + producerEpoch: 0, + sequenceNumbers: make(map[string]int32), + } + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + retries: make(chan *ProducerMessage, 1), + txnmgr: txnMgr, + } + leader := &Broker{id: 1} + parent.client = &stubLeaderClient{leader: leader, cfg: config} + + bp := &brokerProducer{ + parent: parent, + broker: leader, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + parent.brokers[leader] = bp + parent.brokerRefs[bp] = 1 + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + retryPartitionSet := sent.msgs["topic"][0] + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + defer parent.muter.unmute(sent) + + response := new(ProduceResponse) + response.AddTopicPartition("topic", 0, ErrNotLeaderForPartition) + bp.handleSuccess(sent, response) + + retried := assertDoneWithin(t, parent.retries, 2*time.Second) + require.Equal(t, retryPartitionSet.msgs[0], retried) + require.True(t, retried.reusePartitionMute) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if parent.muter.tryMute(contender) { + parent.muter.unmute(contender) + t.Fatal("expected partition to remain muted by the retrying batch") + } + + safeAddMessage(t, bp.accumulatingBatch, retried) + if !bp.tryBuildFlushingBatch() { + t.Fatal("expected retry batch to reuse the existing partition mute") + } + require.Equal(t, retryPartitionSet.msgs[0], bp.flushingBatch.msgs["topic"][0].msgs[0]) + require.False(t, retryPartitionSet.msgs[0].reusePartitionMute) +} + type stubLeaderClient struct { cfg *Config leader *Broker diff --git a/produce_set.go b/produce_set.go index 380f7f5ab..a86235b9e 100644 --- a/produce_set.go +++ b/produce_set.go @@ -12,6 +12,42 @@ type partitionSet struct { bufferBytes int } +// markPartitionMuteReusable lets the first message carry the retry batch's +// right to reuse the partition mute that is still held by its failed send. +// Only the first message is marked because one held mute represents one +// in-flight retry permission; marking every message could let split retry +// fragments bypass each other under the same reservation. +func (ps *partitionSet) markPartitionMuteReusable() { + if len(ps.msgs) > 0 { + ps.msgs[0].reusePartitionMute = true + } +} + +// canReusePartitionMute reports whether this retry batch may be flushed while +// its partition is already muted by the previous send attempt. +func (ps *partitionSet) canReusePartitionMute() bool { + return len(ps.msgs) > 0 && ps.msgs[0].reusePartitionMute +} + +// clearPartitionMuteReuse consumes the one-shot reuse marker once the retry +// batch is selected for flushing. +func (ps *partitionSet) clearPartitionMuteReuse() { + for _, msg := range ps.msgs { + msg.reusePartitionMute = false + } +} + +// canRetry prevents holding a partition mute when every message in the set will +// be returned as an error instead of being retried. +func (ps *partitionSet) canRetry(maxRetries int) bool { + for _, msg := range ps.msgs { + if msg.retries < maxRetries { + return true + } + } + return false +} + type produceSet struct { parent *asyncProducer msgs map[string]map[int32]*partitionSet From 03c03861cc5f13cc28a2425fac5f3c1e9ba3c9b7 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 17 Jun 2026 21:16:45 +0800 Subject: [PATCH 2/3] adjust the code Signed-off-by: 3AceShowHand --- async_producer.go | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/async_producer.go b/async_producer.go index 8eb319ed3..c9da85f57 100644 --- a/async_producer.go +++ b/async_producer.go @@ -377,17 +377,17 @@ type ProducerMessage struct { // successfully delivered and RequiredAcks is not NoResponse. Timestamp time.Time - retries int - flags flagSet + retries int + flags flagSet + expectation chan *ProducerError + sequenceNumber int32 + producerEpoch int16 + hasSequence bool // reusePartitionMute is a one-shot marker for non-idempotent retries. The // first retried message from a failed, still-muted batch may reuse that // existing in-flight reservation so later same-partition messages cannot // slip in between the failed send and its retry. reusePartitionMute bool - expectation chan *ProducerError - sequenceNumber int32 - producerEpoch int16 - hasSequence bool } const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. @@ -1096,9 +1096,15 @@ func (bp *brokerProducer) run() { var output chan<- *produceSet for { - readyToFlush := bp.timerFired || bp.accumulatingBatch.readyToFlush() - if bp.flushingBatch == nil && readyToFlush { + var unmuteCh <-chan struct{} + readyToFlush := bp.flushingBatch == nil && (bp.timerFired || bp.accumulatingBatch.readyToFlush()) + if readyToFlush { bp.tryBuildFlushingBatch() + if bp.flushingBatch == nil { + if ch, blocked := bp.parent.muter.awaitUnmuteChan(bp.accumulatingBatch); blocked { + unmuteCh = ch + } + } } var timerChan <-chan time.Time @@ -1112,13 +1118,6 @@ func (bp *brokerProducer) run() { output = nil } - var unmuteCh <-chan struct{} - if bp.flushingBatch == nil && readyToFlush { - if ch, blocked := bp.parent.muter.awaitUnmuteChan(bp.accumulatingBatch); blocked { - unmuteCh = ch - } - } - select { case msg, ok := <-bp.input: if !ok { From 72f7f91aaa006066ab3bbdb80a8e4687e2dc931d Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 17 Jun 2026 21:33:57 +0800 Subject: [PATCH 3/3] remove unnecessary change Signed-off-by: 3AceShowHand --- async_producer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/async_producer.go b/async_producer.go index c9da85f57..99821e5b4 100644 --- a/async_producer.go +++ b/async_producer.go @@ -1097,8 +1097,7 @@ func (bp *brokerProducer) run() { for { var unmuteCh <-chan struct{} - readyToFlush := bp.flushingBatch == nil && (bp.timerFired || bp.accumulatingBatch.readyToFlush()) - if readyToFlush { + if bp.flushingBatch == nil && (bp.timerFired || bp.accumulatingBatch.readyToFlush()) { bp.tryBuildFlushingBatch() if bp.flushingBatch == nil { if ch, blocked := bp.parent.muter.awaitUnmuteChan(bp.accumulatingBatch); blocked {