From f1fbc20c49f7ca843d41eec499d779a56788ea2b Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 25 Jun 2026 21:57:08 +0800 Subject: [PATCH 1/5] minimim change --- async_producer.go | 121 +++++++++++++++++++--------- async_producer_test.go | 174 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 255 insertions(+), 40 deletions(-) diff --git a/async_producer.go b/async_producer.go index a9af20b1b..3804678b1 100644 --- a/async_producer.go +++ b/async_producer.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "sync" + "sync/atomic" "time" "github.com/eapache/go-resiliency/breaker" @@ -93,6 +94,9 @@ type asyncProducer struct { input, successes, retries chan *ProducerMessage inFlight sync.WaitGroup + done chan struct{} + closed atomic.Bool + brokers map[*Broker]*brokerProducer brokerRefs map[*brokerProducer]int brokerLock sync.Mutex @@ -303,6 +307,7 @@ func newAsyncProducer(client Client) (AsyncProducer, error) { input: make(chan *ProducerMessage), successes: make(chan *ProducerMessage), retries: make(chan *ProducerMessage), + done: make(chan struct{}), brokers: make(map[*Broker]*brokerProducer), brokerRefs: make(map[*brokerProducer]int), txnmgr: txnmgr, @@ -781,12 +786,16 @@ func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan } func (pp *partitionProducer) backoff(retries int) { + pp.parent.retryBackoff(retries) +} + +func (p *asyncProducer) retryBackoff(retries int) { var backoff time.Duration - if pp.parent.conf.Producer.Retry.BackoffFunc != nil { - maxRetries := pp.parent.conf.Producer.Retry.Max - backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries) + if p.conf.Producer.Retry.BackoffFunc != nil { + maxRetries := p.conf.Producer.Retry.Max + backoff = p.conf.Producer.Retry.BackoffFunc(retries, maxRetries) } else { - backoff = pp.parent.conf.Producer.Retry.Backoff + backoff = p.conf.Producer.Retry.Backoff } if backoff > 0 { time.Sleep(backoff) @@ -821,12 +830,19 @@ func (pp *partitionProducer) dispatch() { }() for msg := range pp.input { - if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil { + if pp.brokerProducer != nil { select { case <-pp.brokerProducer.abandoned: // a message on the abandoned channel means that our current broker selection is out of date - pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer) + abandonedLeader := pp.leader + pp.parent.unrefBrokerProducer(abandonedLeader, pp.brokerProducer) pp.brokerProducer = nil + if leader, _ := pp.parent.client.Leader(pp.topic, pp.partition); leader != nil && leader.ID() != abandonedLeader.ID() { + pp.leader = leader + pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader) + pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight + pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn} + } time.Sleep(pp.parent.conf.Producer.Retry.Backoff) default: // producer connection is still open. @@ -956,6 +972,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { input: input, output: bridge, responses: responses, + abandoned: make(chan struct{}), accumulatingBatch: newProduceSet(p), currentRetries: make(map[string]map[int32]error), } @@ -1046,10 +1063,6 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { } }) - if p.conf.Producer.Retry.Max <= 0 { - bp.abandoned = make(chan struct{}) - } - return bp } @@ -1092,10 +1105,16 @@ func (bp *brokerProducer) run() { timerChan = bp.timer.C } + var unmuteCh <-chan struct{} if bp.flushingBatch != nil { output = bp.output } else { output = nil + if bp.accumulatingBatch.readyToFlush() { + if ch, blocked := bp.parent.muter.awaitUnmuteChan(bp.accumulatingBatch); blocked { + unmuteCh = ch + } + } } select { @@ -1169,6 +1188,7 @@ func (bp *brokerProducer) run() { if ok { bp.handleResponse(response) } + case <-unmuteCh: } } } @@ -1354,12 +1374,10 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo bp.parent.returnErrors(pSet.msgs, block.Err) } else { retryTopics = append(retryTopics, topic) - if bp.parent.conf.Producer.Idempotent { - if keepMuted[topic] == nil { - keepMuted[topic] = make(map[int32]struct{}) - } - keepMuted[topic][partition] = struct{}{} + if keepMuted[topic] == nil { + keepMuted[topic] = make(map[int32]struct{}) } + keepMuted[topic][partition] = struct{}{} } // Other non-retriable errors default: @@ -1371,11 +1389,9 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo }) if len(retryTopics) > 0 { - if bp.parent.conf.Producer.Idempotent { - err := bp.parent.client.RefreshMetadata(retryTopics...) - if err != nil { - Logger.Printf("Failed refreshing metadata because of %v\n", err) - } + err := bp.parent.client.RefreshMetadata(retryTopics...) + if err != nil { + Logger.Printf("Failed refreshing metadata because of %v\n", err) } sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { @@ -1390,17 +1406,25 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError: Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n", bp.broker.ID(), topic, partition, block.Err) - if bp.currentRetries[topic] == nil { - bp.currentRetries[topic] = make(map[int32]error) + leaderChanged := false + if !bp.parent.conf.Producer.Idempotent { + leader, leaderErr := bp.parent.client.Leader(topic, partition) + leaderChanged = leaderErr == nil && leader != nil && leader.ID() != bp.broker.ID() } - bp.currentRetries[topic][partition] = block.Err + if bp.parent.conf.Producer.Idempotent || leaderChanged { + if bp.currentRetries[topic] == nil { + bp.currentRetries[topic] = make(map[int32]error) + } + bp.currentRetries[topic][partition] = block.Err + } + if leaderChanged { + bp.parent.abandonBrokerConnection(bp.broker) + } + go bp.parent.retryBatch(topic, partition, pSet, block.Err, true) if bp.parent.conf.Producer.Idempotent { - go bp.parent.retryBatch(topic, partition, pSet, block.Err, true) - } else { - bp.parent.retryMessages(pSet.msgs, block.Err) + // dropping the following messages has the side effect of incrementing their retry count + bp.parent.retryMessages(bp.accumulatingBatch.dropPartition(topic, partition), block.Err) } - // dropping the following messages has the side effect of incrementing their retry count - bp.parent.retryMessages(bp.accumulatingBatch.dropPartition(topic, partition), block.Err) } }) } @@ -1422,6 +1446,7 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio produceSet.msgs[topic][partition] = pSet produceSet.bufferBytes += pSet.bufferBytes produceSet.bufferCount += len(pSet.msgs) + retryAttempt := 0 for _, msg := range pSet.msgs { if msg.retries >= p.conf.Producer.Retry.Max { p.returnErrors(pSet.msgs, retryErr) @@ -1431,6 +1456,10 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio return } msg.retries++ + retryAttempt = msg.retries + } + if !p.conf.Producer.Idempotent { + p.retryBackoff(retryAttempt) } // it's expected that a metadata refresh has been requested prior to calling retryBatch @@ -1454,8 +1483,23 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio } } bp := p.getBrokerProducer(leader) - bp.output <- produceSet - p.unrefBrokerProducer(leader, bp) + defer p.unrefBrokerProducer(leader, bp) + + select { + case bp.output <- produceSet: + return + default: + } + + select { + case bp.output <- produceSet: + return + case <-p.done: + for _, msg := range pSet.msgs { + p.returnError(msg, ErrShuttingDown) + } + p.muter.unmute(produceSet) + } } func (bp *brokerProducer) handleError(sent *produceSet, err error) { @@ -1479,7 +1523,7 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { retryTopicSeen[topic] = struct{}{} retryTopics = append(retryTopics, topic) }) - if bp.parent.conf.Producer.Idempotent && len(retryTopics) > 0 { + if len(retryTopics) > 0 { refreshErr := bp.parent.client.RefreshMetadata(retryTopics...) if refreshErr != nil { Logger.Printf("Failed refreshing metadata because of %v\n", refreshErr) @@ -1492,15 +1536,11 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { bp.currentRetries[topic] = make(map[int32]error) } bp.currentRetries[topic][partition] = err - if bp.parent.conf.Producer.Idempotent { - if keepMuted[topic] == nil { - keepMuted[topic] = make(map[int32]struct{}) - } - keepMuted[topic][partition] = struct{}{} - go bp.parent.retryBatch(topic, partition, pSet, err, true) - } else { - bp.parent.retryMessages(pSet.msgs, err) + if keepMuted[topic] == nil { + keepMuted[topic] = make(map[int32]struct{}) } + keepMuted[topic][partition] = struct{}{} + go bp.parent.retryBatch(topic, partition, pSet, err, true) }) bp.accumulatingBatch.eachPartition(func(topic string, partition int32, pSet *partitionSet) { bp.parent.retryMessages(pSet.msgs, err) @@ -1583,6 +1623,9 @@ func (p *asyncProducer) retryHandler() { // utility functions func (p *asyncProducer) shutdown() { + if p.closed.CompareAndSwap(false, true) { + close(p.done) + } p.inFlight.Add(1) p.input <- &ProducerMessage{flags: shutdown} diff --git a/async_producer_test.go b/async_producer_test.go index c32ac3ef5..6dd9a2f4b 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1910,6 +1910,177 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) { } } +func TestBrokerProducerHandleErrorKeepsNonIdempotentRetryMuted(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + retries: make(chan *ProducerMessage, 1), + done: make(chan struct{}), + txnmgr: &transactionManager{}, + } + retryLeader := &Broker{id: 2} + parent.client = &stubLeaderClient{leader: retryLeader, cfg: config} + + output := make(chan *produceSet, 1) + parent.brokers[retryLeader] = &brokerProducer{ + parent: parent, + broker: retryLeader, + output: output, + input: make(chan *ProducerMessage), + } + + bp := &brokerProducer{ + parent: parent, + broker: &Broker{id: 1}, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + + sent := newProduceSet(parent) + msg := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")} + safeAddMessage(t, sent, msg) + retryPartitionSet := sent.msgs["topic"][0] + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + + bp.handleError(sent, ErrOutOfBrokers) + + assertNotDone(t, parent.retries, 50*time.Millisecond) + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if parent.muter.tryMute(contender) { + parent.muter.unmute(contender) + t.Fatal("expected connection retry to keep sent partition muted") + } + + retrySet := assertDoneWithin(t, output, 2*time.Second) + parent.muter.unmute(retrySet) + require.Equal(t, retryPartitionSet, retrySet.msgs["topic"][0]) + require.Equal(t, 1, msg.retries) +} + +func TestHandleSuccessNonIdempotentRetryKeepsMute(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + retries: make(chan *ProducerMessage, 1), + done: make(chan struct{}), + txnmgr: &transactionManager{}, + } + retryLeader := &Broker{id: 2} + parent.client = &stubLeaderClient{leader: retryLeader, cfg: config} + + output := make(chan *produceSet) + parent.brokers[retryLeader] = &brokerProducer{ + parent: parent, + broker: retryLeader, + output: output, + input: make(chan *ProducerMessage), + } + + bp := &brokerProducer{ + parent: parent, + broker: &Broker{id: 1}, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + + sent := newProduceSet(parent) + msg := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")} + safeAddMessage(t, sent, msg) + retryPartitionSet := sent.msgs["topic"][0] + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + + response := new(ProduceResponse) + response.AddTopicPartition("topic", 0, ErrNotLeaderForPartition) + bp.handleSuccess(sent, response) + + assertNotDone(t, parent.retries, 50*time.Millisecond) + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if parent.muter.tryMute(contender) { + parent.muter.unmute(contender) + t.Fatal("expected response retry to keep sent partition muted") + } + + retrySet := assertDoneWithin(t, output, 2*time.Second) + parent.muter.unmute(retrySet) + require.Equal(t, retryPartitionSet, retrySet.msgs["topic"][0]) + require.Equal(t, 1, msg.retries) +} + +func TestRetryBatchReleasesMuteOnShutdown(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + errors: make(chan *ProducerError, 1), + done: make(chan struct{}), + txnmgr: &transactionManager{}, + } + leader := &Broker{id: 1} + parent.client = &stubLeaderClient{leader: leader, cfg: config} + parent.brokers[leader] = &brokerProducer{ + parent: parent, + broker: leader, + output: make(chan *produceSet), + input: make(chan *ProducerMessage), + } + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + retryPartitionSet := sent.msgs["topic"][0] + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + parent.inFlight.Add(1) + + done := make(chan struct{}) + go func() { + parent.retryBatch("topic", 0, retryPartitionSet, ErrOutOfBrokers, true) + close(done) + }() + + close(parent.done) + + producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrShuttingDown, producerErr.Err) + assertDoneWithin(t, done, 2*time.Second) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected partition mute to be released after aborted retry handoff") + } + parent.muter.unmute(contender) +} + type stubLeaderClient struct { cfg *Config leader *Broker @@ -2662,7 +2833,8 @@ func TestProducerRetryBufferLimits(t *testing.T) { wg.Wait() assert.Equal(t, successes+producerErrors, tt.numMessages, "Expected all messages to be processed") - assert.True(t, errorFound, "Expected at least one error matching ErrProducerRetryBufferOverflow") + assert.Equal(t, 0, successes, "Expected retriable responses to eventually fail without successful writes") + assert.False(t, errorFound, "Ordered sent-batch retries should be backpressured by partition mute, not retry-buffer overflow") }) } } From 693a21d3caa554836a985b4747d4c1131e92ef35 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 25 Jun 2026 22:19:10 +0800 Subject: [PATCH 2/5] remove more logs --- async_producer.go | 1 - broker.go | 4 ++-- client.go | 4 ---- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/async_producer.go b/async_producer.go index 3804678b1..00526499c 100644 --- a/async_producer.go +++ b/async_producer.go @@ -1157,7 +1157,6 @@ func (bp *brokerProducer) run() { } if bp.accumulatingBatch.wouldOverflow(msg) { - DebugLogger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID()) if err := bp.waitForSpace(msg, false); err != nil { bp.parent.retryMessage(msg, err) continue diff --git a/broker.go b/broker.go index ad74ae7da..e56d66dc5 100644 --- a/broker.go +++ b/broker.go @@ -253,7 +253,7 @@ func (b *Broker) Open(conf *Config) error { return } - Logger.Printf("Error while sending ApiVersionsRequest V3 to broker %s: %s\n", b.addr, err) + DebugLogger.Printf("Error while sending ApiVersionsRequest V3 to broker %s: %s\n", b.addr, err) // send a lower version request in case remote cluster is <= 2.4.0.0 maxVersion := int16(0) if apiVersionsResponse != nil { @@ -269,7 +269,7 @@ func (b *Broker) Open(conf *Config) error { if b.maybeCloseLocked(err) { return } - Logger.Printf("Error while sending ApiVersionsRequest V%d to broker %s: %s\n", maxVersion, b.addr, err) + DebugLogger.Printf("Error while sending ApiVersionsRequest V%d to broker %s: %s\n", maxVersion, b.addr, err) } } if apiVersionsResponse != nil { diff --git a/client.go b/client.go index 6641743ac..bb02f8314 100644 --- a/client.go +++ b/client.go @@ -732,7 +732,6 @@ func (client *client) checkSeedBrokersHealth(brokers []*Broker) { for _, broker := range brokers { if err := broker.getSockError(); err != nil { - Logger.Printf("client/seedbrokers close seed broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) safeAsyncClose(broker) } } @@ -741,7 +740,6 @@ func (client *client) checkSeedBrokersHealth(brokers []*Broker) { func (client *client) checkBrokersHealth() { for id, broker := range client.brokers { if err := broker.getSockError(); err != nil { - Logger.Printf("client/brokers close broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) safeAsyncClose(broker) delete(client.brokers, id) } @@ -1103,12 +1101,10 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, return err } // else remove that broker and try again - Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err) _ = broker.Close() client.deregisterBroker(broker) } else { // some other error, remove that broker and try again - Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err) brokerErrors = append(brokerErrors, err) _ = broker.Close() client.deregisterBroker(broker) From e4f938943270d5aa0f0e823c634c858c02f36d0a Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 25 Jun 2026 22:35:41 +0800 Subject: [PATCH 3/5] Guard broker abandon by producer identity --- async_producer.go | 19 ++++++++++--------- async_producer_test.go | 31 +++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/async_producer.go b/async_producer.go index 00526499c..99d581cf0 100644 --- a/async_producer.go +++ b/async_producer.go @@ -1369,7 +1369,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition, ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError: if bp.parent.conf.Producer.Retry.Max <= 0 { - bp.parent.abandonBrokerConnection(bp.broker) + bp.parent.abandonBrokerConnection(bp) bp.parent.returnErrors(pSet.msgs, block.Err) } else { retryTopics = append(retryTopics, topic) @@ -1381,7 +1381,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo // Other non-retriable errors default: if bp.parent.conf.Producer.Retry.Max <= 0 { - bp.parent.abandonBrokerConnection(bp.broker) + bp.parent.abandonBrokerConnection(bp) } bp.parent.returnErrors(pSet.msgs, block.Err) } @@ -1417,7 +1417,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo bp.currentRetries[topic][partition] = block.Err } if leaderChanged { - bp.parent.abandonBrokerConnection(bp.broker) + bp.parent.abandonBrokerConnection(bp) } go bp.parent.retryBatch(topic, partition, pSet, block.Err, true) if bp.parent.conf.Producer.Idempotent { @@ -1510,7 +1510,7 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { bp.parent.muter.unmute(sent) } else { Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err) - bp.parent.abandonBrokerConnection(bp.broker) + bp.parent.abandonBrokerConnection(bp) _ = bp.broker.Close() bp.closing = err var retryTopics []string @@ -1758,14 +1758,15 @@ func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) } } -func (p *asyncProducer) abandonBrokerConnection(broker *Broker) { +func (p *asyncProducer) abandonBrokerConnection(bp *brokerProducer) { p.brokerLock.Lock() defer p.brokerLock.Unlock() - bc, ok := p.brokers[broker] - if ok && bc.abandoned != nil { - close(bc.abandoned) + bc, ok := p.brokers[bp.broker] + if !ok || bc != bp { + return } - delete(p.brokers, broker) + close(bc.abandoned) + delete(p.brokers, bp.broker) } diff --git a/async_producer_test.go b/async_producer_test.go index 6dd9a2f4b..d30339083 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -2081,6 +2081,37 @@ func TestRetryBatchReleasesMuteOnShutdown(t *testing.T) { parent.muter.unmute(contender) } +func TestAbandonBrokerConnectionIgnoresReplacedBrokerProducer(t *testing.T) { + parent := &asyncProducer{ + brokers: make(map[*Broker]*brokerProducer), + } + broker := &Broker{id: 1} + oldBP := &brokerProducer{ + broker: broker, + abandoned: make(chan struct{}), + } + newBP := &brokerProducer{ + broker: broker, + abandoned: make(chan struct{}), + } + parent.brokers[broker] = newBP + + parent.abandonBrokerConnection(oldBP) + + assertNotDone(t, oldBP.abandoned, 50*time.Millisecond) + assertNotDone(t, newBP.abandoned, 50*time.Millisecond) + if parent.brokers[broker] != newBP { + t.Fatal("old brokerProducer abandoned the replacement") + } + + parent.abandonBrokerConnection(newBP) + + assertDoneWithin(t, newBP.abandoned, 2*time.Second) + if _, ok := parent.brokers[broker]; ok { + t.Fatal("current brokerProducer was not removed") + } +} + type stubLeaderClient struct { cfg *Config leader *Broker From d444254a7728c1394e48f5ac0da51b5881e0008c Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 25 Jun 2026 23:03:32 +0800 Subject: [PATCH 4/5] fix test --- async_producer_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/async_producer_test.go b/async_producer_test.go index d30339083..4e9f6fbd1 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -2764,7 +2764,7 @@ func TestTxnCanAbort(t *testing.T) { require.NoError(t, err) } -func TestProducerRetryBufferLimits(t *testing.T) { +func TestProducerRetriableResponsesBypassRetryBufferLimits(t *testing.T) { broker := NewMockBroker(t, 1) defer broker.Close() topic := "test-topic" @@ -2800,7 +2800,7 @@ func TestProducerRetryBufferLimits(t *testing.T) { config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength }, messageSize: 1, // Small message size - numMessages: 10000, + numMessages: 2, }, { name: "MaxBufferBytes", @@ -2809,7 +2809,7 @@ func TestProducerRetryBufferLimits(t *testing.T) { config.Producer.Retry.MaxBufferBytes = minFunctionalRetryBufferBytes }, messageSize: 950 * 1024, // 950 KB - numMessages: 1000, + numMessages: 2, }, } @@ -2817,6 +2817,7 @@ func TestProducerRetryBufferLimits(t *testing.T) { t.Run(tt.name, func(t *testing.T) { config := NewTestConfig() config.Producer.Return.Successes = true + config.Producer.Retry.Max = 1 tt.configureBuffer(config) producer, err := NewAsyncProducer([]string{broker.Addr()}, config) From 8a747a8e34a3b1d7486ed02f0921278d17128f82 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 25 Jun 2026 23:37:45 +0800 Subject: [PATCH 5/5] try to fix broken test --- functional_producer_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/functional_producer_test.go b/functional_producer_test.go index df8a5d0ca..9f3059ca4 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -1039,10 +1039,9 @@ func validateProducerMetrics(t *testing.T, client Client) { } metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 1000)) } else { - // We record compression ratios of 1.00 (100 with a histogram) for every TestBatchSize record + // We record compression ratios of 1.00 (100 with a histogram) for each record batch. if client.Config().Version.IsAtLeast(V0_11_0_0) { - // records will be grouped in batchSet rather than msgSet - metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 3)) + metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 1)) } else { metricValidators.registerForGlobalAndTopic("test_1", countHistogramValidator("compression-ratio", TestBatchSize)) }