From 5b2605f673b0a1ab1c2a519c969aa062a26abe47 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 17 Jun 2026 20:55:49 +0800 Subject: [PATCH 1/4] idempotent is false still keep order by the mute --- async_producer.go | 67 ++++++++++++++++++++----- async_producer_test.go | 110 +++++++++++++++++++++++++++++++++++++++++ produce_set.go | 36 ++++++++++++++ 3 files changed, 202 insertions(+), 11 deletions(-) diff --git a/async_producer.go b/async_producer.go index a9af20b1b..8eb319ed3 100644 --- a/async_producer.go +++ b/async_producer.go @@ -184,6 +184,13 @@ func (m *partitionMuter) tryMutePartition(topic string, partition int32) bool { return true } +func (m *partitionMuter) isMutedPartition(topic string, partition int32) bool { + m.mu.Lock() + defer m.mu.Unlock() + + return m.isMuted(topic, partition) +} + // waitUntilMuted blocks until all partitions in the set can be muted, then mutes them. // Returns false if the muter was closed before all partitions could be muted. func (m *partitionMuter) waitUntilMuted(set *produceSet) bool { @@ -370,12 +377,17 @@ type ProducerMessage struct { // successfully delivered and RequiredAcks is not NoResponse. Timestamp time.Time - retries int - flags flagSet - expectation chan *ProducerError - sequenceNumber int32 - producerEpoch int16 - hasSequence bool + retries int + flags flagSet + // reusePartitionMute is a one-shot marker for non-idempotent retries. The + // first retried message from a failed, still-muted batch may reuse that + // existing in-flight reservation so later same-partition messages cannot + // slip in between the failed send and its retry. + reusePartitionMute bool + expectation chan *ProducerError + sequenceNumber int32 + producerEpoch int16 + hasSequence bool } const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. @@ -402,6 +414,7 @@ func (m *ProducerMessage) ByteSize(version int) int { func (m *ProducerMessage) clear() { m.flags = 0 m.retries = 0 + m.reusePartitionMute = false m.sequenceNumber = 0 m.producerEpoch = 0 m.hasSequence = false @@ -1083,7 +1096,8 @@ func (bp *brokerProducer) run() { var output chan<- *produceSet for { - if bp.flushingBatch == nil && (bp.timerFired || bp.accumulatingBatch.readyToFlush()) { + readyToFlush := bp.timerFired || bp.accumulatingBatch.readyToFlush() + if bp.flushingBatch == nil && readyToFlush { bp.tryBuildFlushingBatch() } @@ -1098,6 +1112,13 @@ func (bp *brokerProducer) run() { output = nil } + var unmuteCh <-chan struct{} + if bp.flushingBatch == nil && readyToFlush { + if ch, blocked := bp.parent.muter.awaitUnmuteChan(bp.accumulatingBatch); blocked { + unmuteCh = ch + } + } + select { case msg, ok := <-bp.input: if !ok { @@ -1169,6 +1190,7 @@ func (bp *brokerProducer) run() { if ok { bp.handleResponse(response) } + case <-unmuteCh: } } } @@ -1186,10 +1208,29 @@ func (bp *brokerProducer) tryBuildFlushingBatch() bool { partial := bp.accumulatingBatch.takePartitions(func(topic string, partition int32) bool { return bp.parent.muter.tryMutePartition(topic, partition) }) - if partial == nil { + if partial != nil { + bp.flushingBatch = partial + if bp.accumulatingBatch.empty() { + bp.rollOver() + } + return true + } + + reused := bp.accumulatingBatch.takePartitions(func(topic string, partition int32) bool { + partitions := bp.accumulatingBatch.msgs[topic] + if partitions == nil { + return false + } + set := partitions[partition] + return set.canReusePartitionMute() && bp.parent.muter.isMutedPartition(topic, partition) + }) + if reused == nil { return false } - bp.flushingBatch = partial + reused.eachPartition(func(_ string, _ int32, pSet *partitionSet) { + pSet.clearPartitionMuteReuse() + }) + bp.flushingBatch = reused if bp.accumulatingBatch.empty() { bp.rollOver() } @@ -1354,7 +1395,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo bp.parent.returnErrors(pSet.msgs, block.Err) } else { retryTopics = append(retryTopics, topic) - if bp.parent.conf.Producer.Idempotent { + if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) { if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } @@ -1397,6 +1438,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo if bp.parent.conf.Producer.Idempotent { go bp.parent.retryBatch(topic, partition, pSet, block.Err, true) } else { + pSet.markPartitionMuteReusable() bp.parent.retryMessages(pSet.msgs, block.Err) } // dropping the following messages has the side effect of incrementing their retry count @@ -1492,13 +1534,16 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { bp.currentRetries[topic] = make(map[int32]error) } bp.currentRetries[topic][partition] = err - if bp.parent.conf.Producer.Idempotent { + if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) { if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } keepMuted[topic][partition] = struct{}{} + } + if bp.parent.conf.Producer.Idempotent { go bp.parent.retryBatch(topic, partition, pSet, err, true) } else { + pSet.markPartitionMuteReusable() bp.parent.retryMessages(pSet.msgs, err) } }) diff --git a/async_producer_test.go b/async_producer_test.go index c32ac3ef5..ed5e6beeb 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1724,6 +1724,53 @@ func TestBrokerProducerFlushSkipsMutedPartitions(t *testing.T) { } } +func TestBrokerProducerRunFlushesAfterExternalUnmute(t *testing.T) { + config := NewTestConfig() + config.Producer.Flush.Messages = 1 + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + txnmgr: &transactionManager{}, + } + + blockedSet := newProduceSet(parent) + safeAddMessage(t, blockedSet, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("held")}) + if !parent.muter.tryMute(blockedSet) { + t.Fatal("expected to mute partition externally") + } + + input := make(chan *ProducerMessage) + output := make(chan *produceSet, 1) + responses := make(chan *brokerProducerResponse) + bp := &brokerProducer{ + parent: parent, + broker: &Broker{id: 1}, + input: input, + output: output, + responses: responses, + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + retryMsg := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("waiting")} + safeAddMessage(t, bp.accumulatingBatch, retryMsg) + + done := make(chan struct{}) + go func() { + bp.run() + close(done) + }() + + assertNotDone(t, output, 50*time.Millisecond) + parent.muter.unmute(blockedSet) + + flushed := assertDoneWithin(t, output, 2*time.Second) + require.Equal(t, retryMsg, flushed.msgs["topic"][0].msgs[0]) + + close(input) + close(responses) + assertDoneWithin(t, done, 2*time.Second) +} + // TestBrokerProducerWaitForSpaceAllPartitionsMuted verifies that waitForSpace unblocks // when all partitions in the accumulating batch are externally muted and later unmuted. func TestBrokerProducerWaitForSpaceAllPartitionsMuted(t *testing.T) { @@ -1910,6 +1957,69 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) { } } +func TestHandleSuccessNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + + txnMgr := &transactionManager{ + producerID: 0, + producerEpoch: 0, + sequenceNumbers: make(map[string]int32), + } + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + retries: make(chan *ProducerMessage, 1), + txnmgr: txnMgr, + } + leader := &Broker{id: 1} + parent.client = &stubLeaderClient{leader: leader, cfg: config} + + bp := &brokerProducer{ + parent: parent, + broker: leader, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + parent.brokers[leader] = bp + parent.brokerRefs[bp] = 1 + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + retryPartitionSet := sent.msgs["topic"][0] + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + defer parent.muter.unmute(sent) + + response := new(ProduceResponse) + response.AddTopicPartition("topic", 0, ErrNotLeaderForPartition) + bp.handleSuccess(sent, response) + + retried := assertDoneWithin(t, parent.retries, 2*time.Second) + require.Equal(t, retryPartitionSet.msgs[0], retried) + require.True(t, retried.reusePartitionMute) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if parent.muter.tryMute(contender) { + parent.muter.unmute(contender) + t.Fatal("expected partition to remain muted by the retrying batch") + } + + safeAddMessage(t, bp.accumulatingBatch, retried) + if !bp.tryBuildFlushingBatch() { + t.Fatal("expected retry batch to reuse the existing partition mute") + } + require.Equal(t, retryPartitionSet.msgs[0], bp.flushingBatch.msgs["topic"][0].msgs[0]) + require.False(t, retryPartitionSet.msgs[0].reusePartitionMute) +} + type stubLeaderClient struct { cfg *Config leader *Broker diff --git a/produce_set.go b/produce_set.go index 380f7f5ab..a86235b9e 100644 --- a/produce_set.go +++ b/produce_set.go @@ -12,6 +12,42 @@ type partitionSet struct { bufferBytes int } +// markPartitionMuteReusable lets the first message carry the retry batch's +// right to reuse the partition mute that is still held by its failed send. +// Only the first message is marked because one held mute represents one +// in-flight retry permission; marking every message could let split retry +// fragments bypass each other under the same reservation. +func (ps *partitionSet) markPartitionMuteReusable() { + if len(ps.msgs) > 0 { + ps.msgs[0].reusePartitionMute = true + } +} + +// canReusePartitionMute reports whether this retry batch may be flushed while +// its partition is already muted by the previous send attempt. +func (ps *partitionSet) canReusePartitionMute() bool { + return len(ps.msgs) > 0 && ps.msgs[0].reusePartitionMute +} + +// clearPartitionMuteReuse consumes the one-shot reuse marker once the retry +// batch is selected for flushing. +func (ps *partitionSet) clearPartitionMuteReuse() { + for _, msg := range ps.msgs { + msg.reusePartitionMute = false + } +} + +// canRetry prevents holding a partition mute when every message in the set will +// be returned as an error instead of being retried. +func (ps *partitionSet) canRetry(maxRetries int) bool { + for _, msg := range ps.msgs { + if msg.retries < maxRetries { + return true + } + } + return false +} + type produceSet struct { parent *asyncProducer msgs map[string]map[int32]*partitionSet From 3deed1fec5e7503804f3905e29bd386f8f152871 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 18 Jun 2026 15:14:05 +0800 Subject: [PATCH 2/4] respect the buffer max bytes and buffer max message Signed-off-by: 3AceShowHand --- async_producer.go | 333 ++++++++++++++++++++++++---- async_producer_test.go | 478 ++++++++++++++++++++++++++++++++++++++++- config.go | 16 +- produce_set.go | 9 + 4 files changed, 777 insertions(+), 59 deletions(-) diff --git a/async_producer.go b/async_producer.go index 29bc3a114..9ce179114 100644 --- a/async_producer.go +++ b/async_producer.go @@ -14,7 +14,7 @@ import ( "github.com/rcrowley/go-metrics" ) -// ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied. +// ErrProducerRetryBufferOverflow is returned when producer retry buffering is full and OOM prevention needs to be applied. var ErrProducerRetryBufferOverflow = errors.New("retry buffer full: message discarded to prevent buffer overflow") const ( @@ -102,6 +102,8 @@ type asyncProducer struct { brokerRefs map[*brokerProducer]int brokerLock sync.Mutex + retryBuffer retryBufferQuota + txnmgr *transactionManager txLock sync.Mutex @@ -112,6 +114,14 @@ type asyncProducer struct { metricsRegistry metrics.Registry } +type retryBufferQuota struct { + // messages and bytes track producer-level retry buffer occupancy when + // Producer.Retry.MaxBufferLength or MaxBufferBytes is bounded. + messages int64 + bytes int64 + mu sync.Mutex +} + type partitionMuter struct { mu sync.Mutex cond *sync.Cond @@ -791,6 +801,13 @@ func (pp *partitionProducer) backoff(retries int) { } func (p *asyncProducer) backoff(retries int) { + backoff := p.retryBackoff(retries) + if backoff > 0 { + time.Sleep(backoff) + } +} + +func (p *asyncProducer) retryBackoff(retries int) time.Duration { var backoff time.Duration if p.conf.Producer.Retry.BackoffFunc != nil { maxRetries := p.conf.Producer.Retry.Max @@ -798,8 +815,37 @@ func (p *asyncProducer) backoff(retries int) { } else { backoff = p.conf.Producer.Retry.Backoff } - if backoff > 0 { - time.Sleep(backoff) + return backoff +} + +func (p *asyncProducer) shuttingDown() bool { + if p.shutdownCh == nil { + return false + } + select { + case <-p.shutdownCh: + return true + default: + return false + } +} + +func (p *asyncProducer) backoffOrDone(retries int) bool { + backoff := p.retryBackoff(retries) + if backoff <= 0 { + return !p.shuttingDown() + } + timer := time.NewTimer(backoff) + defer timer.Stop() + if p.shutdownCh == nil { + <-timer.C + return true + } + select { + case <-timer.C: + return true + case <-p.shutdownCh: + return false } } @@ -1448,12 +1494,18 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, retryErr error, alreadyMuted bool) { Logger.Printf("Retrying batch for %v-%d because of %v\n", topic, partition, retryErr) produceSet := newProduceSet(p) - produceSet.msgs[topic] = make(map[int32]*partitionSet) - produceSet.msgs[topic][partition] = pSet - produceSet.bufferBytes += pSet.bufferBytes - produceSet.bufferCount += len(pSet.msgs) + produceSet.addPartitionSet(topic, partition, pSet) + bufferMessages, bufferBytes := p.produceSetBufferSize(produceSet) + bufferReserved := false + releaseBuffer := func() { + if bufferReserved { + p.releaseBuffer(bufferMessages, bufferBytes) + bufferReserved = false + } + } muted := alreadyMuted failBatch := func(err error) { + releaseBuffer() // Release the partition before reporting errors so a blocked Errors // consumer cannot keep later same-partition messages muted. if muted { @@ -1471,12 +1523,17 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio } msg.retries++ } + if !p.reserveBuffer(bufferMessages, bufferBytes) { + failBatch(ErrProducerRetryBufferOverflow) + return + } + bufferReserved = p.retryBufferBounded() && (bufferMessages != 0 || bufferBytes != 0) - // honor Producer.Retry.Backoff between retry attempts (#2469); the - // non-idempotent path gets this from partitionProducer.dispatch, but - // retryBatch dispatches the produceSet directly to the broker - if len(pSet.msgs) > 0 { - p.backoff(pSet.msgs[0].retries) + // Honor Producer.Retry.Backoff between retry attempts (#2469). retryBatch + // dispatches the produceSet directly to the broker, bypassing partitionProducer.dispatch. + if len(pSet.msgs) > 0 && !p.backoffOrDone(pSet.msgs[0].retries) { + failBatch(ErrShuttingDown) + return } // it's expected that a metadata refresh has been requested prior to calling retryBatch @@ -1499,6 +1556,7 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio failBatch(ErrShuttingDown) return } + releaseBuffer() } type partitionBatchRetry struct { @@ -1507,30 +1565,233 @@ type partitionBatchRetry struct { pSet *partitionSet } +func (p *asyncProducer) failMutedBatch(batch partitionBatchRetry, err error) { + produceSet := newProduceSet(p) + produceSet.addPartitionSet(batch.topic, batch.partition, batch.pSet) + p.muter.unmute(produceSet) + for _, msg := range batch.pSet.msgs { + p.returnError(msg, err) + } +} + +func (p *asyncProducer) failMutedBatches(batches []partitionBatchRetry, err error) { + for _, batch := range batches { + p.failMutedBatch(batch, err) + } +} + +func (p *asyncProducer) retryBufferLimits() (int64, int64) { + maxBufferLength := p.conf.Producer.Retry.MaxBufferLength + if 0 < maxBufferLength && maxBufferLength < minFunctionalRetryBufferLength { + maxBufferLength = minFunctionalRetryBufferLength + } + + maxBufferBytes := p.conf.Producer.Retry.MaxBufferBytes + if 0 < maxBufferBytes && maxBufferBytes < minFunctionalRetryBufferBytes { + maxBufferBytes = minFunctionalRetryBufferBytes + } + + return int64(maxBufferLength), maxBufferBytes +} + +func (p *asyncProducer) retryBufferBounded() bool { + maxBufferLength, maxBufferBytes := p.retryBufferLimits() + return maxBufferLength > 0 || maxBufferBytes > 0 +} + +func (p *asyncProducer) producerMessageByteSizeVersion() int { + if p.conf.Version.IsAtLeast(V0_11_0_0) { + return 2 + } + return 1 +} + +func (p *asyncProducer) batchBufferSize(batch partitionBatchRetry) (int64, int64) { + return int64(len(batch.pSet.msgs)), int64(batch.pSet.bufferBytes) +} + +func (p *asyncProducer) batchesBufferSize(batches []partitionBatchRetry) (int64, int64) { + var messages, bytes int64 + for _, batch := range batches { + batchMessages, batchBytes := p.batchBufferSize(batch) + messages += batchMessages + bytes += batchBytes + } + return messages, bytes +} + +func (p *asyncProducer) produceSetBufferSize(set *produceSet) (int64, int64) { + var messages, bytes int64 + set.eachPartition(func(_ string, _ int32, pSet *partitionSet) { + messages += int64(len(pSet.msgs)) + bytes += int64(pSet.bufferBytes) + }) + return messages, bytes +} + +func (p *asyncProducer) bufferWouldOverflow(messages, bytes int64) bool { + maxBufferLength, maxBufferBytes := p.retryBufferLimits() + return (maxBufferLength > 0 && messages >= maxBufferLength) || + (maxBufferBytes > 0 && bytes >= maxBufferBytes) +} + +func (p *asyncProducer) reserveBuffer(messages, bytes int64) bool { + if messages == 0 && bytes == 0 { + return true + } + if !p.retryBufferBounded() { + return true + } + p.retryBuffer.mu.Lock() + defer p.retryBuffer.mu.Unlock() + + if p.bufferWouldOverflow(p.retryBuffer.messages+messages, p.retryBuffer.bytes+bytes) { + return false + } + p.retryBuffer.messages += messages + p.retryBuffer.bytes += bytes + return true +} + +func (p *asyncProducer) addToBuffer(messages, bytes int64) bool { + if messages == 0 && bytes == 0 { + return false + } + if !p.retryBufferBounded() { + return false + } + p.retryBuffer.mu.Lock() + defer p.retryBuffer.mu.Unlock() + + p.retryBuffer.messages += messages + p.retryBuffer.bytes += bytes + return p.bufferWouldOverflow(p.retryBuffer.messages, p.retryBuffer.bytes) +} + +func (p *asyncProducer) releaseBuffer(messages, bytes int64) { + if messages == 0 && bytes == 0 { + return + } + if !p.retryBufferBounded() { + return + } + p.retryBuffer.mu.Lock() + defer p.retryBuffer.mu.Unlock() + + p.retryBuffer.messages -= messages + p.retryBuffer.bytes -= bytes +} + // retryBatchesAfterRefresh keeps metadata refresh out of brokerProducer.handleError. // Connection errors already hold partition mutes; doing the refresh in the // response loop can block all progress for that broker while the cluster is // unstable. Refresh once for the failed request so multi-partition batches do -// not amplify controller-fail metadata traffic, then retry partitions -// independently so one blocked broker handoff cannot hold up the rest. +// not amplify controller-fail metadata traffic, then group retry partitions by +// their refreshed leader broker to keep handoff fanout bounded by broker count. func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []partitionBatchRetry, retryErr error) { - if p.shutdownCh != nil { - select { - case <-p.shutdownCh: - for _, batch := range batches { - go p.retryBatch(batch.topic, batch.partition, batch.pSet, retryErr, true) + if p.shuttingDown() { + p.failMutedBatches(batches, ErrShuttingDown) + return + } + + var retryable []partitionBatchRetry + maxRetryAttempt := 0 + for _, batch := range batches { + shouldRetry := true + for _, msg := range batch.pSet.msgs { + if msg.retries >= p.conf.Producer.Retry.Max { + shouldRetry = false + break + } + } + if !shouldRetry { + p.failMutedBatch(batch, retryErr) + continue + } + for _, msg := range batch.pSet.msgs { + msg.retries++ + if msg.retries > maxRetryAttempt { + maxRetryAttempt = msg.retries } - return - default: } + retryable = append(retryable, batch) + } + if len(retryable) == 0 { + return + } + + // Reserve before metadata refresh so refresh-blocked direct retries still + // count against the producer-level retry buffer budget. + bufferMessages, bufferBytes := p.batchesBufferSize(retryable) + if !p.reserveBuffer(bufferMessages, bufferBytes) { + p.failMutedBatches(retryable, ErrProducerRetryBufferOverflow) + return + } + + if p.shuttingDown() { + p.releaseBuffer(bufferMessages, bufferBytes) + p.failMutedBatches(retryable, ErrShuttingDown) + return } if len(topics) > 0 { if err := p.client.RefreshMetadata(topics...); err != nil { Logger.Printf("Failed refreshing metadata because of %v\n", err) } } - for _, batch := range batches { - go p.retryBatch(batch.topic, batch.partition, batch.pSet, retryErr, true) + if p.shuttingDown() { + p.releaseBuffer(bufferMessages, bufferBytes) + p.failMutedBatches(retryable, ErrShuttingDown) + return + } + + if !p.backoffOrDone(maxRetryAttempt) { + p.releaseBuffer(bufferMessages, bufferBytes) + p.failMutedBatches(retryable, ErrShuttingDown) + return + } + + brokerSets := make(map[*Broker]*produceSet) + for _, batch := range retryable { + if p.shuttingDown() { + batchMessages, batchBytes := p.batchBufferSize(batch) + p.releaseBuffer(batchMessages, batchBytes) + p.failMutedBatch(batch, ErrShuttingDown) + continue + } + leader, leaderErr := p.client.Leader(batch.topic, batch.partition) + if leaderErr != nil { + Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", batch.topic, batch.partition, leaderErr) + batchMessages, batchBytes := p.batchBufferSize(batch) + p.releaseBuffer(batchMessages, batchBytes) + p.failMutedBatch(batch, retryErr) + continue + } + set := brokerSets[leader] + if set == nil { + set = newProduceSet(p) + brokerSets[leader] = set + } + set.addPartitionSet(batch.topic, batch.partition, batch.pSet) + } + + for leader, set := range brokerSets { + go p.handoffRetrySet(leader, set) + } +} + +func (p *asyncProducer) handoffRetrySet(leader *Broker, set *produceSet) { + bp := p.getBrokerProducer(leader) + accepted := p.sendRetryBatch(bp, set) + p.unrefBrokerProducer(leader, bp) + setMessages, setBytes := p.produceSetBufferSize(set) + p.releaseBuffer(setMessages, setBytes) + if !accepted { + p.muter.unmute(set) + set.eachPartition(func(_ string, _ int32, pSet *partitionSet) { + for _, msg := range pSet.msgs { + p.returnError(msg, ErrShuttingDown) + } + }) } } @@ -1633,22 +1894,8 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel func (p *asyncProducer) retryHandler() { - maxBufferLength := p.conf.Producer.Retry.MaxBufferLength - if 0 < maxBufferLength && maxBufferLength < minFunctionalRetryBufferLength { - maxBufferLength = minFunctionalRetryBufferLength - } - - maxBufferBytes := p.conf.Producer.Retry.MaxBufferBytes - if 0 < maxBufferBytes && maxBufferBytes < minFunctionalRetryBufferBytes { - maxBufferBytes = minFunctionalRetryBufferBytes - } - - version := 1 - if p.conf.Version.IsAtLeast(V0_11_0_0) { - version = 2 - } + version := p.producerMessageByteSizeVersion() - var currentByteSize int64 var msg *ProducerMessage buf := queue.New() @@ -1660,7 +1907,7 @@ func (p *asyncProducer) retryHandler() { case msg = <-p.retries: case p.input <- buf.Peek().(*ProducerMessage): msgToRemove := buf.Remove().(*ProducerMessage) - currentByteSize -= int64(msgToRemove.ByteSize(version)) + p.releaseBuffer(1, int64(msgToRemove.ByteSize(version))) continue } } @@ -1670,9 +1917,9 @@ func (p *asyncProducer) retryHandler() { } buf.Add(msg) - currentByteSize += int64(msg.ByteSize(version)) + bufferOverflow := p.addToBuffer(1, int64(msg.ByteSize(version))) - if (maxBufferLength <= 0 || buf.Length() < maxBufferLength) && (maxBufferBytes <= 0 || currentByteSize < maxBufferBytes) { + if !bufferOverflow { continue } @@ -1681,10 +1928,10 @@ func (p *asyncProducer) retryHandler() { select { case p.input <- msgToHandle: buf.Remove() - currentByteSize -= int64(msgToHandle.ByteSize(version)) + p.releaseBuffer(1, int64(msgToHandle.ByteSize(version))) default: buf.Remove() - currentByteSize -= int64(msgToHandle.ByteSize(version)) + p.releaseBuffer(1, int64(msgToHandle.ByteSize(version))) p.returnError(msgToHandle, ErrProducerRetryBufferOverflow) } } diff --git a/async_producer_test.go b/async_producer_test.go index 4bb9cd87a..fd021c655 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1635,6 +1635,51 @@ func assertDoneWithin[T any](t *testing.T, ch <-chan T, timeout time.Duration) T } } +func waitForProducerBuffer(t *testing.T, parent *asyncProducer, messages, bytes int64) { + t.Helper() + deadline := time.After(2 * time.Second) + ticker := time.NewTicker(time.Millisecond) + defer ticker.Stop() + for { + parent.retryBuffer.mu.Lock() + gotMessages := parent.retryBuffer.messages + gotBytes := parent.retryBuffer.bytes + parent.retryBuffer.mu.Unlock() + if (messages < 0 || gotMessages == messages) && (bytes < 0 || gotBytes == bytes) { + return + } + select { + case <-deadline: + t.Fatalf("timed out waiting for buffer budget messages=%d bytes=%d, got messages=%d bytes=%d", + messages, bytes, gotMessages, gotBytes) + case <-ticker.C: + } + } +} + +func newBlockingRetryProducer(config *Config, errorBuffer int) (*asyncProducer, *brokerProducer) { + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + errors: make(chan *ProducerError, errorBuffer), + shutdownCh: make(chan struct{}), + txnmgr: &transactionManager{}, + } + leader := &Broker{id: 1} + parent.client = &stubLeaderClient{leader: leader, cfg: config} + retryBP := &brokerProducer{ + parent: parent, + broker: leader, + output: make(chan *produceSet), + input: make(chan *ProducerMessage), + done: make(chan struct{}), + } + parent.brokers[leader] = retryBP + return parent, retryBP +} + // TestBrokerProducerWaitForSpaceRespectsExternalUnmute ensures waitForSpace does not // deadlock when partitions are muted by another producer and are unmuted elsewhere. func TestBrokerProducerWaitForSpaceRespectsExternalUnmute(t *testing.T) { @@ -1907,6 +1952,7 @@ func TestBrokerProducerRollOverClearsTimer(t *testing.T) { func TestRetryBatchRespectsPartitionMuter(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = true + config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength txnMgr := &transactionManager{ producerID: 0, producerEpoch: 0, @@ -1949,6 +1995,7 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) { default: t.Fatal("expected retry batch to be dispatched") } + waitForProducerBuffer(t, parent, 0, 0) contender := newProduceSet(parent) safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) @@ -2023,6 +2070,7 @@ func TestHandleErrorNonIdempotentRetryDoesNotWaitForMetadataRefresh(t *testing.T config.Producer.Idempotent = false config.Producer.Retry.Max = 1 config.Producer.Retry.Backoff = 0 + config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength parent := &asyncProducer{ conf: config, @@ -2071,6 +2119,7 @@ func TestHandleErrorNonIdempotentRetryDoesNotWaitForMetadataRefresh(t *testing.T }() assertDoneWithin(t, done, 2*time.Second) assertDoneWithin(t, client.started, 2*time.Second) + waitForProducerBuffer(t, parent, 1, -1) contender := newProduceSet(parent) safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) @@ -2083,6 +2132,7 @@ func TestHandleErrorNonIdempotentRetryDoesNotWaitForMetadataRefresh(t *testing.T retrySet := assertDoneWithin(t, output, 2*time.Second) defer parent.muter.unmute(retrySet) require.Equal(t, retryPartitionSet, retrySet.msgs["topic"][0]) + waitForProducerBuffer(t, parent, 0, 0) } func TestHandleErrorNonIdempotentRetryRefreshesMetadataOncePerFailedRequest(t *testing.T) { @@ -2136,23 +2186,112 @@ func TestHandleErrorNonIdempotentRetryRefreshesMetadataOncePerFailedRequest(t *t bp.handleError(sent, ErrOutOfBrokers) require.Equal(t, []string{"topic"}, assertDoneWithin(t, client.calls, 2*time.Second)) - retrySet0 := assertDoneWithin(t, output, 2*time.Second) - defer parent.muter.unmute(retrySet0) - retrySet1 := assertDoneWithin(t, output, 2*time.Second) - defer parent.muter.unmute(retrySet1) + retrySet := assertDoneWithin(t, output, 2*time.Second) + defer parent.muter.unmute(retrySet) assertNotDone(t, client.calls, 50*time.Millisecond) + assertNotDone(t, output, 50*time.Millisecond) retriedPartitions := make(map[int32]*partitionSet) - retrySet0.eachPartition(func(_ string, partition int32, pSet *partitionSet) { - retriedPartitions[partition] = pSet - }) - retrySet1.eachPartition(func(_ string, partition int32, pSet *partitionSet) { + retrySet.eachPartition(func(_ string, partition int32, pSet *partitionSet) { retriedPartitions[partition] = pSet }) require.Same(t, retryPartitionSet0, retriedPartitions[int32(0)]) require.Same(t, retryPartitionSet1, retriedPartitions[int32(1)]) } +func TestHandleErrorNonIdempotentRetryGroupsBatchesByLeader(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + txnmgr: &transactionManager{}, + } + failedBroker := &Broker{id: 1} + leaderA := &Broker{id: 2} + leaderB := &Broker{id: 3} + client := &routingLeaderClient{ + stubLeaderClient: stubLeaderClient{cfg: config}, + leaders: map[string]map[int32]*Broker{ + "topic-a": { + 0: leaderA, + 1: leaderA, + }, + "topic-b": { + 0: leaderB, + 1: leaderB, + }, + }, + calls: make(chan []string, 1), + } + parent.client = client + + outputA := make(chan *produceSet, 1) + outputB := make(chan *produceSet, 1) + retryBPA := &brokerProducer{ + parent: parent, + broker: leaderA, + output: outputA, + input: make(chan *ProducerMessage), + } + retryBPB := &brokerProducer{ + parent: parent, + broker: leaderB, + output: outputB, + input: make(chan *ProducerMessage), + } + parent.brokers[leaderA] = retryBPA + parent.brokers[leaderB] = retryBPB + + bp := &brokerProducer{ + parent: parent, + broker: failedBroker, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic-a", Partition: 0, Value: StringEncoder("a-0")}) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic-a", Partition: 1, Value: StringEncoder("a-1")}) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic-b", Partition: 0, Value: StringEncoder("b-0")}) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic-b", Partition: 1, Value: StringEncoder("b-1")}) + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + + bp.handleError(sent, ErrOutOfBrokers) + + require.ElementsMatch(t, []string{"topic-a", "topic-b"}, assertDoneWithin(t, client.calls, 2*time.Second)) + retrySetA := assertDoneWithin(t, outputA, 2*time.Second) + defer parent.muter.unmute(retrySetA) + retrySetB := assertDoneWithin(t, outputB, 2*time.Second) + defer parent.muter.unmute(retrySetB) + + require.Len(t, retrySetA.msgs, 1) + require.Len(t, retrySetA.msgs["topic-a"], 2) + require.Len(t, retrySetB.msgs, 1) + require.Len(t, retrySetB.msgs["topic-b"], 2) + for _, partitions := range retrySetA.msgs { + for _, pSet := range partitions { + require.Equal(t, 1, pSet.msgs[0].retries) + } + } + for _, partitions := range retrySetB.msgs { + for _, pSet := range partitions { + require.Equal(t, 1, pSet.msgs[0].retries) + } + } + + assertNotDone(t, outputA, 50*time.Millisecond) + assertNotDone(t, outputB, 50*time.Millisecond) +} + func TestHandleSuccessNonIdempotentRetryUsesPartitionProducer(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false @@ -2203,6 +2342,7 @@ func TestRetryBatchReleasesMuteWhenHandoffAbortedByShutdown(t *testing.T) { config.Producer.Retry.Max = 1 config.Producer.Retry.Backoff = 0 config.Producer.Return.Errors = true + config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength parent := &asyncProducer{ conf: config, @@ -2231,16 +2371,187 @@ func TestRetryBatchReleasesMuteWhenHandoffAbortedByShutdown(t *testing.T) { t.Fatal("expected sent batch to mute partitions") } parent.inFlight.Add(1) + + done := make(chan struct{}) + go func() { + parent.retryBatch("topic", 0, retryPartitionSet, ErrOutOfBrokers, true) + close(done) + }() + waitForProducerBuffer(t, parent, 1, -1) + if parent.shutdownChClosed.CompareAndSwap(false, true) { close(parent.shutdownCh) } + producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrShuttingDown, producerErr.Err) + assertDoneWithin(t, done, 2*time.Second) + waitForProducerBuffer(t, parent, 0, 0) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected partition mute to be released after aborted retry handoff") + } + parent.muter.unmute(contender) +} + +func TestRetryBatchUsesSharedBufferLengthBudget(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = true + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength + 1 + + parent, retryBP := newBlockingRetryProducer(config, minFunctionalRetryBufferLength+1) + + first := newProduceSet(parent) + for i := 0; i < minFunctionalRetryBufferLength; i++ { + safeAddMessage(t, first, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + } + firstPartitionSet := first.msgs["topic"][0] + if !parent.muter.tryMute(first) { + t.Fatal("expected first retry batch to mute partitions") + } + parent.inFlight.Add(minFunctionalRetryBufferLength) + + firstDone := make(chan struct{}) + go func() { + parent.retryBatch("topic", 0, firstPartitionSet, ErrNotEnoughReplicas, true) + close(firstDone) + }() + waitForProducerBuffer(t, parent, minFunctionalRetryBufferLength, -1) + + second := newProduceSet(parent) + safeAddMessage(t, second, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("overflow")}) + secondPartitionSet := second.msgs["topic"][1] + if !parent.muter.tryMute(second) { + t.Fatal("expected second retry batch to mute partitions") + } + parent.inFlight.Add(1) + parent.retryBatch("topic", 1, secondPartitionSet, ErrNotEnoughReplicas, true) + + producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrProducerRetryBufferOverflow, producerErr.Err) + require.Equal(t, int32(1), producerErr.Msg.Partition) + waitForProducerBuffer(t, parent, minFunctionalRetryBufferLength, -1) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected overflowed retry batch to release partition mute") + } + parent.muter.unmute(contender) + + close(retryBP.done) + assertDoneWithin(t, firstDone, 2*time.Second) + waitForProducerBuffer(t, parent, 0, 0) +} + +func TestRetryBatchUsesSharedBufferBytesBudget(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = true + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + + parent, retryBP := newBlockingRetryProducer(config, 2) + + firstMsg := &ProducerMessage{Topic: "topic", Partition: 0, Value: ByteEncoder(make([]byte, minFunctionalRetryBufferBytes))} + secondMsg := &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("overflow")} + version := parent.producerMessageByteSizeVersion() + firstBytes := int64(firstMsg.ByteSize(version)) + secondBytes := int64(secondMsg.ByteSize(version)) + config.Producer.Retry.MaxBufferBytes = firstBytes + secondBytes + + first := newProduceSet(parent) + safeAddMessage(t, first, firstMsg) + firstPartitionSet := first.msgs["topic"][0] + if !parent.muter.tryMute(first) { + t.Fatal("expected first retry batch to mute partitions") + } + parent.inFlight.Add(1) + + firstDone := make(chan struct{}) + go func() { + parent.retryBatch("topic", 0, firstPartitionSet, ErrNotEnoughReplicas, true) + close(firstDone) + }() + waitForProducerBuffer(t, parent, -1, firstBytes) + + second := newProduceSet(parent) + safeAddMessage(t, second, secondMsg) + secondPartitionSet := second.msgs["topic"][1] + if !parent.muter.tryMute(second) { + t.Fatal("expected second retry batch to mute partitions") + } + parent.inFlight.Add(1) + parent.retryBatch("topic", 1, secondPartitionSet, ErrNotEnoughReplicas, true) + + producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrProducerRetryBufferOverflow, producerErr.Err) + require.Equal(t, int32(1), producerErr.Msg.Partition) + waitForProducerBuffer(t, parent, -1, firstBytes) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected overflowed retry batch to release partition mute") + } + parent.muter.unmute(contender) + + close(retryBP.done) + assertDoneWithin(t, firstDone, 2*time.Second) + waitForProducerBuffer(t, parent, 0, 0) +} + +func TestRetryBatchesAfterRefreshReleasesMuteWhenBrokerProducerDone(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + errors: make(chan *ProducerError, 1), + shutdownCh: make(chan struct{}), + txnmgr: &transactionManager{}, + } + leader := &Broker{id: 1} + parent.client = &stubLeaderClient{leader: leader, cfg: config} + retryBP := &brokerProducer{ + parent: parent, + broker: leader, + output: make(chan *produceSet), + input: make(chan *ProducerMessage), + done: make(chan struct{}), + } + parent.brokers[leader] = retryBP + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + retryPartitionSet := sent.msgs["topic"][0] + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + parent.inFlight.Add(1) + done := make(chan struct{}) go func() { - parent.retryBatch("topic", 0, retryPartitionSet, ErrOutOfBrokers, true) + parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ + topic: "topic", + partition: 0, + pSet: retryPartitionSet, + }}, ErrOutOfBrokers) close(done) }() + close(retryBP.done) producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) require.Equal(t, ErrShuttingDown, producerErr.Err) assertDoneWithin(t, done, 2*time.Second) @@ -2248,9 +2559,131 @@ func TestRetryBatchReleasesMuteWhenHandoffAbortedByShutdown(t *testing.T) { contender := newProduceSet(parent) safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) if !parent.muter.tryMute(contender) { - t.Fatal("expected partition mute to be released after aborted retry handoff") + t.Fatal("expected partition mute to be released after brokerProducer done") + } + parent.muter.unmute(contender) +} + +func TestRetryBatchesAfterRefreshUsesSharedBufferLengthBudget(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength + 1 + + parent, retryBP := newBlockingRetryProducer(config, minFunctionalRetryBufferLength+1) + + first := newProduceSet(parent) + for i := 0; i < minFunctionalRetryBufferLength; i++ { + safeAddMessage(t, first, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + } + if !parent.muter.tryMute(first) { + t.Fatal("expected first retry batch to mute partitions") + } + parent.inFlight.Add(minFunctionalRetryBufferLength) + + firstDone := make(chan struct{}) + go func() { + parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ + topic: "topic", + partition: 0, + pSet: first.msgs["topic"][0], + }}, ErrOutOfBrokers) + close(firstDone) + }() + waitForProducerBuffer(t, parent, minFunctionalRetryBufferLength, -1) + + second := newProduceSet(parent) + safeAddMessage(t, second, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("overflow")}) + if !parent.muter.tryMute(second) { + t.Fatal("expected second retry batch to mute partitions") + } + parent.inFlight.Add(1) + parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ + topic: "topic", + partition: 1, + pSet: second.msgs["topic"][1], + }}, ErrOutOfBrokers) + + producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrProducerRetryBufferOverflow, producerErr.Err) + require.Equal(t, int32(1), producerErr.Msg.Partition) + waitForProducerBuffer(t, parent, minFunctionalRetryBufferLength, -1) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected overflowed retry batch to release partition mute") } parent.muter.unmute(contender) + + close(retryBP.done) + assertDoneWithin(t, firstDone, 2*time.Second) + waitForProducerBuffer(t, parent, 0, 0) +} + +func TestRetryBatchesAfterRefreshUsesSharedBufferBytesBudget(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + + parent, retryBP := newBlockingRetryProducer(config, 2) + + firstMsg := &ProducerMessage{Topic: "topic", Partition: 0, Value: ByteEncoder(make([]byte, minFunctionalRetryBufferBytes))} + secondMsg := &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("overflow")} + version := parent.producerMessageByteSizeVersion() + firstBytes := int64(firstMsg.ByteSize(version)) + secondBytes := int64(secondMsg.ByteSize(version)) + config.Producer.Retry.MaxBufferBytes = firstBytes + secondBytes + + first := newProduceSet(parent) + safeAddMessage(t, first, firstMsg) + if !parent.muter.tryMute(first) { + t.Fatal("expected first retry batch to mute partitions") + } + parent.inFlight.Add(1) + + firstDone := make(chan struct{}) + go func() { + parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ + topic: "topic", + partition: 0, + pSet: first.msgs["topic"][0], + }}, ErrOutOfBrokers) + close(firstDone) + }() + waitForProducerBuffer(t, parent, -1, firstBytes) + + second := newProduceSet(parent) + safeAddMessage(t, second, secondMsg) + if !parent.muter.tryMute(second) { + t.Fatal("expected second retry batch to mute partitions") + } + parent.inFlight.Add(1) + parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ + topic: "topic", + partition: 1, + pSet: second.msgs["topic"][1], + }}, ErrOutOfBrokers) + + producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrProducerRetryBufferOverflow, producerErr.Err) + require.Equal(t, int32(1), producerErr.Msg.Partition) + waitForProducerBuffer(t, parent, -1, firstBytes) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected overflowed retry batch to release partition mute") + } + parent.muter.unmute(contender) + + close(retryBP.done) + assertDoneWithin(t, firstDone, 2*time.Second) + waitForProducerBuffer(t, parent, 0, 0) } func TestHandleErrorNonIdempotentRetryReleasesMuteOnLeaderError(t *testing.T) { @@ -2374,6 +2807,31 @@ func (c *countingRefreshClient) RefreshMetadata(topics ...string) error { return nil } +type routingLeaderClient struct { + stubLeaderClient + leaders map[string]map[int32]*Broker + calls chan []string +} + +func (c *routingLeaderClient) Leader(topic string, partition int32) (*Broker, error) { + if partitions := c.leaders[topic]; partitions != nil { + if leader := partitions[partition]; leader != nil { + return leader, nil + } + } + return nil, ErrLeaderNotAvailable +} + +func (c *routingLeaderClient) LeaderAndEpoch(topic string, partition int32) (*Broker, int32, error) { + leader, err := c.Leader(topic, partition) + return leader, 0, err +} + +func (c *routingLeaderClient) RefreshMetadata(topics ...string) error { + c.calls <- append([]string(nil), topics...) + return nil +} + func testProducerInterceptor( t *testing.T, interceptors []ProducerInterceptor, diff --git a/config.go b/config.go index 8b87a6cd2..9e837bbc8 100644 --- a/config.go +++ b/config.go @@ -276,16 +276,20 @@ type Config struct { // more sophisticated backoff strategies. This takes precedence over // `Backoff` if set. BackoffFunc func(retries, maxRetries int) time.Duration - // The maximum length of the bridging buffer between `input` and `retries` channels - // in AsyncProducer#retryHandler. - // The limit is to prevent this buffer from overflowing or causing OOM. + // The maximum number of messages in producer retry buffering, including + // the bridging buffer between `input` and `retries` channels in + // AsyncProducer#retryHandler and direct retries waiting for metadata + // refresh, retry backoff, leader lookup, partition mute, or broker handoff. + // The limit is to prevent retry buffering from overflowing or causing OOM. // Defaults to 0 for unlimited. // Any value between 0 and 4096 is pushed to 4096. // A zero or negative value indicates unlimited. MaxBufferLength int - // The maximum total byte size of messages in the bridging buffer between `input` - // and `retries` channels in AsyncProducer#retryHandler. - // This limit prevents the buffer from consuming excessive memory. + // The maximum total byte size of messages in producer retry buffering, + // including the bridging buffer between `input` and `retries` channels + // in AsyncProducer#retryHandler and direct retries waiting for metadata + // refresh, retry backoff, leader lookup, partition mute, or broker handoff. + // This limit prevents retry buffering from consuming excessive memory. // Defaults to 0 for unlimited. // Any value between 0 and 32 MB is pushed to 32 MB. // A zero or negative value indicates unlimited. diff --git a/produce_set.go b/produce_set.go index 121a272ac..0fbb7097a 100644 --- a/produce_set.go +++ b/produce_set.go @@ -50,6 +50,15 @@ func newProduceSetWithMeta(parent *asyncProducer, producerID int64, producerEpoc } } +func (ps *produceSet) addPartitionSet(topic string, partition int32, set *partitionSet) { + if ps.msgs[topic] == nil { + ps.msgs[topic] = make(map[int32]*partitionSet) + } + ps.msgs[topic][partition] = set + ps.bufferBytes += set.bufferBytes + ps.bufferCount += len(set.msgs) +} + func (ps *produceSet) add(msg *ProducerMessage) error { var err error var key, val []byte From 560f65c17dc5bc4eb6fb5709744ca1d03b31d924 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 18 Jun 2026 16:49:48 +0800 Subject: [PATCH 3/4] add more comments and simplify code Signed-off-by: 3AceShowHand --- async_producer.go | 58 ++++++++++++++---------------------------- async_producer_test.go | 49 +++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 39 deletions(-) diff --git a/async_producer.go b/async_producer.go index 9ce179114..d6f02bd45 100644 --- a/async_producer.go +++ b/async_producer.go @@ -1495,7 +1495,7 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio Logger.Printf("Retrying batch for %v-%d because of %v\n", topic, partition, retryErr) produceSet := newProduceSet(p) produceSet.addPartitionSet(topic, partition, pSet) - bufferMessages, bufferBytes := p.produceSetBufferSize(produceSet) + bufferMessages, bufferBytes := int64(produceSet.bufferCount), int64(produceSet.bufferBytes) bufferReserved := false releaseBuffer := func() { if bufferReserved { @@ -1606,29 +1606,6 @@ func (p *asyncProducer) producerMessageByteSizeVersion() int { return 1 } -func (p *asyncProducer) batchBufferSize(batch partitionBatchRetry) (int64, int64) { - return int64(len(batch.pSet.msgs)), int64(batch.pSet.bufferBytes) -} - -func (p *asyncProducer) batchesBufferSize(batches []partitionBatchRetry) (int64, int64) { - var messages, bytes int64 - for _, batch := range batches { - batchMessages, batchBytes := p.batchBufferSize(batch) - messages += batchMessages - bytes += batchBytes - } - return messages, bytes -} - -func (p *asyncProducer) produceSetBufferSize(set *produceSet) (int64, int64) { - var messages, bytes int64 - set.eachPartition(func(_ string, _ int32, pSet *partitionSet) { - messages += int64(len(pSet.msgs)) - bytes += int64(pSet.bufferBytes) - }) - return messages, bytes -} - func (p *asyncProducer) bufferWouldOverflow(messages, bytes int64) bool { maxBufferLength, maxBufferBytes := p.retryBufferLimits() return (maxBufferLength > 0 && messages >= maxBufferLength) || @@ -1686,8 +1663,14 @@ func (p *asyncProducer) releaseBuffer(messages, bytes int64) { // Connection errors already hold partition mutes; doing the refresh in the // response loop can block all progress for that broker while the cluster is // unstable. Refresh once for the failed request so multi-partition batches do -// not amplify controller-fail metadata traffic, then group retry partitions by -// their refreshed leader broker to keep handoff fanout bounded by broker count. +// not amplify controller-fail metadata traffic. Group retry partitions by their +// refreshed leader broker, but hand them off from this retry worker so large +// clusters do not create one temporary goroutine per target broker. +// +// This is intentionally not implemented as a loop over retryBatch. retryBatch +// is the single-partition direct retry primitive; this path owns all muted +// partitions from one failed ProduceRequest and must retry, fail, reserve +// buffer, back off, and refresh metadata at that failed-request boundary. func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []partitionBatchRetry, retryErr error) { if p.shuttingDown() { p.failMutedBatches(batches, ErrShuttingDown) @@ -1697,14 +1680,7 @@ func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []part var retryable []partitionBatchRetry maxRetryAttempt := 0 for _, batch := range batches { - shouldRetry := true - for _, msg := range batch.pSet.msgs { - if msg.retries >= p.conf.Producer.Retry.Max { - shouldRetry = false - break - } - } - if !shouldRetry { + if !batch.pSet.shouldKeepMuted(p.conf.Producer.Retry.Max) { p.failMutedBatch(batch, retryErr) continue } @@ -1722,7 +1698,11 @@ func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []part // Reserve before metadata refresh so refresh-blocked direct retries still // count against the producer-level retry buffer budget. - bufferMessages, bufferBytes := p.batchesBufferSize(retryable) + var bufferMessages, bufferBytes int64 + for _, batch := range retryable { + bufferMessages += int64(len(batch.pSet.msgs)) + bufferBytes += int64(batch.pSet.bufferBytes) + } if !p.reserveBuffer(bufferMessages, bufferBytes) { p.failMutedBatches(retryable, ErrProducerRetryBufferOverflow) return @@ -1753,7 +1733,7 @@ func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []part brokerSets := make(map[*Broker]*produceSet) for _, batch := range retryable { if p.shuttingDown() { - batchMessages, batchBytes := p.batchBufferSize(batch) + batchMessages, batchBytes := int64(len(batch.pSet.msgs)), int64(batch.pSet.bufferBytes) p.releaseBuffer(batchMessages, batchBytes) p.failMutedBatch(batch, ErrShuttingDown) continue @@ -1761,7 +1741,7 @@ func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []part leader, leaderErr := p.client.Leader(batch.topic, batch.partition) if leaderErr != nil { Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", batch.topic, batch.partition, leaderErr) - batchMessages, batchBytes := p.batchBufferSize(batch) + batchMessages, batchBytes := int64(len(batch.pSet.msgs)), int64(batch.pSet.bufferBytes) p.releaseBuffer(batchMessages, batchBytes) p.failMutedBatch(batch, retryErr) continue @@ -1775,7 +1755,7 @@ func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []part } for leader, set := range brokerSets { - go p.handoffRetrySet(leader, set) + p.handoffRetrySet(leader, set) } } @@ -1783,7 +1763,7 @@ func (p *asyncProducer) handoffRetrySet(leader *Broker, set *produceSet) { bp := p.getBrokerProducer(leader) accepted := p.sendRetryBatch(bp, set) p.unrefBrokerProducer(leader, bp) - setMessages, setBytes := p.produceSetBufferSize(set) + setMessages, setBytes := int64(set.bufferCount), int64(set.bufferBytes) p.releaseBuffer(setMessages, setBytes) if !accepted { p.muter.unmute(set) diff --git a/async_producer_test.go b/async_producer_test.go index fd021c655..a3c220f63 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -2065,6 +2065,55 @@ func TestHandleErrorNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { } } +func TestHandleErrorNonIdempotentRetryFailsWholeBatch(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Return.Errors = true + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + errors: make(chan *ProducerError, 2), + retries: make(chan *ProducerMessage, 2), + txnmgr: &transactionManager{}, + } + + bp := &brokerProducer{ + parent: parent, + broker: &Broker{id: 1}, + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + + exhausted := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("exhausted"), retries: 1} + retryable := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retryable")} + sent := newProduceSet(parent) + safeAddMessage(t, sent, exhausted) + safeAddMessage(t, sent, retryable) + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + parent.inFlight.Add(2) + + bp.handleError(sent, ErrOutOfBrokers) + + firstErr := assertDoneWithin(t, parent.errors, 2*time.Second) + secondErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, exhausted, firstErr.Msg) + require.Equal(t, ErrOutOfBrokers, firstErr.Err) + require.Equal(t, retryable, secondErr.Msg) + require.Equal(t, ErrOutOfBrokers, secondErr.Err) + assertNotDone(t, parent.retries, 50*time.Millisecond) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected partition mute to be released after whole-batch failure") + } + parent.muter.unmute(contender) +} + func TestHandleErrorNonIdempotentRetryDoesNotWaitForMetadataRefresh(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false From ecb335e9fefb97c916d53b30acb53026058cbbfd Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 18 Jun 2026 17:40:10 +0800 Subject: [PATCH 4/4] fix flaky test --- functional_producer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/functional_producer_test.go b/functional_producer_test.go index df8a5d0ca..bbb0e27dc 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -430,8 +430,8 @@ func TestFuncTxnProduceAndCommitOffset(t *testing.T) { handler.started.Add(4) go func() { - err = cg.Consume(ctx, []string{"test.4"}, handler) - require.NoError(t, err) + consumeErr := cg.Consume(ctx, []string{"test.4"}, handler) + require.NoError(t, consumeErr) }() handler.started.Wait()