From 5b2605f673b0a1ab1c2a519c969aa062a26abe47 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 17 Jun 2026 20:55:49 +0800 Subject: [PATCH 1/3] idempotent is false still keep order by the mute --- async_producer.go | 67 ++++++++++++++++++++----- async_producer_test.go | 110 +++++++++++++++++++++++++++++++++++++++++ produce_set.go | 36 ++++++++++++++ 3 files changed, 202 insertions(+), 11 deletions(-) diff --git a/async_producer.go b/async_producer.go index a9af20b1b..8eb319ed3 100644 --- a/async_producer.go +++ b/async_producer.go @@ -184,6 +184,13 @@ func (m *partitionMuter) tryMutePartition(topic string, partition int32) bool { return true } +func (m *partitionMuter) isMutedPartition(topic string, partition int32) bool { + m.mu.Lock() + defer m.mu.Unlock() + + return m.isMuted(topic, partition) +} + // waitUntilMuted blocks until all partitions in the set can be muted, then mutes them. // Returns false if the muter was closed before all partitions could be muted. func (m *partitionMuter) waitUntilMuted(set *produceSet) bool { @@ -370,12 +377,17 @@ type ProducerMessage struct { // successfully delivered and RequiredAcks is not NoResponse. Timestamp time.Time - retries int - flags flagSet - expectation chan *ProducerError - sequenceNumber int32 - producerEpoch int16 - hasSequence bool + retries int + flags flagSet + // reusePartitionMute is a one-shot marker for non-idempotent retries. The + // first retried message from a failed, still-muted batch may reuse that + // existing in-flight reservation so later same-partition messages cannot + // slip in between the failed send and its retry. + reusePartitionMute bool + expectation chan *ProducerError + sequenceNumber int32 + producerEpoch int16 + hasSequence bool } const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. @@ -402,6 +414,7 @@ func (m *ProducerMessage) ByteSize(version int) int { func (m *ProducerMessage) clear() { m.flags = 0 m.retries = 0 + m.reusePartitionMute = false m.sequenceNumber = 0 m.producerEpoch = 0 m.hasSequence = false @@ -1083,7 +1096,8 @@ func (bp *brokerProducer) run() { var output chan<- *produceSet for { - if bp.flushingBatch == nil && (bp.timerFired || bp.accumulatingBatch.readyToFlush()) { + readyToFlush := bp.timerFired || bp.accumulatingBatch.readyToFlush() + if bp.flushingBatch == nil && readyToFlush { bp.tryBuildFlushingBatch() } @@ -1098,6 +1112,13 @@ func (bp *brokerProducer) run() { output = nil } + var unmuteCh <-chan struct{} + if bp.flushingBatch == nil && readyToFlush { + if ch, blocked := bp.parent.muter.awaitUnmuteChan(bp.accumulatingBatch); blocked { + unmuteCh = ch + } + } + select { case msg, ok := <-bp.input: if !ok { @@ -1169,6 +1190,7 @@ func (bp *brokerProducer) run() { if ok { bp.handleResponse(response) } + case <-unmuteCh: } } } @@ -1186,10 +1208,29 @@ func (bp *brokerProducer) tryBuildFlushingBatch() bool { partial := bp.accumulatingBatch.takePartitions(func(topic string, partition int32) bool { return bp.parent.muter.tryMutePartition(topic, partition) }) - if partial == nil { + if partial != nil { + bp.flushingBatch = partial + if bp.accumulatingBatch.empty() { + bp.rollOver() + } + return true + } + + reused := bp.accumulatingBatch.takePartitions(func(topic string, partition int32) bool { + partitions := bp.accumulatingBatch.msgs[topic] + if partitions == nil { + return false + } + set := partitions[partition] + return set.canReusePartitionMute() && bp.parent.muter.isMutedPartition(topic, partition) + }) + if reused == nil { return false } - bp.flushingBatch = partial + reused.eachPartition(func(_ string, _ int32, pSet *partitionSet) { + pSet.clearPartitionMuteReuse() + }) + bp.flushingBatch = reused if bp.accumulatingBatch.empty() { bp.rollOver() } @@ -1354,7 +1395,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo bp.parent.returnErrors(pSet.msgs, block.Err) } else { retryTopics = append(retryTopics, topic) - if bp.parent.conf.Producer.Idempotent { + if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) { if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } @@ -1397,6 +1438,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo if bp.parent.conf.Producer.Idempotent { go bp.parent.retryBatch(topic, partition, pSet, block.Err, true) } else { + pSet.markPartitionMuteReusable() bp.parent.retryMessages(pSet.msgs, block.Err) } // dropping the following messages has the side effect of incrementing their retry count @@ -1492,13 +1534,16 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { bp.currentRetries[topic] = make(map[int32]error) } bp.currentRetries[topic][partition] = err - if bp.parent.conf.Producer.Idempotent { + if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) { if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } keepMuted[topic][partition] = struct{}{} + } + if bp.parent.conf.Producer.Idempotent { go bp.parent.retryBatch(topic, partition, pSet, err, true) } else { + pSet.markPartitionMuteReusable() bp.parent.retryMessages(pSet.msgs, err) } }) diff --git a/async_producer_test.go b/async_producer_test.go index c32ac3ef5..ed5e6beeb 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1724,6 +1724,53 @@ func TestBrokerProducerFlushSkipsMutedPartitions(t *testing.T) { } } +func TestBrokerProducerRunFlushesAfterExternalUnmute(t *testing.T) { + config := NewTestConfig() + config.Producer.Flush.Messages = 1 + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + txnmgr: &transactionManager{}, + } + + blockedSet := newProduceSet(parent) + safeAddMessage(t, blockedSet, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("held")}) + if !parent.muter.tryMute(blockedSet) { + t.Fatal("expected to mute partition externally") + } + + input := make(chan *ProducerMessage) + output := make(chan *produceSet, 1) + responses := make(chan *brokerProducerResponse) + bp := &brokerProducer{ + parent: parent, + broker: &Broker{id: 1}, + input: input, + output: output, + responses: responses, + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + retryMsg := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("waiting")} + safeAddMessage(t, bp.accumulatingBatch, retryMsg) + + done := make(chan struct{}) + go func() { + bp.run() + close(done) + }() + + assertNotDone(t, output, 50*time.Millisecond) + parent.muter.unmute(blockedSet) + + flushed := assertDoneWithin(t, output, 2*time.Second) + require.Equal(t, retryMsg, flushed.msgs["topic"][0].msgs[0]) + + close(input) + close(responses) + assertDoneWithin(t, done, 2*time.Second) +} + // TestBrokerProducerWaitForSpaceAllPartitionsMuted verifies that waitForSpace unblocks // when all partitions in the accumulating batch are externally muted and later unmuted. func TestBrokerProducerWaitForSpaceAllPartitionsMuted(t *testing.T) { @@ -1910,6 +1957,69 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) { } } +func TestHandleSuccessNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + + txnMgr := &transactionManager{ + producerID: 0, + producerEpoch: 0, + sequenceNumbers: make(map[string]int32), + } + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + retries: make(chan *ProducerMessage, 1), + txnmgr: txnMgr, + } + leader := &Broker{id: 1} + parent.client = &stubLeaderClient{leader: leader, cfg: config} + + bp := &brokerProducer{ + parent: parent, + broker: leader, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + parent.brokers[leader] = bp + parent.brokerRefs[bp] = 1 + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + retryPartitionSet := sent.msgs["topic"][0] + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + defer parent.muter.unmute(sent) + + response := new(ProduceResponse) + response.AddTopicPartition("topic", 0, ErrNotLeaderForPartition) + bp.handleSuccess(sent, response) + + retried := assertDoneWithin(t, parent.retries, 2*time.Second) + require.Equal(t, retryPartitionSet.msgs[0], retried) + require.True(t, retried.reusePartitionMute) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if parent.muter.tryMute(contender) { + parent.muter.unmute(contender) + t.Fatal("expected partition to remain muted by the retrying batch") + } + + safeAddMessage(t, bp.accumulatingBatch, retried) + if !bp.tryBuildFlushingBatch() { + t.Fatal("expected retry batch to reuse the existing partition mute") + } + require.Equal(t, retryPartitionSet.msgs[0], bp.flushingBatch.msgs["topic"][0].msgs[0]) + require.False(t, retryPartitionSet.msgs[0].reusePartitionMute) +} + type stubLeaderClient struct { cfg *Config leader *Broker diff --git a/produce_set.go b/produce_set.go index 380f7f5ab..a86235b9e 100644 --- a/produce_set.go +++ b/produce_set.go @@ -12,6 +12,42 @@ type partitionSet struct { bufferBytes int } +// markPartitionMuteReusable lets the first message carry the retry batch's +// right to reuse the partition mute that is still held by its failed send. +// Only the first message is marked because one held mute represents one +// in-flight retry permission; marking every message could let split retry +// fragments bypass each other under the same reservation. +func (ps *partitionSet) markPartitionMuteReusable() { + if len(ps.msgs) > 0 { + ps.msgs[0].reusePartitionMute = true + } +} + +// canReusePartitionMute reports whether this retry batch may be flushed while +// its partition is already muted by the previous send attempt. +func (ps *partitionSet) canReusePartitionMute() bool { + return len(ps.msgs) > 0 && ps.msgs[0].reusePartitionMute +} + +// clearPartitionMuteReuse consumes the one-shot reuse marker once the retry +// batch is selected for flushing. +func (ps *partitionSet) clearPartitionMuteReuse() { + for _, msg := range ps.msgs { + msg.reusePartitionMute = false + } +} + +// canRetry prevents holding a partition mute when every message in the set will +// be returned as an error instead of being retried. +func (ps *partitionSet) canRetry(maxRetries int) bool { + for _, msg := range ps.msgs { + if msg.retries < maxRetries { + return true + } + } + return false +} + type produceSet struct { parent *asyncProducer msgs map[string]map[int32]*partitionSet From fbeb6ba5c6b94274965dee73b59f9fddbbb4db6e Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Sat, 20 Jun 2026 16:33:47 +0800 Subject: [PATCH 2/3] fix code by the review comment Signed-off-by: 3AceShowHand --- async_producer.go | 130 +++++++++++++++++------------------------ async_producer_test.go | 12 ++-- produce_set_test.go | 37 +++++++----- 3 files changed, 85 insertions(+), 94 deletions(-) diff --git a/async_producer.go b/async_producer.go index 14c813ace..0fec24215 100644 --- a/async_producer.go +++ b/async_producer.go @@ -93,10 +93,11 @@ type asyncProducer struct { errors chan *ProducerError input, successes, retries chan *ProducerMessage inFlight sync.WaitGroup - // shutdownCh is closed before waiting on in-flight messages so retryBatch + + // done is closed before waiting on in-flight messages so retryBatch // goroutines can stop waiting on broker handoff and release partition mutes. - shutdownCh chan struct{} - shutdownChClosed atomic.Bool + done chan struct{} + closed atomic.Bool brokers map[*Broker]*brokerProducer brokerRefs map[*brokerProducer]int @@ -117,9 +118,9 @@ type asyncProducer struct { type retryBufferQuota struct { // messages and bytes track producer-level retry buffer occupancy when // Producer.Retry.MaxBufferLength or MaxBufferBytes is bounded. + mu sync.Mutex messages int64 bytes int64 - mu sync.Mutex } type partitionMuter struct { @@ -318,7 +319,7 @@ func newAsyncProducer(client Client) (AsyncProducer, error) { input: make(chan *ProducerMessage), successes: make(chan *ProducerMessage), retries: make(chan *ProducerMessage), - shutdownCh: make(chan struct{}), + done: make(chan struct{}), brokers: make(map[*Broker]*brokerProducer), brokerRefs: make(map[*brokerProducer]int), txnmgr: txnmgr, @@ -797,33 +798,19 @@ func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan } func (pp *partitionProducer) backoff(retries int) { - pp.parent.backoff(retries) + time.Sleep(pp.parent.calcBackoff(retries)) } -func (p *asyncProducer) backoff(retries int) { - backoff := p.retryBackoff(retries) - if backoff > 0 { - time.Sleep(backoff) +func (p *asyncProducer) calcBackoff(retries int) time.Duration { + if backoffFunc := p.conf.Producer.Retry.BackoffFunc; backoffFunc != nil { + return backoffFunc(retries, p.conf.Producer.Retry.Max) } -} - -func (p *asyncProducer) retryBackoff(retries int) time.Duration { - var backoff time.Duration - if p.conf.Producer.Retry.BackoffFunc != nil { - maxRetries := p.conf.Producer.Retry.Max - backoff = p.conf.Producer.Retry.BackoffFunc(retries, maxRetries) - } else { - backoff = p.conf.Producer.Retry.Backoff - } - return backoff + return p.conf.Producer.Retry.Backoff } func (p *asyncProducer) shuttingDown() bool { - if p.shutdownCh == nil { - return false - } select { - case <-p.shutdownCh: + case <-p.done: return true default: return false @@ -831,20 +818,16 @@ func (p *asyncProducer) shuttingDown() bool { } func (p *asyncProducer) backoffOrDone(retries int) bool { - backoff := p.retryBackoff(retries) + backoff := p.calcBackoff(retries) if backoff <= 0 { return !p.shuttingDown() } timer := time.NewTimer(backoff) defer timer.Stop() - if p.shutdownCh == nil { - <-timer.C - return true - } select { case <-timer.C: return true - case <-p.shutdownCh: + case <-p.done: return false } } @@ -1130,7 +1113,8 @@ type brokerProducer struct { // output while the batch still owns its partition mute; if the brokerProducer // is shutting down, that send may never be received and the mute would block // later same-partition messages and producer shutdown. - done chan struct{} + done chan struct{} + closed atomic.Bool accumulatingBatch *produceSet flushingBatch *produceSet // batch that has been muted and is ready to send @@ -1254,18 +1238,20 @@ func (bp *brokerProducer) tryBuildFlushingBatch() bool { partial := bp.accumulatingBatch.takePartitions(func(topic string, partition int32) bool { return bp.parent.muter.tryMutePartition(topic, partition) }) - if partial != nil { - bp.flushingBatch = partial - if bp.accumulatingBatch.empty() { - bp.rollOver() - } - return true + if partial == nil { + return false } - - return false + bp.flushingBatch = partial + if bp.accumulatingBatch.empty() { + bp.rollOver() + } + return true } func (bp *brokerProducer) shutdown() { + if bp.closed.Swap(true) { + return + } if bp.done != nil { close(bp.done) } @@ -1502,6 +1488,8 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio bufferReserved = false } } + defer releaseBuffer() + muted := alreadyMuted failBatch := func(err error) { releaseBuffer() @@ -1511,9 +1499,7 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio p.muter.unmute(produceSet) muted = false } - for _, msg := range pSet.msgs { - p.returnError(msg, err) - } + p.returnErrors(pSet.msgs, err) } for _, msg := range pSet.msgs { if msg.retries >= p.conf.Producer.Retry.Max { @@ -1568,9 +1554,7 @@ func (p *asyncProducer) failMutedBatch(batch partitionBatchRetry, err error) { produceSet := newProduceSet(p) produceSet.addPartitionSet(batch.topic, batch.partition, batch.pSet) p.muter.unmute(produceSet) - for _, msg := range batch.pSet.msgs { - p.returnError(msg, err) - } + p.returnErrors(batch.pSet.msgs, err) } func (p *asyncProducer) failMutedBatches(batches []partitionBatchRetry, err error) { @@ -1581,13 +1565,13 @@ func (p *asyncProducer) failMutedBatches(batches []partitionBatchRetry, err erro func (p *asyncProducer) retryBufferLimits() (int64, int64) { maxBufferLength := p.conf.Producer.Retry.MaxBufferLength - if 0 < maxBufferLength && maxBufferLength < minFunctionalRetryBufferLength { - maxBufferLength = minFunctionalRetryBufferLength + if maxBufferLength > 0 { + maxBufferLength = max(maxBufferLength, minFunctionalRetryBufferLength) } maxBufferBytes := p.conf.Producer.Retry.MaxBufferBytes - if 0 < maxBufferBytes && maxBufferBytes < minFunctionalRetryBufferBytes { - maxBufferBytes = minFunctionalRetryBufferBytes + if maxBufferBytes > 0 { + maxBufferBytes = max(maxBufferBytes, minFunctionalRetryBufferBytes) } return int64(maxBufferLength), maxBufferBytes @@ -1618,14 +1602,17 @@ func (p *asyncProducer) reserveBuffer(messages, bytes int64) bool { if !p.retryBufferBounded() { return true } + p.retryBuffer.mu.Lock() defer p.retryBuffer.mu.Unlock() - if p.bufferWouldOverflow(p.retryBuffer.messages+messages, p.retryBuffer.bytes+bytes) { + messages += p.retryBuffer.messages + bytes += p.retryBuffer.bytes + if p.bufferWouldOverflow(messages, bytes) { return false } - p.retryBuffer.messages += messages - p.retryBuffer.bytes += bytes + p.retryBuffer.messages = messages + p.retryBuffer.bytes = bytes return true } @@ -1636,12 +1623,15 @@ func (p *asyncProducer) addToBuffer(messages, bytes int64) bool { if !p.retryBufferBounded() { return false } + p.retryBuffer.mu.Lock() defer p.retryBuffer.mu.Unlock() - p.retryBuffer.messages += messages - p.retryBuffer.bytes += bytes - return p.bufferWouldOverflow(p.retryBuffer.messages, p.retryBuffer.bytes) + nextMessages := p.retryBuffer.messages + messages + nextBytes := p.retryBuffer.bytes + bytes + p.retryBuffer.messages = nextMessages + p.retryBuffer.bytes = nextBytes + return p.bufferWouldOverflow(nextMessages, nextBytes) } func (p *asyncProducer) releaseBuffer(messages, bytes int64) { @@ -1737,9 +1727,9 @@ func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []part p.failMutedBatch(batch, ErrShuttingDown) continue } - leader, leaderErr := p.client.Leader(batch.topic, batch.partition) - if leaderErr != nil { - Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", batch.topic, batch.partition, leaderErr) + leader, err := p.client.Leader(batch.topic, batch.partition) + if err != nil { + Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", batch.topic, batch.partition, err) batchMessages, batchBytes := int64(len(batch.pSet.msgs)), int64(batch.pSet.bufferBytes) p.releaseBuffer(batchMessages, batchBytes) p.failMutedBatch(batch, retryErr) @@ -1767,9 +1757,7 @@ func (p *asyncProducer) handoffRetrySet(leader *Broker, set *produceSet) { if !accepted { p.muter.unmute(set) set.eachPartition(func(_ string, _ int32, pSet *partitionSet) { - for _, msg := range pSet.msgs { - p.returnError(msg, ErrShuttingDown) - } + p.returnErrors(pSet.msgs, ErrShuttingDown) }) } } @@ -1779,12 +1767,8 @@ func (p *asyncProducer) handoffRetrySet(leader *Broker, set *produceSet) { // release the mute and fail the messages instead of leaving a retry goroutine // blocked while it still owns the partition mute. func (p *asyncProducer) sendRetryBatch(bp *brokerProducer, set *produceSet) bool { - var done <-chan struct{} - if bp.done != nil { - done = bp.done - } select { - case <-done: + case <-bp.done: return false default: } @@ -1793,16 +1777,12 @@ func (p *asyncProducer) sendRetryBatch(bp *brokerProducer, set *produceSet) bool return true default: } - var shutdownCh <-chan struct{} - if p.shutdownCh != nil { - shutdownCh = p.shutdownCh - } select { case bp.output <- set: return true - case <-done: + case <-bp.done: return false - case <-shutdownCh: + case <-p.done: return false } } @@ -1921,8 +1901,8 @@ func (p *asyncProducer) retryHandler() { func (p *asyncProducer) shutdown() { Logger.Println("Producer shutting down.") - if p.shutdownCh != nil && p.shutdownChClosed.CompareAndSwap(false, true) { - close(p.shutdownCh) + if p.done != nil && p.closed.CompareAndSwap(false, true) { + close(p.done) } p.inFlight.Add(1) p.input <- &ProducerMessage{flags: shutdown} diff --git a/async_producer_test.go b/async_producer_test.go index a3c220f63..c19b87e5b 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1562,6 +1562,8 @@ func TestBrokerProducerShutdown(t *testing.T) { bp := producer.(*asyncProducer).getBrokerProducer(broker) // Initiate the shutdown of all of them producer.(*asyncProducer).unrefBrokerProducer(broker, bp) + assertDoneWithin(t, bp.done, time.Second) + require.NotPanics(t, bp.shutdown) _ = producer.Close() mockBroker.Close() @@ -1664,7 +1666,7 @@ func newBlockingRetryProducer(config *Config, errorBuffer int) (*asyncProducer, brokers: make(map[*Broker]*brokerProducer), brokerRefs: make(map[*brokerProducer]int), errors: make(chan *ProducerError, errorBuffer), - shutdownCh: make(chan struct{}), + done: make(chan struct{}), txnmgr: &transactionManager{}, } leader := &Broker{id: 1} @@ -2399,7 +2401,7 @@ func TestRetryBatchReleasesMuteWhenHandoffAbortedByShutdown(t *testing.T) { brokers: make(map[*Broker]*brokerProducer), brokerRefs: make(map[*brokerProducer]int), errors: make(chan *ProducerError, 1), - shutdownCh: make(chan struct{}), + done: make(chan struct{}), txnmgr: &transactionManager{}, } leader := &Broker{id: 1} @@ -2428,8 +2430,8 @@ func TestRetryBatchReleasesMuteWhenHandoffAbortedByShutdown(t *testing.T) { }() waitForProducerBuffer(t, parent, 1, -1) - if parent.shutdownChClosed.CompareAndSwap(false, true) { - close(parent.shutdownCh) + if parent.closed.CompareAndSwap(false, true) { + close(parent.done) } producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) @@ -2568,7 +2570,7 @@ func TestRetryBatchesAfterRefreshReleasesMuteWhenBrokerProducerDone(t *testing.T brokers: make(map[*Broker]*brokerProducer), brokerRefs: make(map[*brokerProducer]int), errors: make(chan *ProducerError, 1), - shutdownCh: make(chan struct{}), + done: make(chan struct{}), txnmgr: &transactionManager{}, } leader := &Broker{id: 1} diff --git a/produce_set_test.go b/produce_set_test.go index 359bfd47b..ea8086750 100644 --- a/produce_set_test.go +++ b/produce_set_test.go @@ -26,20 +26,29 @@ func safeAddMessage(t *testing.T, ps *produceSet, msg *ProducerMessage) { } func TestShouldKeepMuted(t *testing.T) { - if (&partitionSet{}).shouldKeepMuted(1) { - t.Fatal("empty partition set should not be retryable") - } - if !(&partitionSet{msgs: []*ProducerMessage{ - {retries: 0}, - {retries: 0}, - }}).shouldKeepMuted(1) { - t.Fatal("expected batch to be retryable when every message is below retry max") - } - if (&partitionSet{msgs: []*ProducerMessage{ - {retries: 0}, - {retries: 1}, - }}).shouldKeepMuted(1) { - t.Fatal("expected batch not to be retryable when any message has exhausted retries") + pSet := &partitionSet{} + if pSet.shouldKeepMuted(1) { + t.Error("empty partition set should not be retryable") + } + + pSet = &partitionSet{ + msgs: []*ProducerMessage{ + {retries: 0}, + {retries: 0}, + }, + } + if !pSet.shouldKeepMuted(1) { + t.Error("expected batch to be retryable when every message is below retry max") + } + + pSet = &partitionSet{ + msgs: []*ProducerMessage{ + {retries: 0}, + {retries: 1}, + }, + } + if pSet.shouldKeepMuted(1) { + t.Error("expected batch not to be retryable when any message has exhausted retries") } } From e34fc32f17ef6d798202985e2b845c20a62482f8 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 22 Jun 2026 17:01:25 +0800 Subject: [PATCH 3/3] refactor the code, remove useless wrappers and add more abstraction Signed-off-by: 3AceShowHand --- async_producer.go | 272 +++++++++--------- async_producer_test.go | 605 +++++++++++++++++------------------------ 2 files changed, 370 insertions(+), 507 deletions(-) diff --git a/async_producer.go b/async_producer.go index 0fec24215..c9d5f7d44 100644 --- a/async_producer.go +++ b/async_producer.go @@ -103,8 +103,6 @@ type asyncProducer struct { brokerRefs map[*brokerProducer]int brokerLock sync.Mutex - retryBuffer retryBufferQuota - txnmgr *transactionManager txLock sync.Mutex @@ -112,15 +110,11 @@ type asyncProducer struct { // mirroring Kafka's RecordAccumulator. muter *partitionMuter - metricsRegistry metrics.Registry -} - -type retryBufferQuota struct { - // messages and bytes track producer-level retry buffer occupancy when + // retryBufferQuota tracks producer-level retry buffer occupancy when // Producer.Retry.MaxBufferLength or MaxBufferBytes is bounded. - mu sync.Mutex - messages int64 - bytes int64 + retryBufferQuota messageQuota + + metricsRegistry metrics.Registry } type partitionMuter struct { @@ -313,18 +307,19 @@ func newAsyncProducer(client Client) (AsyncProducer, error) { } p := &asyncProducer{ - client: client, - conf: client.Config(), - errors: make(chan *ProducerError), - input: make(chan *ProducerMessage), - successes: make(chan *ProducerMessage), - retries: make(chan *ProducerMessage), - done: make(chan struct{}), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - txnmgr: txnmgr, - muter: newPartitionMuter(), - metricsRegistry: newCleanupRegistry(client.Config().MetricRegistry), + client: client, + conf: client.Config(), + errors: make(chan *ProducerError), + input: make(chan *ProducerMessage), + successes: make(chan *ProducerMessage), + retries: make(chan *ProducerMessage), + done: make(chan struct{}), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + txnmgr: txnmgr, + muter: newPartitionMuter(), + retryBufferQuota: newRetryBufferQuota(client.Config()), + metricsRegistry: newCleanupRegistry(client.Config().MetricRegistry), } // launch our singleton dispatchers @@ -1481,18 +1476,13 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio produceSet := newProduceSet(p) produceSet.addPartitionSet(topic, partition, pSet) bufferMessages, bufferBytes := int64(produceSet.bufferCount), int64(produceSet.bufferBytes) - bufferReserved := false - releaseBuffer := func() { - if bufferReserved { - p.releaseBuffer(bufferMessages, bufferBytes) - bufferReserved = false - } - } - defer releaseBuffer() + var reservedMessages, reservedBytes int64 + defer func() { + p.retryBufferQuota.release(reservedMessages, reservedBytes) + }() muted := alreadyMuted failBatch := func(err error) { - releaseBuffer() // Release the partition before reporting errors so a blocked Errors // consumer cannot keep later same-partition messages muted. if muted { @@ -1508,11 +1498,11 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio } msg.retries++ } - if !p.reserveBuffer(bufferMessages, bufferBytes) { + if !p.retryBufferQuota.tryReserve(bufferMessages, bufferBytes) { failBatch(ErrProducerRetryBufferOverflow) return } - bufferReserved = p.retryBufferBounded() && (bufferMessages != 0 || bufferBytes != 0) + reservedMessages, reservedBytes = bufferMessages, bufferBytes // Honor Producer.Retry.Backoff between retry attempts (#2469). retryBatch // dispatches the produceSet directly to the broker, bypassing partitionProducer.dispatch. @@ -1537,11 +1527,9 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio } bp := p.getBrokerProducer(leader) defer p.unrefBrokerProducer(leader, bp) - if !p.sendRetryBatch(bp, produceSet) { + if !p.handoffRetryBatch(bp, produceSet) { failBatch(ErrShuttingDown) - return } - releaseBuffer() } type partitionBatchRetry struct { @@ -1563,13 +1551,18 @@ func (p *asyncProducer) failMutedBatches(batches []partitionBatchRetry, err erro } } -func (p *asyncProducer) retryBufferLimits() (int64, int64) { - maxBufferLength := p.conf.Producer.Retry.MaxBufferLength +func newRetryBufferQuota(conf *Config) messageQuota { + maxBufferLength, maxBufferBytes := retryBufferLimits(conf) + return newMessageQuota(maxBufferLength, maxBufferBytes) +} + +func retryBufferLimits(conf *Config) (int64, int64) { + maxBufferLength := conf.Producer.Retry.MaxBufferLength if maxBufferLength > 0 { maxBufferLength = max(maxBufferLength, minFunctionalRetryBufferLength) } - maxBufferBytes := p.conf.Producer.Retry.MaxBufferBytes + maxBufferBytes := conf.Producer.Retry.MaxBufferBytes if maxBufferBytes > 0 { maxBufferBytes = max(maxBufferBytes, minFunctionalRetryBufferBytes) } @@ -1577,77 +1570,13 @@ func (p *asyncProducer) retryBufferLimits() (int64, int64) { return int64(maxBufferLength), maxBufferBytes } -func (p *asyncProducer) retryBufferBounded() bool { - maxBufferLength, maxBufferBytes := p.retryBufferLimits() - return maxBufferLength > 0 || maxBufferBytes > 0 -} - -func (p *asyncProducer) producerMessageByteSizeVersion() int { - if p.conf.Version.IsAtLeast(V0_11_0_0) { +func producerMessageByteSizeVersion(conf *Config) int { + if conf.Version.IsAtLeast(V0_11_0_0) { return 2 } return 1 } -func (p *asyncProducer) bufferWouldOverflow(messages, bytes int64) bool { - maxBufferLength, maxBufferBytes := p.retryBufferLimits() - return (maxBufferLength > 0 && messages >= maxBufferLength) || - (maxBufferBytes > 0 && bytes >= maxBufferBytes) -} - -func (p *asyncProducer) reserveBuffer(messages, bytes int64) bool { - if messages == 0 && bytes == 0 { - return true - } - if !p.retryBufferBounded() { - return true - } - - p.retryBuffer.mu.Lock() - defer p.retryBuffer.mu.Unlock() - - messages += p.retryBuffer.messages - bytes += p.retryBuffer.bytes - if p.bufferWouldOverflow(messages, bytes) { - return false - } - p.retryBuffer.messages = messages - p.retryBuffer.bytes = bytes - return true -} - -func (p *asyncProducer) addToBuffer(messages, bytes int64) bool { - if messages == 0 && bytes == 0 { - return false - } - if !p.retryBufferBounded() { - return false - } - - p.retryBuffer.mu.Lock() - defer p.retryBuffer.mu.Unlock() - - nextMessages := p.retryBuffer.messages + messages - nextBytes := p.retryBuffer.bytes + bytes - p.retryBuffer.messages = nextMessages - p.retryBuffer.bytes = nextBytes - return p.bufferWouldOverflow(nextMessages, nextBytes) -} - -func (p *asyncProducer) releaseBuffer(messages, bytes int64) { - if messages == 0 && bytes == 0 { - return - } - if !p.retryBufferBounded() { - return - } - p.retryBuffer.mu.Lock() - defer p.retryBuffer.mu.Unlock() - - p.retryBuffer.messages -= messages - p.retryBuffer.bytes -= bytes -} - // retryBatchesAfterRefresh keeps metadata refresh out of brokerProducer.handleError. // Connection errors already hold partition mutes; doing the refresh in the // response loop can block all progress for that broker while the cluster is @@ -1692,13 +1621,13 @@ func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []part bufferMessages += int64(len(batch.pSet.msgs)) bufferBytes += int64(batch.pSet.bufferBytes) } - if !p.reserveBuffer(bufferMessages, bufferBytes) { + if !p.retryBufferQuota.tryReserve(bufferMessages, bufferBytes) { p.failMutedBatches(retryable, ErrProducerRetryBufferOverflow) return } if p.shuttingDown() { - p.releaseBuffer(bufferMessages, bufferBytes) + p.retryBufferQuota.release(bufferMessages, bufferBytes) p.failMutedBatches(retryable, ErrShuttingDown) return } @@ -1708,13 +1637,13 @@ func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []part } } if p.shuttingDown() { - p.releaseBuffer(bufferMessages, bufferBytes) + p.retryBufferQuota.release(bufferMessages, bufferBytes) p.failMutedBatches(retryable, ErrShuttingDown) return } if !p.backoffOrDone(maxRetryAttempt) { - p.releaseBuffer(bufferMessages, bufferBytes) + p.retryBufferQuota.release(bufferMessages, bufferBytes) p.failMutedBatches(retryable, ErrShuttingDown) return } @@ -1723,7 +1652,7 @@ func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []part for _, batch := range retryable { if p.shuttingDown() { batchMessages, batchBytes := int64(len(batch.pSet.msgs)), int64(batch.pSet.bufferBytes) - p.releaseBuffer(batchMessages, batchBytes) + p.retryBufferQuota.release(batchMessages, batchBytes) p.failMutedBatch(batch, ErrShuttingDown) continue } @@ -1731,7 +1660,7 @@ func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []part if err != nil { Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", batch.topic, batch.partition, err) batchMessages, batchBytes := int64(len(batch.pSet.msgs)), int64(batch.pSet.bufferBytes) - p.releaseBuffer(batchMessages, batchBytes) + p.retryBufferQuota.release(batchMessages, batchBytes) p.failMutedBatch(batch, retryErr) continue } @@ -1744,39 +1673,26 @@ func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []part } for leader, set := range brokerSets { - p.handoffRetrySet(leader, set) - } -} - -func (p *asyncProducer) handoffRetrySet(leader *Broker, set *produceSet) { - bp := p.getBrokerProducer(leader) - accepted := p.sendRetryBatch(bp, set) - p.unrefBrokerProducer(leader, bp) - setMessages, setBytes := int64(set.bufferCount), int64(set.bufferBytes) - p.releaseBuffer(setMessages, setBytes) - if !accepted { - p.muter.unmute(set) - set.eachPartition(func(_ string, _ int32, pSet *partitionSet) { - p.returnErrors(pSet.msgs, ErrShuttingDown) - }) + bp := p.getBrokerProducer(leader) + accepted := p.handoffRetryBatch(bp, set) + p.unrefBrokerProducer(leader, bp) + p.retryBufferQuota.release(int64(set.bufferCount), int64(set.bufferBytes)) + if !accepted { + p.muter.unmute(set) + set.eachPartition(func(_ string, _ int32, pSet *partitionSet) { + p.returnErrors(pSet.msgs, ErrShuttingDown) + }) + } } } -// sendRetryBatch transfers ownership of a muted retry batch to the broker -// bridge. A false result means no owner accepted the batch, so the caller must -// release the mute and fail the messages instead of leaving a retry goroutine -// blocked while it still owns the partition mute. -func (p *asyncProducer) sendRetryBatch(bp *brokerProducer, set *produceSet) bool { - select { - case <-bp.done: +// handoffRetryBatch transfers ownership of a muted retry batch to the broker +// bridge. On false, the caller still owns the mute and must fail the batch. +func (p *asyncProducer) handoffRetryBatch(bp *brokerProducer, set *produceSet) bool { + if bp.closed.Load() { return false - default: - } - select { - case bp.output <- set: - return true - default: } + select { case bp.output <- set: return true @@ -1853,7 +1769,7 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel func (p *asyncProducer) retryHandler() { - version := p.producerMessageByteSizeVersion() + version := producerMessageByteSizeVersion(p.conf) var msg *ProducerMessage buf := queue.New() @@ -1866,7 +1782,7 @@ func (p *asyncProducer) retryHandler() { case msg = <-p.retries: case p.input <- buf.Peek().(*ProducerMessage): msgToRemove := buf.Remove().(*ProducerMessage) - p.releaseBuffer(1, int64(msgToRemove.ByteSize(version))) + p.retryBufferQuota.release(1, int64(msgToRemove.ByteSize(version))) continue } } @@ -1876,7 +1792,7 @@ func (p *asyncProducer) retryHandler() { } buf.Add(msg) - bufferOverflow := p.addToBuffer(1, int64(msg.ByteSize(version))) + bufferOverflow := p.retryBufferQuota.addAndCheckOverflow(1, int64(msg.ByteSize(version))) if !bufferOverflow { continue @@ -1887,10 +1803,10 @@ func (p *asyncProducer) retryHandler() { select { case p.input <- msgToHandle: buf.Remove() - p.releaseBuffer(1, int64(msgToHandle.ByteSize(version))) + p.retryBufferQuota.release(1, int64(msgToHandle.ByteSize(version))) default: buf.Remove() - p.releaseBuffer(1, int64(msgToHandle.ByteSize(version))) + p.retryBufferQuota.release(1, int64(msgToHandle.ByteSize(version))) p.returnError(msgToHandle, ErrProducerRetryBufferOverflow) } } @@ -2048,3 +1964,71 @@ func (p *asyncProducer) abandonBrokerConnection(broker *Broker) { delete(p.brokers, broker) } + +// messageQuota tracks message and byte counts against optional maximums. +type messageQuota struct { + mu sync.Mutex + messages int64 + bytes int64 + maxMessages int64 + maxBytes int64 +} + +func newMessageQuota(maxMessages, maxBytes int64) messageQuota { + return messageQuota{ + maxMessages: maxMessages, + maxBytes: maxBytes, + } +} + +func (q *messageQuota) tryReserve(messages, bytes int64) bool { + if !q.shouldTrack(messages, bytes) { + return true + } + + q.mu.Lock() + defer q.mu.Unlock() + + nextMessages := q.messages + messages + nextBytes := q.bytes + bytes + if q.wouldOverflow(nextMessages, nextBytes) { + return false + } + q.messages = nextMessages + q.bytes = nextBytes + return true +} + +func (q *messageQuota) addAndCheckOverflow(messages, bytes int64) bool { + if !q.shouldTrack(messages, bytes) { + return false + } + + q.mu.Lock() + defer q.mu.Unlock() + + q.messages += messages + q.bytes += bytes + return q.wouldOverflow(q.messages, q.bytes) +} + +func (q *messageQuota) release(messages, bytes int64) { + if !q.shouldTrack(messages, bytes) { + return + } + + q.mu.Lock() + defer q.mu.Unlock() + + q.messages -= messages + q.bytes -= bytes +} + +func (q *messageQuota) shouldTrack(messages, bytes int64) bool { + return (messages != 0 || bytes != 0) && (q.maxMessages > 0 || q.maxBytes > 0) +} + +func (q *messageQuota) wouldOverflow(messages, bytes int64) bool { + return (q.maxMessages > 0 && messages >= q.maxMessages) || + (q.maxBytes > 0 && bytes >= q.maxBytes) +} diff --git a/async_producer_test.go b/async_producer_test.go index c19b87e5b..b16e0bf61 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1637,37 +1637,16 @@ func assertDoneWithin[T any](t *testing.T, ch <-chan T, timeout time.Duration) T } } -func waitForProducerBuffer(t *testing.T, parent *asyncProducer, messages, bytes int64) { - t.Helper() - deadline := time.After(2 * time.Second) - ticker := time.NewTicker(time.Millisecond) - defer ticker.Stop() - for { - parent.retryBuffer.mu.Lock() - gotMessages := parent.retryBuffer.messages - gotBytes := parent.retryBuffer.bytes - parent.retryBuffer.mu.Unlock() - if (messages < 0 || gotMessages == messages) && (bytes < 0 || gotBytes == bytes) { - return - } - select { - case <-deadline: - t.Fatalf("timed out waiting for buffer budget messages=%d bytes=%d, got messages=%d bytes=%d", - messages, bytes, gotMessages, gotBytes) - case <-ticker.C: - } - } -} - func newBlockingRetryProducer(config *Config, errorBuffer int) (*asyncProducer, *brokerProducer) { parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - errors: make(chan *ProducerError, errorBuffer), - done: make(chan struct{}), - txnmgr: &transactionManager{}, + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + errors: make(chan *ProducerError, errorBuffer), + done: make(chan struct{}), + txnmgr: &transactionManager{}, + retryBufferQuota: newRetryBufferQuota(config), } leader := &Broker{id: 1} parent.client = &stubLeaderClient{leader: leader, cfg: config} @@ -1682,6 +1661,52 @@ func newBlockingRetryProducer(config *Config, errorBuffer int) (*asyncProducer, return parent, retryBP } +// holdFirstRetryAfterReserve lets the test prove a second retry sees quota held by the first. +func holdFirstRetryAfterReserve(parent *asyncProducer, retryBP *brokerProducer, afterRefresh bool) (<-chan struct{}, func()) { + firstRetryReserved := make(chan struct{}, 1) + blockFirstRetry := make(chan struct{}, 1) + releaseFirstRetry := make(chan struct{}, 1) + blockFirstRetry <- struct{}{} + + hold := func() { + select { + case <-blockFirstRetry: + firstRetryReserved <- struct{}{} + <-releaseFirstRetry + default: + } + } + release := func() { + select { + case releaseFirstRetry <- struct{}{}: + default: + } + } + + if afterRefresh { + parent.client = &stubLeaderClient{ + leader: retryBP.broker, + cfg: parent.conf, + refreshMetadata: func(...string) error { + hold() + return nil + }, + } + } else { + parent.client = &stubLeaderClient{ + cfg: parent.conf, + leaderFunc: func(_ string, partition int32) (*Broker, error) { + if partition == 0 { + hold() + } + return retryBP.broker, nil + }, + } + } + + return firstRetryReserved, release +} + // TestBrokerProducerWaitForSpaceRespectsExternalUnmute ensures waitForSpace does not // deadlock when partitions are muted by another producer and are unmuted elsewhere. func TestBrokerProducerWaitForSpaceRespectsExternalUnmute(t *testing.T) { @@ -1962,11 +1987,12 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) { } parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - txnmgr: txnMgr, + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + txnmgr: txnMgr, + retryBufferQuota: newRetryBufferQuota(config), } leader := &Broker{} parent.client = &stubLeaderClient{leader: leader, cfg: config} @@ -1997,7 +2023,6 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) { default: t.Fatal("expected retry batch to be dispatched") } - waitForProducerBuffer(t, parent, 0, 0) contender := newProduceSet(parent) safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) @@ -2124,18 +2149,28 @@ func TestHandleErrorNonIdempotentRetryDoesNotWaitForMetadataRefresh(t *testing.T config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - txnmgr: &transactionManager{}, + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + txnmgr: &transactionManager{}, + retryBufferQuota: newRetryBufferQuota(config), } failedBroker := &Broker{id: 1} retryLeader := &Broker{id: 2} - client := &blockingRefreshClient{ - stubLeaderClient: stubLeaderClient{leader: retryLeader, cfg: config}, - started: make(chan struct{}, 1), - release: make(chan struct{}), + started := make(chan struct{}, 1) + release := make(chan struct{}) + client := &stubLeaderClient{ + leader: retryLeader, + cfg: config, + refreshMetadata: func(...string) error { + select { + case started <- struct{}{}: + default: + } + <-release + return nil + }, } parent.client = client @@ -2169,8 +2204,7 @@ func TestHandleErrorNonIdempotentRetryDoesNotWaitForMetadataRefresh(t *testing.T close(done) }() assertDoneWithin(t, done, 2*time.Second) - assertDoneWithin(t, client.started, 2*time.Second) - waitForProducerBuffer(t, parent, 1, -1) + assertDoneWithin(t, started, 2*time.Second) contender := newProduceSet(parent) safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) @@ -2179,11 +2213,10 @@ func TestHandleErrorNonIdempotentRetryDoesNotWaitForMetadataRefresh(t *testing.T t.Fatal("expected partition to remain muted while async metadata refresh is pending") } - close(client.release) + close(release) retrySet := assertDoneWithin(t, output, 2*time.Second) defer parent.muter.unmute(retrySet) require.Equal(t, retryPartitionSet, retrySet.msgs["topic"][0]) - waitForProducerBuffer(t, parent, 0, 0) } func TestHandleErrorNonIdempotentRetryRefreshesMetadataOncePerFailedRequest(t *testing.T) { @@ -2201,9 +2234,14 @@ func TestHandleErrorNonIdempotentRetryRefreshesMetadataOncePerFailedRequest(t *t } failedBroker := &Broker{id: 1} retryLeader := &Broker{id: 2} - client := &countingRefreshClient{ - stubLeaderClient: stubLeaderClient{leader: retryLeader, cfg: config}, - calls: make(chan []string, 1), + calls := make(chan []string, 1) + client := &stubLeaderClient{ + leader: retryLeader, + cfg: config, + refreshMetadata: func(topics ...string) error { + calls <- append([]string(nil), topics...) + return nil + }, } parent.client = client @@ -2236,10 +2274,10 @@ func TestHandleErrorNonIdempotentRetryRefreshesMetadataOncePerFailedRequest(t *t bp.handleError(sent, ErrOutOfBrokers) - require.Equal(t, []string{"topic"}, assertDoneWithin(t, client.calls, 2*time.Second)) + require.Equal(t, []string{"topic"}, assertDoneWithin(t, calls, 2*time.Second)) retrySet := assertDoneWithin(t, output, 2*time.Second) defer parent.muter.unmute(retrySet) - assertNotDone(t, client.calls, 50*time.Millisecond) + assertNotDone(t, calls, 50*time.Millisecond) assertNotDone(t, output, 50*time.Millisecond) retriedPartitions := make(map[int32]*partitionSet) @@ -2266,8 +2304,9 @@ func TestHandleErrorNonIdempotentRetryGroupsBatchesByLeader(t *testing.T) { failedBroker := &Broker{id: 1} leaderA := &Broker{id: 2} leaderB := &Broker{id: 3} - client := &routingLeaderClient{ - stubLeaderClient: stubLeaderClient{cfg: config}, + calls := make(chan []string, 1) + client := &stubLeaderClient{ + cfg: config, leaders: map[string]map[int32]*Broker{ "topic-a": { 0: leaderA, @@ -2278,7 +2317,10 @@ func TestHandleErrorNonIdempotentRetryGroupsBatchesByLeader(t *testing.T) { 1: leaderB, }, }, - calls: make(chan []string, 1), + refreshMetadata: func(topics ...string) error { + calls <- append([]string(nil), topics...) + return nil + }, } parent.client = client @@ -2318,7 +2360,7 @@ func TestHandleErrorNonIdempotentRetryGroupsBatchesByLeader(t *testing.T) { bp.handleError(sent, ErrOutOfBrokers) - require.ElementsMatch(t, []string{"topic-a", "topic-b"}, assertDoneWithin(t, client.calls, 2*time.Second)) + require.ElementsMatch(t, []string{"topic-a", "topic-b"}, assertDoneWithin(t, calls, 2*time.Second)) retrySetA := assertDoneWithin(t, outputA, 2*time.Second) defer parent.muter.unmute(retrySetA) retrySetB := assertDoneWithin(t, outputB, 2*time.Second) @@ -2396,16 +2438,27 @@ func TestRetryBatchReleasesMuteWhenHandoffAbortedByShutdown(t *testing.T) { config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - errors: make(chan *ProducerError, 1), - done: make(chan struct{}), - txnmgr: &transactionManager{}, + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + errors: make(chan *ProducerError, 1), + done: make(chan struct{}), + txnmgr: &transactionManager{}, + retryBufferQuota: newRetryBufferQuota(config), } leader := &Broker{id: 1} - parent.client = &stubLeaderClient{leader: leader, cfg: config} + leaderLookup := make(chan struct{}, 1) + parent.client = &stubLeaderClient{ + cfg: config, + leaderFunc: func(string, int32) (*Broker, error) { + select { + case leaderLookup <- struct{}{}: + default: + } + return leader, nil + }, + } retryBP := &brokerProducer{ parent: parent, broker: leader, @@ -2428,7 +2481,7 @@ func TestRetryBatchReleasesMuteWhenHandoffAbortedByShutdown(t *testing.T) { parent.retryBatch("topic", 0, retryPartitionSet, ErrOutOfBrokers, true) close(done) }() - waitForProducerBuffer(t, parent, 1, -1) + assertDoneWithin(t, leaderLookup, 2*time.Second) if parent.closed.CompareAndSwap(false, true) { close(parent.done) @@ -2437,7 +2490,6 @@ func TestRetryBatchReleasesMuteWhenHandoffAbortedByShutdown(t *testing.T) { producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) require.Equal(t, ErrShuttingDown, producerErr.Err) assertDoneWithin(t, done, 2*time.Second) - waitForProducerBuffer(t, parent, 0, 0) contender := newProduceSet(parent) safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) @@ -2447,114 +2499,105 @@ func TestRetryBatchReleasesMuteWhenHandoffAbortedByShutdown(t *testing.T) { parent.muter.unmute(contender) } -func TestRetryBatchUsesSharedBufferLengthBudget(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = true - config.Producer.Retry.Max = 1 - config.Producer.Retry.Backoff = 0 - config.Producer.Return.Errors = true - config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength + 1 - - parent, retryBP := newBlockingRetryProducer(config, minFunctionalRetryBufferLength+1) - - first := newProduceSet(parent) - for i := 0; i < minFunctionalRetryBufferLength; i++ { - safeAddMessage(t, first, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) - } - firstPartitionSet := first.msgs["topic"][0] - if !parent.muter.tryMute(first) { - t.Fatal("expected first retry batch to mute partitions") - } - parent.inFlight.Add(minFunctionalRetryBufferLength) - - firstDone := make(chan struct{}) - go func() { - parent.retryBatch("topic", 0, firstPartitionSet, ErrNotEnoughReplicas, true) - close(firstDone) - }() - waitForProducerBuffer(t, parent, minFunctionalRetryBufferLength, -1) - - second := newProduceSet(parent) - safeAddMessage(t, second, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("overflow")}) - secondPartitionSet := second.msgs["topic"][1] - if !parent.muter.tryMute(second) { - t.Fatal("expected second retry batch to mute partitions") - } - parent.inFlight.Add(1) - parent.retryBatch("topic", 1, secondPartitionSet, ErrNotEnoughReplicas, true) - - producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) - require.Equal(t, ErrProducerRetryBufferOverflow, producerErr.Err) - require.Equal(t, int32(1), producerErr.Msg.Partition) - waitForProducerBuffer(t, parent, minFunctionalRetryBufferLength, -1) - - contender := newProduceSet(parent) - safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next")}) - if !parent.muter.tryMute(contender) { - t.Fatal("expected overflowed retry batch to release partition mute") +func TestRetryUsesSharedBufferBudget(t *testing.T) { + tests := []struct { + name string + afterRefresh bool + bytesLimit bool + }{ + {name: "retryBatch length"}, + {name: "retryBatch bytes", bytesLimit: true}, + {name: "retryBatchesAfterRefresh length", afterRefresh: true}, + {name: "retryBatchesAfterRefresh bytes", afterRefresh: true, bytesLimit: true}, } - parent.muter.unmute(contender) - close(retryBP.done) - assertDoneWithin(t, firstDone, 2*time.Second) - waitForProducerBuffer(t, parent, 0, 0) -} - -func TestRetryBatchUsesSharedBufferBytesBudget(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = true - config.Producer.Retry.Max = 1 - config.Producer.Retry.Backoff = 0 - config.Producer.Return.Errors = true + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = !tt.afterRefresh + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + + firstCount := 1 + errorBuffer := 2 + var firstMsg *ProducerMessage + secondMsg := &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("overflow")} + + if tt.bytesLimit { + firstMsg = &ProducerMessage{Topic: "topic", Partition: 0, Value: ByteEncoder(make([]byte, minFunctionalRetryBufferBytes))} + version := producerMessageByteSizeVersion(config) + firstBytes := int64(firstMsg.ByteSize(version)) + secondBytes := int64(secondMsg.ByteSize(version)) + config.Producer.Retry.MaxBufferBytes = firstBytes + secondBytes + } else { + config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength + 1 + firstCount = minFunctionalRetryBufferLength + errorBuffer = minFunctionalRetryBufferLength + 1 + } - parent, retryBP := newBlockingRetryProducer(config, 2) + parent, retryBP := newBlockingRetryProducer(config, errorBuffer) + firstRetryReserved, releaseFirstRetry := holdFirstRetryAfterReserve(parent, retryBP, tt.afterRefresh) + defer releaseFirstRetry() + + retry := func(partition int32, pSet *partitionSet) { + if tt.afterRefresh { + parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ + topic: "topic", + partition: partition, + pSet: pSet, + }}, ErrOutOfBrokers) + return + } + parent.retryBatch("topic", partition, pSet, ErrNotEnoughReplicas, true) + } - firstMsg := &ProducerMessage{Topic: "topic", Partition: 0, Value: ByteEncoder(make([]byte, minFunctionalRetryBufferBytes))} - secondMsg := &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("overflow")} - version := parent.producerMessageByteSizeVersion() - firstBytes := int64(firstMsg.ByteSize(version)) - secondBytes := int64(secondMsg.ByteSize(version)) - config.Producer.Retry.MaxBufferBytes = firstBytes + secondBytes + first := newProduceSet(parent) + if tt.bytesLimit { + safeAddMessage(t, first, firstMsg) + } else { + for i := 0; i < minFunctionalRetryBufferLength; i++ { + safeAddMessage(t, first, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + } + } + firstPartitionSet := first.msgs["topic"][0] + if !parent.muter.tryMute(first) { + t.Fatal("expected first retry batch to mute partitions") + } + parent.inFlight.Add(firstCount) - first := newProduceSet(parent) - safeAddMessage(t, first, firstMsg) - firstPartitionSet := first.msgs["topic"][0] - if !parent.muter.tryMute(first) { - t.Fatal("expected first retry batch to mute partitions") - } - parent.inFlight.Add(1) + firstDone := make(chan struct{}) + go func() { + retry(0, firstPartitionSet) + close(firstDone) + }() + assertDoneWithin(t, firstRetryReserved, 2*time.Second) - firstDone := make(chan struct{}) - go func() { - parent.retryBatch("topic", 0, firstPartitionSet, ErrNotEnoughReplicas, true) - close(firstDone) - }() - waitForProducerBuffer(t, parent, -1, firstBytes) + second := newProduceSet(parent) + safeAddMessage(t, second, secondMsg) + secondPartitionSet := second.msgs["topic"][1] + if !parent.muter.tryMute(second) { + t.Fatal("expected second retry batch to mute partitions") + } + parent.inFlight.Add(1) + retry(1, secondPartitionSet) - second := newProduceSet(parent) - safeAddMessage(t, second, secondMsg) - secondPartitionSet := second.msgs["topic"][1] - if !parent.muter.tryMute(second) { - t.Fatal("expected second retry batch to mute partitions") - } - parent.inFlight.Add(1) - parent.retryBatch("topic", 1, secondPartitionSet, ErrNotEnoughReplicas, true) + producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrProducerRetryBufferOverflow, producerErr.Err) + require.Equal(t, int32(1), producerErr.Msg.Partition) - producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) - require.Equal(t, ErrProducerRetryBufferOverflow, producerErr.Err) - require.Equal(t, int32(1), producerErr.Msg.Partition) - waitForProducerBuffer(t, parent, -1, firstBytes) + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected overflowed retry batch to release partition mute") + } + parent.muter.unmute(contender) - contender := newProduceSet(parent) - safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next")}) - if !parent.muter.tryMute(contender) { - t.Fatal("expected overflowed retry batch to release partition mute") + close(retryBP.done) + releaseFirstRetry() + assertDoneWithin(t, firstDone, 2*time.Second) + }) } - parent.muter.unmute(contender) - - close(retryBP.done) - assertDoneWithin(t, firstDone, 2*time.Second) - waitForProducerBuffer(t, parent, 0, 0) } func TestRetryBatchesAfterRefreshReleasesMuteWhenBrokerProducerDone(t *testing.T) { @@ -2615,128 +2658,6 @@ func TestRetryBatchesAfterRefreshReleasesMuteWhenBrokerProducerDone(t *testing.T parent.muter.unmute(contender) } -func TestRetryBatchesAfterRefreshUsesSharedBufferLengthBudget(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = false - config.Producer.Retry.Max = 1 - config.Producer.Retry.Backoff = 0 - config.Producer.Return.Errors = true - config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength + 1 - - parent, retryBP := newBlockingRetryProducer(config, minFunctionalRetryBufferLength+1) - - first := newProduceSet(parent) - for i := 0; i < minFunctionalRetryBufferLength; i++ { - safeAddMessage(t, first, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) - } - if !parent.muter.tryMute(first) { - t.Fatal("expected first retry batch to mute partitions") - } - parent.inFlight.Add(minFunctionalRetryBufferLength) - - firstDone := make(chan struct{}) - go func() { - parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ - topic: "topic", - partition: 0, - pSet: first.msgs["topic"][0], - }}, ErrOutOfBrokers) - close(firstDone) - }() - waitForProducerBuffer(t, parent, minFunctionalRetryBufferLength, -1) - - second := newProduceSet(parent) - safeAddMessage(t, second, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("overflow")}) - if !parent.muter.tryMute(second) { - t.Fatal("expected second retry batch to mute partitions") - } - parent.inFlight.Add(1) - parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ - topic: "topic", - partition: 1, - pSet: second.msgs["topic"][1], - }}, ErrOutOfBrokers) - - producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) - require.Equal(t, ErrProducerRetryBufferOverflow, producerErr.Err) - require.Equal(t, int32(1), producerErr.Msg.Partition) - waitForProducerBuffer(t, parent, minFunctionalRetryBufferLength, -1) - - contender := newProduceSet(parent) - safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next")}) - if !parent.muter.tryMute(contender) { - t.Fatal("expected overflowed retry batch to release partition mute") - } - parent.muter.unmute(contender) - - close(retryBP.done) - assertDoneWithin(t, firstDone, 2*time.Second) - waitForProducerBuffer(t, parent, 0, 0) -} - -func TestRetryBatchesAfterRefreshUsesSharedBufferBytesBudget(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = false - config.Producer.Retry.Max = 1 - config.Producer.Retry.Backoff = 0 - config.Producer.Return.Errors = true - - parent, retryBP := newBlockingRetryProducer(config, 2) - - firstMsg := &ProducerMessage{Topic: "topic", Partition: 0, Value: ByteEncoder(make([]byte, minFunctionalRetryBufferBytes))} - secondMsg := &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("overflow")} - version := parent.producerMessageByteSizeVersion() - firstBytes := int64(firstMsg.ByteSize(version)) - secondBytes := int64(secondMsg.ByteSize(version)) - config.Producer.Retry.MaxBufferBytes = firstBytes + secondBytes - - first := newProduceSet(parent) - safeAddMessage(t, first, firstMsg) - if !parent.muter.tryMute(first) { - t.Fatal("expected first retry batch to mute partitions") - } - parent.inFlight.Add(1) - - firstDone := make(chan struct{}) - go func() { - parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ - topic: "topic", - partition: 0, - pSet: first.msgs["topic"][0], - }}, ErrOutOfBrokers) - close(firstDone) - }() - waitForProducerBuffer(t, parent, -1, firstBytes) - - second := newProduceSet(parent) - safeAddMessage(t, second, secondMsg) - if !parent.muter.tryMute(second) { - t.Fatal("expected second retry batch to mute partitions") - } - parent.inFlight.Add(1) - parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ - topic: "topic", - partition: 1, - pSet: second.msgs["topic"][1], - }}, ErrOutOfBrokers) - - producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) - require.Equal(t, ErrProducerRetryBufferOverflow, producerErr.Err) - require.Equal(t, int32(1), producerErr.Msg.Partition) - waitForProducerBuffer(t, parent, -1, firstBytes) - - contender := newProduceSet(parent) - safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next")}) - if !parent.muter.tryMute(contender) { - t.Fatal("expected overflowed retry batch to release partition mute") - } - parent.muter.unmute(contender) - - close(retryBP.done) - assertDoneWithin(t, firstDone, 2*time.Second) - waitForProducerBuffer(t, parent, 0, 0) -} - func TestHandleErrorNonIdempotentRetryReleasesMuteOnLeaderError(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false @@ -2752,10 +2673,7 @@ func TestHandleErrorNonIdempotentRetryReleasesMuteOnLeaderError(t *testing.T) { errors: make(chan *ProducerError, 1), txnmgr: &transactionManager{}, } - parent.client = &failingLeaderClient{ - stubLeaderClient: stubLeaderClient{cfg: config}, - err: ErrOutOfBrokers, - } + parent.client = &stubLeaderClient{cfg: config, leaderErr: ErrOutOfBrokers} bp := &brokerProducer{ parent: parent, @@ -2786,8 +2704,12 @@ func TestHandleErrorNonIdempotentRetryReleasesMuteOnLeaderError(t *testing.T) { } type stubLeaderClient struct { - cfg *Config - leader *Broker + cfg *Config + leader *Broker + leaderFunc func(string, int32) (*Broker, error) + leaderErr error + leaders map[string]map[int32]*Broker + refreshMetadata func(...string) error } func (c *stubLeaderClient) Config() *Config { return c.cfg } @@ -2799,16 +2721,36 @@ func (c *stubLeaderClient) Topics() ([]string, error) { return func (c *stubLeaderClient) Partitions(string) ([]int32, error) { return nil, nil } func (c *stubLeaderClient) WritablePartitions(string) ([]int32, error) { return nil, nil } func (c *stubLeaderClient) Leader(topic string, partitionID int32) (*Broker, error) { + if c.leaderFunc != nil { + return c.leaderFunc(topic, partitionID) + } + if c.leaderErr != nil { + return nil, c.leaderErr + } + if partitions := c.leaders[topic]; partitions != nil { + if leader := partitions[partitionID]; leader != nil { + return leader, nil + } + } + if c.leaders != nil { + return nil, ErrLeaderNotAvailable + } return c.leader, nil } -func (c *stubLeaderClient) LeaderAndEpoch(string, int32) (*Broker, int32, error) { - return c.leader, 0, nil +func (c *stubLeaderClient) LeaderAndEpoch(topic string, partition int32) (*Broker, int32, error) { + leader, err := c.Leader(topic, partition) + return leader, 0, err +} +func (c *stubLeaderClient) Replicas(string, int32) ([]int32, error) { return nil, nil } +func (c *stubLeaderClient) InSyncReplicas(string, int32) ([]int32, error) { return nil, nil } +func (c *stubLeaderClient) OfflineReplicas(string, int32) ([]int32, error) { return nil, nil } +func (c *stubLeaderClient) RefreshBrokers([]string) error { return nil } +func (c *stubLeaderClient) RefreshMetadata(topics ...string) error { + if c.refreshMetadata != nil { + return c.refreshMetadata(topics...) + } + return nil } -func (c *stubLeaderClient) Replicas(string, int32) ([]int32, error) { return nil, nil } -func (c *stubLeaderClient) InSyncReplicas(string, int32) ([]int32, error) { return nil, nil } -func (c *stubLeaderClient) OfflineReplicas(string, int32) ([]int32, error) { return nil, nil } -func (c *stubLeaderClient) RefreshBrokers([]string) error { return nil } -func (c *stubLeaderClient) RefreshMetadata(...string) error { return nil } func (c *stubLeaderClient) GetOffset(string, int32, int64) (int64, error) { return 0, nil } func (c *stubLeaderClient) Coordinator(string) (*Broker, error) { return nil, nil } func (c *stubLeaderClient) RefreshCoordinator(string) error { return nil } @@ -2820,69 +2762,6 @@ func (c *stubLeaderClient) PartitionNotReadable(string, int32) bool { r func (c *stubLeaderClient) Close() error { return nil } func (c *stubLeaderClient) Closed() bool { return false } -type failingLeaderClient struct { - stubLeaderClient - err error -} - -func (c *failingLeaderClient) Leader(string, int32) (*Broker, error) { - return nil, c.err -} - -func (c *failingLeaderClient) LeaderAndEpoch(string, int32) (*Broker, int32, error) { - return nil, 0, c.err -} - -type blockingRefreshClient struct { - stubLeaderClient - started chan struct{} - release chan struct{} -} - -func (c *blockingRefreshClient) RefreshMetadata(...string) error { - select { - case c.started <- struct{}{}: - default: - } - <-c.release - return nil -} - -type countingRefreshClient struct { - stubLeaderClient - calls chan []string -} - -func (c *countingRefreshClient) RefreshMetadata(topics ...string) error { - c.calls <- append([]string(nil), topics...) - return nil -} - -type routingLeaderClient struct { - stubLeaderClient - leaders map[string]map[int32]*Broker - calls chan []string -} - -func (c *routingLeaderClient) Leader(topic string, partition int32) (*Broker, error) { - if partitions := c.leaders[topic]; partitions != nil { - if leader := partitions[partition]; leader != nil { - return leader, nil - } - } - return nil, ErrLeaderNotAvailable -} - -func (c *routingLeaderClient) LeaderAndEpoch(topic string, partition int32) (*Broker, int32, error) { - leader, err := c.Leader(topic, partition) - return leader, 0, err -} - -func (c *routingLeaderClient) RefreshMetadata(topics ...string) error { - c.calls <- append([]string(nil), topics...) - return nil -} - func testProducerInterceptor( t *testing.T, interceptors []ProducerInterceptor,