From 5b2605f673b0a1ab1c2a519c969aa062a26abe47 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 17 Jun 2026 20:55:49 +0800 Subject: [PATCH 1/5] idempotent is false still keep order by the mute --- async_producer.go | 67 ++++++++++++++++++++----- async_producer_test.go | 110 +++++++++++++++++++++++++++++++++++++++++ produce_set.go | 36 ++++++++++++++ 3 files changed, 202 insertions(+), 11 deletions(-) diff --git a/async_producer.go b/async_producer.go index a9af20b1b..8eb319ed3 100644 --- a/async_producer.go +++ b/async_producer.go @@ -184,6 +184,13 @@ func (m *partitionMuter) tryMutePartition(topic string, partition int32) bool { return true } +func (m *partitionMuter) isMutedPartition(topic string, partition int32) bool { + m.mu.Lock() + defer m.mu.Unlock() + + return m.isMuted(topic, partition) +} + // waitUntilMuted blocks until all partitions in the set can be muted, then mutes them. // Returns false if the muter was closed before all partitions could be muted. func (m *partitionMuter) waitUntilMuted(set *produceSet) bool { @@ -370,12 +377,17 @@ type ProducerMessage struct { // successfully delivered and RequiredAcks is not NoResponse. Timestamp time.Time - retries int - flags flagSet - expectation chan *ProducerError - sequenceNumber int32 - producerEpoch int16 - hasSequence bool + retries int + flags flagSet + // reusePartitionMute is a one-shot marker for non-idempotent retries. The + // first retried message from a failed, still-muted batch may reuse that + // existing in-flight reservation so later same-partition messages cannot + // slip in between the failed send and its retry. + reusePartitionMute bool + expectation chan *ProducerError + sequenceNumber int32 + producerEpoch int16 + hasSequence bool } const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. @@ -402,6 +414,7 @@ func (m *ProducerMessage) ByteSize(version int) int { func (m *ProducerMessage) clear() { m.flags = 0 m.retries = 0 + m.reusePartitionMute = false m.sequenceNumber = 0 m.producerEpoch = 0 m.hasSequence = false @@ -1083,7 +1096,8 @@ func (bp *brokerProducer) run() { var output chan<- *produceSet for { - if bp.flushingBatch == nil && (bp.timerFired || bp.accumulatingBatch.readyToFlush()) { + readyToFlush := bp.timerFired || bp.accumulatingBatch.readyToFlush() + if bp.flushingBatch == nil && readyToFlush { bp.tryBuildFlushingBatch() } @@ -1098,6 +1112,13 @@ func (bp *brokerProducer) run() { output = nil } + var unmuteCh <-chan struct{} + if bp.flushingBatch == nil && readyToFlush { + if ch, blocked := bp.parent.muter.awaitUnmuteChan(bp.accumulatingBatch); blocked { + unmuteCh = ch + } + } + select { case msg, ok := <-bp.input: if !ok { @@ -1169,6 +1190,7 @@ func (bp *brokerProducer) run() { if ok { bp.handleResponse(response) } + case <-unmuteCh: } } } @@ -1186,10 +1208,29 @@ func (bp *brokerProducer) tryBuildFlushingBatch() bool { partial := bp.accumulatingBatch.takePartitions(func(topic string, partition int32) bool { return bp.parent.muter.tryMutePartition(topic, partition) }) - if partial == nil { + if partial != nil { + bp.flushingBatch = partial + if bp.accumulatingBatch.empty() { + bp.rollOver() + } + return true + } + + reused := bp.accumulatingBatch.takePartitions(func(topic string, partition int32) bool { + partitions := bp.accumulatingBatch.msgs[topic] + if partitions == nil { + return false + } + set := partitions[partition] + return set.canReusePartitionMute() && bp.parent.muter.isMutedPartition(topic, partition) + }) + if reused == nil { return false } - bp.flushingBatch = partial + reused.eachPartition(func(_ string, _ int32, pSet *partitionSet) { + pSet.clearPartitionMuteReuse() + }) + bp.flushingBatch = reused if bp.accumulatingBatch.empty() { bp.rollOver() } @@ -1354,7 +1395,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo bp.parent.returnErrors(pSet.msgs, block.Err) } else { retryTopics = append(retryTopics, topic) - if bp.parent.conf.Producer.Idempotent { + if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) { if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } @@ -1397,6 +1438,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo if bp.parent.conf.Producer.Idempotent { go bp.parent.retryBatch(topic, partition, pSet, block.Err, true) } else { + pSet.markPartitionMuteReusable() bp.parent.retryMessages(pSet.msgs, block.Err) } // dropping the following messages has the side effect of incrementing their retry count @@ -1492,13 +1534,16 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { bp.currentRetries[topic] = make(map[int32]error) } bp.currentRetries[topic][partition] = err - if bp.parent.conf.Producer.Idempotent { + if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) { if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } keepMuted[topic][partition] = struct{}{} + } + if bp.parent.conf.Producer.Idempotent { go bp.parent.retryBatch(topic, partition, pSet, err, true) } else { + pSet.markPartitionMuteReusable() bp.parent.retryMessages(pSet.msgs, err) } }) diff --git a/async_producer_test.go b/async_producer_test.go index c32ac3ef5..ed5e6beeb 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1724,6 +1724,53 @@ func TestBrokerProducerFlushSkipsMutedPartitions(t *testing.T) { } } +func TestBrokerProducerRunFlushesAfterExternalUnmute(t *testing.T) { + config := NewTestConfig() + config.Producer.Flush.Messages = 1 + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + txnmgr: &transactionManager{}, + } + + blockedSet := newProduceSet(parent) + safeAddMessage(t, blockedSet, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("held")}) + if !parent.muter.tryMute(blockedSet) { + t.Fatal("expected to mute partition externally") + } + + input := make(chan *ProducerMessage) + output := make(chan *produceSet, 1) + responses := make(chan *brokerProducerResponse) + bp := &brokerProducer{ + parent: parent, + broker: &Broker{id: 1}, + input: input, + output: output, + responses: responses, + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + retryMsg := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("waiting")} + safeAddMessage(t, bp.accumulatingBatch, retryMsg) + + done := make(chan struct{}) + go func() { + bp.run() + close(done) + }() + + assertNotDone(t, output, 50*time.Millisecond) + parent.muter.unmute(blockedSet) + + flushed := assertDoneWithin(t, output, 2*time.Second) + require.Equal(t, retryMsg, flushed.msgs["topic"][0].msgs[0]) + + close(input) + close(responses) + assertDoneWithin(t, done, 2*time.Second) +} + // TestBrokerProducerWaitForSpaceAllPartitionsMuted verifies that waitForSpace unblocks // when all partitions in the accumulating batch are externally muted and later unmuted. func TestBrokerProducerWaitForSpaceAllPartitionsMuted(t *testing.T) { @@ -1910,6 +1957,69 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) { } } +func TestHandleSuccessNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + + txnMgr := &transactionManager{ + producerID: 0, + producerEpoch: 0, + sequenceNumbers: make(map[string]int32), + } + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + retries: make(chan *ProducerMessage, 1), + txnmgr: txnMgr, + } + leader := &Broker{id: 1} + parent.client = &stubLeaderClient{leader: leader, cfg: config} + + bp := &brokerProducer{ + parent: parent, + broker: leader, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + parent.brokers[leader] = bp + parent.brokerRefs[bp] = 1 + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + retryPartitionSet := sent.msgs["topic"][0] + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + defer parent.muter.unmute(sent) + + response := new(ProduceResponse) + response.AddTopicPartition("topic", 0, ErrNotLeaderForPartition) + bp.handleSuccess(sent, response) + + retried := assertDoneWithin(t, parent.retries, 2*time.Second) + require.Equal(t, retryPartitionSet.msgs[0], retried) + require.True(t, retried.reusePartitionMute) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if parent.muter.tryMute(contender) { + parent.muter.unmute(contender) + t.Fatal("expected partition to remain muted by the retrying batch") + } + + safeAddMessage(t, bp.accumulatingBatch, retried) + if !bp.tryBuildFlushingBatch() { + t.Fatal("expected retry batch to reuse the existing partition mute") + } + require.Equal(t, retryPartitionSet.msgs[0], bp.flushingBatch.msgs["topic"][0].msgs[0]) + require.False(t, retryPartitionSet.msgs[0].reusePartitionMute) +} + type stubLeaderClient struct { cfg *Config leader *Broker diff --git a/produce_set.go b/produce_set.go index 380f7f5ab..a86235b9e 100644 --- a/produce_set.go +++ b/produce_set.go @@ -12,6 +12,42 @@ type partitionSet struct { bufferBytes int } +// markPartitionMuteReusable lets the first message carry the retry batch's +// right to reuse the partition mute that is still held by its failed send. +// Only the first message is marked because one held mute represents one +// in-flight retry permission; marking every message could let split retry +// fragments bypass each other under the same reservation. +func (ps *partitionSet) markPartitionMuteReusable() { + if len(ps.msgs) > 0 { + ps.msgs[0].reusePartitionMute = true + } +} + +// canReusePartitionMute reports whether this retry batch may be flushed while +// its partition is already muted by the previous send attempt. +func (ps *partitionSet) canReusePartitionMute() bool { + return len(ps.msgs) > 0 && ps.msgs[0].reusePartitionMute +} + +// clearPartitionMuteReuse consumes the one-shot reuse marker once the retry +// batch is selected for flushing. +func (ps *partitionSet) clearPartitionMuteReuse() { + for _, msg := range ps.msgs { + msg.reusePartitionMute = false + } +} + +// canRetry prevents holding a partition mute when every message in the set will +// be returned as an error instead of being retried. +func (ps *partitionSet) canRetry(maxRetries int) bool { + for _, msg := range ps.msgs { + if msg.retries < maxRetries { + return true + } + } + return false +} + type produceSet struct { parent *asyncProducer msgs map[string]map[int32]*partitionSet From 52b05d7f41ccde047efa2e253fbc72d0f1dd6f99 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 18 Jun 2026 00:20:22 +0800 Subject: [PATCH 2/5] producer: keep non-idempotent connection retries muted Signed-off-by: 3AceShowHand --- async_producer.go | 60 +++++++---------------- async_producer_test.go | 109 +++++++++++++++++++++++++++++++---------- produce_set.go | 25 ---------- 3 files changed, 101 insertions(+), 93 deletions(-) diff --git a/async_producer.go b/async_producer.go index 8eb319ed3..226485fcc 100644 --- a/async_producer.go +++ b/async_producer.go @@ -184,13 +184,6 @@ func (m *partitionMuter) tryMutePartition(topic string, partition int32) bool { return true } -func (m *partitionMuter) isMutedPartition(topic string, partition int32) bool { - m.mu.Lock() - defer m.mu.Unlock() - - return m.isMuted(topic, partition) -} - // waitUntilMuted blocks until all partitions in the set can be muted, then mutes them. // Returns false if the muter was closed before all partitions could be muted. func (m *partitionMuter) waitUntilMuted(set *produceSet) bool { @@ -377,17 +370,12 @@ type ProducerMessage struct { // successfully delivered and RequiredAcks is not NoResponse. Timestamp time.Time - retries int - flags flagSet - // reusePartitionMute is a one-shot marker for non-idempotent retries. The - // first retried message from a failed, still-muted batch may reuse that - // existing in-flight reservation so later same-partition messages cannot - // slip in between the failed send and its retry. - reusePartitionMute bool - expectation chan *ProducerError - sequenceNumber int32 - producerEpoch int16 - hasSequence bool + retries int + flags flagSet + expectation chan *ProducerError + sequenceNumber int32 + producerEpoch int16 + hasSequence bool } const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. @@ -414,7 +402,6 @@ func (m *ProducerMessage) ByteSize(version int) int { func (m *ProducerMessage) clear() { m.flags = 0 m.retries = 0 - m.reusePartitionMute = false m.sequenceNumber = 0 m.producerEpoch = 0 m.hasSequence = false @@ -1216,25 +1203,7 @@ func (bp *brokerProducer) tryBuildFlushingBatch() bool { return true } - reused := bp.accumulatingBatch.takePartitions(func(topic string, partition int32) bool { - partitions := bp.accumulatingBatch.msgs[topic] - if partitions == nil { - return false - } - set := partitions[partition] - return set.canReusePartitionMute() && bp.parent.muter.isMutedPartition(topic, partition) - }) - if reused == nil { - return false - } - reused.eachPartition(func(_ string, _ int32, pSet *partitionSet) { - pSet.clearPartitionMuteReuse() - }) - bp.flushingBatch = reused - if bp.accumulatingBatch.empty() { - bp.rollOver() - } - return true + return false } func (bp *brokerProducer) shutdown() { @@ -1395,7 +1364,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo bp.parent.returnErrors(pSet.msgs, block.Err) } else { retryTopics = append(retryTopics, topic) - if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) { + if bp.parent.conf.Producer.Idempotent { if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } @@ -1438,7 +1407,6 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo if bp.parent.conf.Producer.Idempotent { go bp.parent.retryBatch(topic, partition, pSet, block.Err, true) } else { - pSet.markPartitionMuteReusable() bp.parent.retryMessages(pSet.msgs, block.Err) } // dropping the following messages has the side effect of incrementing their retry count @@ -1475,6 +1443,13 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio msg.retries++ } + // honor Producer.Retry.Backoff between retry attempts (#2469); the + // non-idempotent path gets this from partitionProducer.dispatch, but + // retryBatch dispatches the produceSet directly to the broker + if len(pSet.msgs) > 0 { + p.backoff(pSet.msgs[0].retries) + } + // it's expected that a metadata refresh has been requested prior to calling retryBatch leader, leaderErr := p.client.Leader(topic, partition) if leaderErr != nil { @@ -1521,7 +1496,7 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { retryTopicSeen[topic] = struct{}{} retryTopics = append(retryTopics, topic) }) - if bp.parent.conf.Producer.Idempotent && len(retryTopics) > 0 { + if len(retryTopics) > 0 { refreshErr := bp.parent.client.RefreshMetadata(retryTopics...) if refreshErr != nil { Logger.Printf("Failed refreshing metadata because of %v\n", refreshErr) @@ -1540,10 +1515,9 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { } keepMuted[topic][partition] = struct{}{} } - if bp.parent.conf.Producer.Idempotent { + if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) { go bp.parent.retryBatch(topic, partition, pSet, err, true) } else { - pSet.markPartitionMuteReusable() bp.parent.retryMessages(pSet.msgs, err) } }) diff --git a/async_producer_test.go b/async_producer_test.go index ed5e6beeb..9e47196a9 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1957,37 +1957,39 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) { } } -func TestHandleSuccessNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { +func TestHandleErrorNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 config.Producer.Retry.Backoff = 0 - txnMgr := &transactionManager{ - producerID: 0, - producerEpoch: 0, - sequenceNumbers: make(map[string]int32), - } parent := &asyncProducer{ conf: config, muter: newPartitionMuter(), brokers: make(map[*Broker]*brokerProducer), brokerRefs: make(map[*brokerProducer]int), - retries: make(chan *ProducerMessage, 1), - txnmgr: txnMgr, + txnmgr: &transactionManager{}, } - leader := &Broker{id: 1} - parent.client = &stubLeaderClient{leader: leader, cfg: config} + failedBroker := &Broker{id: 1} + retryLeader := &Broker{id: 2} + parent.client = &stubLeaderClient{leader: retryLeader, cfg: config} + + output := make(chan *produceSet) + retryBP := &brokerProducer{ + parent: parent, + broker: retryLeader, + output: output, + input: make(chan *ProducerMessage), + } + parent.brokers[retryLeader] = retryBP bp := &brokerProducer{ parent: parent, - broker: leader, + broker: failedBroker, input: make(chan *ProducerMessage), accumulatingBatch: newProduceSet(parent), currentRetries: make(map[string]map[int32]error), } - parent.brokers[leader] = bp - parent.brokerRefs[bp] = 1 sent := newProduceSet(parent) safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) @@ -1995,15 +1997,18 @@ func TestHandleSuccessNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { if !parent.muter.tryMute(sent) { t.Fatal("expected sent batch to mute partitions") } - defer parent.muter.unmute(sent) - response := new(ProduceResponse) - response.AddTopicPartition("topic", 0, ErrNotLeaderForPartition) - bp.handleSuccess(sent, response) + done := make(chan struct{}) + go func() { + bp.handleError(sent, ErrOutOfBrokers) + close(done) + }() + assertDoneWithin(t, done, 2*time.Second) - retried := assertDoneWithin(t, parent.retries, 2*time.Second) - require.Equal(t, retryPartitionSet.msgs[0], retried) - require.True(t, retried.reusePartitionMute) + retrySet := assertDoneWithin(t, output, 2*time.Second) + defer parent.muter.unmute(retrySet) + require.Equal(t, retryPartitionSet, retrySet.msgs["topic"][0]) + require.Equal(t, 1, retryPartitionSet.msgs[0].retries) contender := newProduceSet(parent) safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) @@ -2011,13 +2016,54 @@ func TestHandleSuccessNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { parent.muter.unmute(contender) t.Fatal("expected partition to remain muted by the retrying batch") } +} - safeAddMessage(t, bp.accumulatingBatch, retried) - if !bp.tryBuildFlushingBatch() { - t.Fatal("expected retry batch to reuse the existing partition mute") +func TestHandleErrorNonIdempotentRetryReleasesMuteOnLeaderError(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + errors: make(chan *ProducerError, 1), + txnmgr: &transactionManager{}, } - require.Equal(t, retryPartitionSet.msgs[0], bp.flushingBatch.msgs["topic"][0].msgs[0]) - require.False(t, retryPartitionSet.msgs[0].reusePartitionMute) + parent.client = &failingLeaderClient{ + stubLeaderClient: stubLeaderClient{cfg: config}, + err: ErrOutOfBrokers, + } + + bp := &brokerProducer{ + parent: parent, + broker: &Broker{id: 1}, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + parent.inFlight.Add(1) + + bp.handleError(sent, ErrOutOfBrokers) + + producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrOutOfBrokers, producerErr.Err) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected partition mute to be released after retry leader lookup failure") + } + parent.muter.unmute(contender) } type stubLeaderClient struct { @@ -2055,6 +2101,19 @@ func (c *stubLeaderClient) PartitionNotReadable(string, int32) bool { r func (c *stubLeaderClient) Close() error { return nil } func (c *stubLeaderClient) Closed() bool { return false } +type failingLeaderClient struct { + stubLeaderClient + err error +} + +func (c *failingLeaderClient) Leader(string, int32) (*Broker, error) { + return nil, c.err +} + +func (c *failingLeaderClient) LeaderAndEpoch(string, int32) (*Broker, int32, error) { + return nil, 0, c.err +} + func testProducerInterceptor( t *testing.T, interceptors []ProducerInterceptor, diff --git a/produce_set.go b/produce_set.go index a86235b9e..b1ad8ad38 100644 --- a/produce_set.go +++ b/produce_set.go @@ -12,31 +12,6 @@ type partitionSet struct { bufferBytes int } -// markPartitionMuteReusable lets the first message carry the retry batch's -// right to reuse the partition mute that is still held by its failed send. -// Only the first message is marked because one held mute represents one -// in-flight retry permission; marking every message could let split retry -// fragments bypass each other under the same reservation. -func (ps *partitionSet) markPartitionMuteReusable() { - if len(ps.msgs) > 0 { - ps.msgs[0].reusePartitionMute = true - } -} - -// canReusePartitionMute reports whether this retry batch may be flushed while -// its partition is already muted by the previous send attempt. -func (ps *partitionSet) canReusePartitionMute() bool { - return len(ps.msgs) > 0 && ps.msgs[0].reusePartitionMute -} - -// clearPartitionMuteReuse consumes the one-shot reuse marker once the retry -// batch is selected for flushing. -func (ps *partitionSet) clearPartitionMuteReuse() { - for _, msg := range ps.msgs { - msg.reusePartitionMute = false - } -} - // canRetry prevents holding a partition mute when every message in the set will // be returned as an error instead of being retried. func (ps *partitionSet) canRetry(maxRetries int) bool { From 3b3b97b0acf4eeb5dce52645b638fb28c2734f7f Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 18 Jun 2026 00:45:28 +0800 Subject: [PATCH 3/5] fix code Signed-off-by: 3AceShowHand --- async_producer.go | 8 +++++++- async_producer_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/async_producer.go b/async_producer.go index 226485fcc..e50b30264 100644 --- a/async_producer.go +++ b/async_producer.go @@ -1407,6 +1407,9 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo if bp.parent.conf.Producer.Idempotent { go bp.parent.retryBatch(topic, partition, pSet, block.Err, true) } else { + // Non-idempotent response retries must go back through partitionProducer. + // That path advances the retry high watermark and refreshes the partition + // leader before sending the retry; retryBatch would bypass that state. bp.parent.retryMessages(pSet.msgs, block.Err) } // dropping the following messages has the side effect of incrementing their retry count @@ -1504,7 +1507,10 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { } keepMuted := make(map[string]map[int32]struct{}) sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { - // keep partition marked as in-flight during retry (connection error) + // Connection failures have no ProduceResponse to feed back through + // partitionProducer. Keep the sent batch muted and retry it asynchronously + // so later same-partition batches cannot pass it, without blocking the + // brokerProducer response loop (#1203). if bp.currentRetries[topic] == nil { bp.currentRetries[topic] = make(map[int32]error) } diff --git a/async_producer_test.go b/async_producer_test.go index 9e47196a9..614686c2d 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -2018,6 +2018,50 @@ func TestHandleErrorNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { } } +func TestHandleSuccessNonIdempotentRetryUsesPartitionProducer(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + retries: make(chan *ProducerMessage, 1), + txnmgr: &transactionManager{}, + } + + bp := &brokerProducer{ + parent: parent, + broker: &Broker{id: 1}, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + + sent := newProduceSet(parent) + msg := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")} + safeAddMessage(t, sent, msg) + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + + response := new(ProduceResponse) + response.AddTopicPartition("topic", 0, ErrNotLeaderForPartition) + bp.handleSuccess(sent, response) + + retried := assertDoneWithin(t, parent.retries, 2*time.Second) + require.Equal(t, msg, retried) + require.Equal(t, 1, retried.retries) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected response retry to release sent mute before partitionProducer retry") + } + parent.muter.unmute(contender) +} + func TestHandleErrorNonIdempotentRetryReleasesMuteOnLeaderError(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false From b46bb9751b30ee338aadf8372222e9ccfe1c0bde Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 18 Jun 2026 01:27:21 +0800 Subject: [PATCH 4/5] still retry match Signed-off-by: 3AceShowHand --- async_producer.go | 158 +++++++++++++++++++++++------- async_producer_test.go | 216 +++++++++++++++++++++++++++++++++++++++++ produce_set.go | 15 +-- produce_set_test.go | 18 ++++ 4 files changed, 366 insertions(+), 41 deletions(-) diff --git a/async_producer.go b/async_producer.go index e50b30264..5a3845473 100644 --- a/async_producer.go +++ b/async_producer.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "sync" + "sync/atomic" "time" "github.com/eapache/go-resiliency/breaker" @@ -92,6 +93,10 @@ type asyncProducer struct { errors chan *ProducerError input, successes, retries chan *ProducerMessage inFlight sync.WaitGroup + // shutdownCh is closed before waiting on in-flight messages so retryBatch + // goroutines can stop waiting on broker handoff and release partition mutes. + shutdownCh chan struct{} + shutdownChClosed atomic.Bool brokers map[*Broker]*brokerProducer brokerRefs map[*brokerProducer]int @@ -303,6 +308,7 @@ func newAsyncProducer(client Client) (AsyncProducer, error) { input: make(chan *ProducerMessage), successes: make(chan *ProducerMessage), retries: make(chan *ProducerMessage), + shutdownCh: make(chan struct{}), brokers: make(map[*Broker]*brokerProducer), brokerRefs: make(map[*brokerProducer]int), txnmgr: txnmgr, @@ -956,6 +962,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { input: input, output: bridge, responses: responses, + done: make(chan struct{}), accumulatingBatch: newProduceSet(p), currentRetries: make(map[string]map[int32]error), } @@ -1069,6 +1076,11 @@ type brokerProducer struct { output chan<- *produceSet responses <-chan *brokerProducerResponse abandoned chan struct{} + // done is closed before output is closed. Direct retry handoff waits on + // output while the batch still owns its partition mute; if the brokerProducer + // is shutting down, that send may never be received and the mute would block + // later same-partition messages and producer shutdown. + done chan struct{} accumulatingBatch *produceSet flushingBatch *produceSet // batch that has been muted and is ready to send @@ -1207,6 +1219,9 @@ func (bp *brokerProducer) tryBuildFlushingBatch() bool { } func (bp *brokerProducer) shutdown() { + if bp.done != nil { + close(bp.done) + } // flush any ready buffer for bp.flushingBatch != nil { select { @@ -1435,12 +1450,21 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio produceSet.msgs[topic][partition] = pSet produceSet.bufferBytes += pSet.bufferBytes produceSet.bufferCount += len(pSet.msgs) + muted := alreadyMuted + failBatch := func(err error) { + // Release the partition before reporting errors so a blocked Errors + // consumer cannot keep later same-partition messages muted. + if muted { + p.muter.unmute(produceSet) + muted = false + } + for _, msg := range pSet.msgs { + p.returnError(msg, err) + } + } for _, msg := range pSet.msgs { if msg.retries >= p.conf.Producer.Retry.Max { - p.returnErrors(pSet.msgs, retryErr) - if alreadyMuted { - p.muter.unmute(produceSet) - } + failBatch(retryErr) return } msg.retries++ @@ -1457,25 +1481,88 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio leader, leaderErr := p.client.Leader(topic, partition) if leaderErr != nil { Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, leaderErr) - for _, msg := range pSet.msgs { - p.returnError(msg, retryErr) - } - if alreadyMuted { - p.muter.unmute(produceSet) - } + failBatch(retryErr) return } if !alreadyMuted { if !p.muter.waitUntilMuted(produceSet) { - for _, msg := range pSet.msgs { - p.returnError(msg, retryErr) - } + failBatch(retryErr) return } + muted = true } bp := p.getBrokerProducer(leader) - bp.output <- produceSet - p.unrefBrokerProducer(leader, bp) + defer p.unrefBrokerProducer(leader, bp) + if !p.sendRetryBatch(bp, produceSet) { + failBatch(ErrShuttingDown) + return + } +} + +type partitionBatchRetry struct { + topic string + partition int32 + pSet *partitionSet +} + +// retryBatchesAfterRefresh keeps metadata refresh out of brokerProducer.handleError. +// Connection errors already hold partition mutes; doing the refresh in the +// response loop can block all progress for that broker while the cluster is +// unstable. Refresh once for the failed request so multi-partition batches do +// not amplify controller-fail metadata traffic, then retry partitions +// independently so one blocked broker handoff cannot hold up the rest. +func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []partitionBatchRetry, retryErr error) { + if p.shutdownCh != nil { + select { + case <-p.shutdownCh: + for _, batch := range batches { + go p.retryBatch(batch.topic, batch.partition, batch.pSet, retryErr, true) + } + return + default: + } + } + if len(topics) > 0 { + if err := p.client.RefreshMetadata(topics...); err != nil { + Logger.Printf("Failed refreshing metadata because of %v\n", err) + } + } + for _, batch := range batches { + go p.retryBatch(batch.topic, batch.partition, batch.pSet, retryErr, true) + } +} + +// sendRetryBatch transfers ownership of a muted retry batch to the broker +// bridge. A false result means no owner accepted the batch, so the caller must +// release the mute and fail the messages instead of leaving a retry goroutine +// blocked while it still owns the partition mute. +func (p *asyncProducer) sendRetryBatch(bp *brokerProducer, set *produceSet) bool { + var done <-chan struct{} + if bp.done != nil { + done = bp.done + } + select { + case <-done: + return false + default: + } + select { + case bp.output <- set: + return true + default: + } + var shutdownCh <-chan struct{} + if p.shutdownCh != nil { + shutdownCh = p.shutdownCh + } + select { + case bp.output <- set: + return true + case <-done: + return false + case <-shutdownCh: + return false + } } func (bp *brokerProducer) handleError(sent *produceSet, err error) { @@ -1490,22 +1577,10 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { bp.parent.abandonBrokerConnection(bp.broker) _ = bp.broker.Close() bp.closing = err + keepMuted := make(map[string]map[int32]struct{}) var retryTopics []string retryTopicSeen := make(map[string]struct{}) - sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { - if _, ok := retryTopicSeen[topic]; ok { - return - } - retryTopicSeen[topic] = struct{}{} - retryTopics = append(retryTopics, topic) - }) - if len(retryTopics) > 0 { - refreshErr := bp.parent.client.RefreshMetadata(retryTopics...) - if refreshErr != nil { - Logger.Printf("Failed refreshing metadata because of %v\n", refreshErr) - } - } - keepMuted := make(map[string]map[int32]struct{}) + var retryBatches []partitionBatchRetry sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { // Connection failures have no ProduceResponse to feed back through // partitionProducer. Keep the sent batch muted and retry it asynchronously @@ -1515,18 +1590,27 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { bp.currentRetries[topic] = make(map[int32]error) } bp.currentRetries[topic][partition] = err - if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) { + if !(bp.parent.conf.Producer.Idempotent || pSet.shouldKeepMuted(bp.parent.conf.Producer.Retry.Max)) { + bp.parent.returnErrors(pSet.msgs, err) + } else { if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } keepMuted[topic][partition] = struct{}{} - } - if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) { - go bp.parent.retryBatch(topic, partition, pSet, err, true) - } else { - bp.parent.retryMessages(pSet.msgs, err) + if _, ok := retryTopicSeen[topic]; !ok { + retryTopicSeen[topic] = struct{}{} + retryTopics = append(retryTopics, topic) + } + retryBatches = append(retryBatches, partitionBatchRetry{ + topic: topic, + partition: partition, + pSet: pSet, + }) } }) + if len(retryBatches) > 0 { + go bp.parent.retryBatchesAfterRefresh(retryTopics, retryBatches, err) + } bp.accumulatingBatch.eachPartition(func(topic string, partition int32, pSet *partitionSet) { bp.parent.retryMessages(pSet.msgs, err) }) @@ -1608,6 +1692,10 @@ func (p *asyncProducer) retryHandler() { // utility functions func (p *asyncProducer) shutdown() { + Logger.Println("Producer shutting down.") + if p.shutdownCh != nil && p.shutdownChClosed.CompareAndSwap(false, true) { + close(p.shutdownCh) + } p.inFlight.Add(1) p.input <- &ProducerMessage{flags: shutdown} diff --git a/async_producer_test.go b/async_producer_test.go index 614686c2d..4bb9cd87a 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -2018,6 +2018,141 @@ func TestHandleErrorNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { } } +func TestHandleErrorNonIdempotentRetryDoesNotWaitForMetadataRefresh(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + txnmgr: &transactionManager{}, + } + failedBroker := &Broker{id: 1} + retryLeader := &Broker{id: 2} + client := &blockingRefreshClient{ + stubLeaderClient: stubLeaderClient{leader: retryLeader, cfg: config}, + started: make(chan struct{}, 1), + release: make(chan struct{}), + } + parent.client = client + + output := make(chan *produceSet) + retryBP := &brokerProducer{ + parent: parent, + broker: retryLeader, + output: output, + input: make(chan *ProducerMessage), + } + parent.brokers[retryLeader] = retryBP + + bp := &brokerProducer{ + parent: parent, + broker: failedBroker, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + retryPartitionSet := sent.msgs["topic"][0] + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + + done := make(chan struct{}) + go func() { + bp.handleError(sent, ErrOutOfBrokers) + close(done) + }() + assertDoneWithin(t, done, 2*time.Second) + assertDoneWithin(t, client.started, 2*time.Second) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if parent.muter.tryMute(contender) { + parent.muter.unmute(contender) + t.Fatal("expected partition to remain muted while async metadata refresh is pending") + } + + close(client.release) + retrySet := assertDoneWithin(t, output, 2*time.Second) + defer parent.muter.unmute(retrySet) + require.Equal(t, retryPartitionSet, retrySet.msgs["topic"][0]) +} + +func TestHandleErrorNonIdempotentRetryRefreshesMetadataOncePerFailedRequest(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + txnmgr: &transactionManager{}, + } + failedBroker := &Broker{id: 1} + retryLeader := &Broker{id: 2} + client := &countingRefreshClient{ + stubLeaderClient: stubLeaderClient{leader: retryLeader, cfg: config}, + calls: make(chan []string, 1), + } + parent.client = client + + output := make(chan *produceSet, 2) + retryBP := &brokerProducer{ + parent: parent, + broker: retryLeader, + output: output, + input: make(chan *ProducerMessage), + } + parent.brokers[retryLeader] = retryBP + parent.brokerRefs[retryBP] = 1 + + bp := &brokerProducer{ + parent: parent, + broker: failedBroker, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry-0")}) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("retry-1")}) + retryPartitionSet0 := sent.msgs["topic"][0] + retryPartitionSet1 := sent.msgs["topic"][1] + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + + bp.handleError(sent, ErrOutOfBrokers) + + require.Equal(t, []string{"topic"}, assertDoneWithin(t, client.calls, 2*time.Second)) + retrySet0 := assertDoneWithin(t, output, 2*time.Second) + defer parent.muter.unmute(retrySet0) + retrySet1 := assertDoneWithin(t, output, 2*time.Second) + defer parent.muter.unmute(retrySet1) + assertNotDone(t, client.calls, 50*time.Millisecond) + + retriedPartitions := make(map[int32]*partitionSet) + retrySet0.eachPartition(func(_ string, partition int32, pSet *partitionSet) { + retriedPartitions[partition] = pSet + }) + retrySet1.eachPartition(func(_ string, partition int32, pSet *partitionSet) { + retriedPartitions[partition] = pSet + }) + require.Same(t, retryPartitionSet0, retriedPartitions[int32(0)]) + require.Same(t, retryPartitionSet1, retriedPartitions[int32(1)]) +} + func TestHandleSuccessNonIdempotentRetryUsesPartitionProducer(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false @@ -2062,6 +2197,62 @@ func TestHandleSuccessNonIdempotentRetryUsesPartitionProducer(t *testing.T) { parent.muter.unmute(contender) } +func TestRetryBatchReleasesMuteWhenHandoffAbortedByShutdown(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + errors: make(chan *ProducerError, 1), + shutdownCh: make(chan struct{}), + txnmgr: &transactionManager{}, + } + leader := &Broker{id: 1} + parent.client = &stubLeaderClient{leader: leader, cfg: config} + retryBP := &brokerProducer{ + parent: parent, + broker: leader, + output: make(chan *produceSet), + input: make(chan *ProducerMessage), + done: make(chan struct{}), + } + parent.brokers[leader] = retryBP + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + retryPartitionSet := sent.msgs["topic"][0] + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + parent.inFlight.Add(1) + if parent.shutdownChClosed.CompareAndSwap(false, true) { + close(parent.shutdownCh) + } + + done := make(chan struct{}) + go func() { + parent.retryBatch("topic", 0, retryPartitionSet, ErrOutOfBrokers, true) + close(done) + }() + + producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrShuttingDown, producerErr.Err) + assertDoneWithin(t, done, 2*time.Second) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected partition mute to be released after aborted retry handoff") + } + parent.muter.unmute(contender) +} + func TestHandleErrorNonIdempotentRetryReleasesMuteOnLeaderError(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false @@ -2158,6 +2349,31 @@ func (c *failingLeaderClient) LeaderAndEpoch(string, int32) (*Broker, int32, err return nil, 0, c.err } +type blockingRefreshClient struct { + stubLeaderClient + started chan struct{} + release chan struct{} +} + +func (c *blockingRefreshClient) RefreshMetadata(...string) error { + select { + case c.started <- struct{}{}: + default: + } + <-c.release + return nil +} + +type countingRefreshClient struct { + stubLeaderClient + calls chan []string +} + +func (c *countingRefreshClient) RefreshMetadata(topics ...string) error { + c.calls <- append([]string(nil), topics...) + return nil +} + func testProducerInterceptor( t *testing.T, interceptors []ProducerInterceptor, diff --git a/produce_set.go b/produce_set.go index b1ad8ad38..121a272ac 100644 --- a/produce_set.go +++ b/produce_set.go @@ -12,15 +12,18 @@ type partitionSet struct { bufferBytes int } -// canRetry prevents holding a partition mute when every message in the set will -// be returned as an error instead of being retried. -func (ps *partitionSet) canRetry(maxRetries int) bool { +// shouldKeepMuted matches retryBatch's whole-batch retry rule: if any message has +// exhausted retries, the batch will be failed instead of retried. +func (ps *partitionSet) shouldKeepMuted(maxRetries int) bool { + if len(ps.msgs) == 0 { + return false + } for _, msg := range ps.msgs { - if msg.retries < maxRetries { - return true + if msg.retries >= maxRetries { + return false } } - return false + return true } type produceSet struct { diff --git a/produce_set_test.go b/produce_set_test.go index 9ab964949..359bfd47b 100644 --- a/produce_set_test.go +++ b/produce_set_test.go @@ -25,6 +25,24 @@ func safeAddMessage(t *testing.T, ps *produceSet, msg *ProducerMessage) { } } +func TestShouldKeepMuted(t *testing.T) { + if (&partitionSet{}).shouldKeepMuted(1) { + t.Fatal("empty partition set should not be retryable") + } + if !(&partitionSet{msgs: []*ProducerMessage{ + {retries: 0}, + {retries: 0}, + }}).shouldKeepMuted(1) { + t.Fatal("expected batch to be retryable when every message is below retry max") + } + if (&partitionSet{msgs: []*ProducerMessage{ + {retries: 0}, + {retries: 1}, + }}).shouldKeepMuted(1) { + t.Fatal("expected batch not to be retryable when any message has exhausted retries") + } +} + func TestProduceSetInitial(t *testing.T) { _, ps := makeProduceSet() From 644038bc7e1ce52503520e8f86ff2594dd1ed5ec Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 18 Jun 2026 01:40:29 +0800 Subject: [PATCH 5/5] add backoff method Signed-off-by: 3AceShowHand --- async_producer.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/async_producer.go b/async_producer.go index 5a3845473..4c82ae572 100644 --- a/async_producer.go +++ b/async_producer.go @@ -787,12 +787,16 @@ func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan } func (pp *partitionProducer) backoff(retries int) { + pp.parent.backoff(retries) +} + +func (p *asyncProducer) backoff(retries int) { var backoff time.Duration - if pp.parent.conf.Producer.Retry.BackoffFunc != nil { - maxRetries := pp.parent.conf.Producer.Retry.Max - backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries) + if p.conf.Producer.Retry.BackoffFunc != nil { + maxRetries := p.conf.Producer.Retry.Max + backoff = p.conf.Producer.Retry.BackoffFunc(retries, maxRetries) } else { - backoff = pp.parent.conf.Producer.Retry.Backoff + backoff = p.conf.Producer.Retry.Backoff } if backoff > 0 { time.Sleep(backoff)