From 5b2605f673b0a1ab1c2a519c969aa062a26abe47 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 17 Jun 2026 20:55:49 +0800 Subject: [PATCH 1/3] idempotent is false still keep order by the mute --- async_producer.go | 67 ++++++++++++++++++++----- async_producer_test.go | 110 +++++++++++++++++++++++++++++++++++++++++ produce_set.go | 36 ++++++++++++++ 3 files changed, 202 insertions(+), 11 deletions(-) diff --git a/async_producer.go b/async_producer.go index a9af20b1b..8eb319ed3 100644 --- a/async_producer.go +++ b/async_producer.go @@ -184,6 +184,13 @@ func (m *partitionMuter) tryMutePartition(topic string, partition int32) bool { return true } +func (m *partitionMuter) isMutedPartition(topic string, partition int32) bool { + m.mu.Lock() + defer m.mu.Unlock() + + return m.isMuted(topic, partition) +} + // waitUntilMuted blocks until all partitions in the set can be muted, then mutes them. // Returns false if the muter was closed before all partitions could be muted. func (m *partitionMuter) waitUntilMuted(set *produceSet) bool { @@ -370,12 +377,17 @@ type ProducerMessage struct { // successfully delivered and RequiredAcks is not NoResponse. Timestamp time.Time - retries int - flags flagSet - expectation chan *ProducerError - sequenceNumber int32 - producerEpoch int16 - hasSequence bool + retries int + flags flagSet + // reusePartitionMute is a one-shot marker for non-idempotent retries. The + // first retried message from a failed, still-muted batch may reuse that + // existing in-flight reservation so later same-partition messages cannot + // slip in between the failed send and its retry. + reusePartitionMute bool + expectation chan *ProducerError + sequenceNumber int32 + producerEpoch int16 + hasSequence bool } const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. @@ -402,6 +414,7 @@ func (m *ProducerMessage) ByteSize(version int) int { func (m *ProducerMessage) clear() { m.flags = 0 m.retries = 0 + m.reusePartitionMute = false m.sequenceNumber = 0 m.producerEpoch = 0 m.hasSequence = false @@ -1083,7 +1096,8 @@ func (bp *brokerProducer) run() { var output chan<- *produceSet for { - if bp.flushingBatch == nil && (bp.timerFired || bp.accumulatingBatch.readyToFlush()) { + readyToFlush := bp.timerFired || bp.accumulatingBatch.readyToFlush() + if bp.flushingBatch == nil && readyToFlush { bp.tryBuildFlushingBatch() } @@ -1098,6 +1112,13 @@ func (bp *brokerProducer) run() { output = nil } + var unmuteCh <-chan struct{} + if bp.flushingBatch == nil && readyToFlush { + if ch, blocked := bp.parent.muter.awaitUnmuteChan(bp.accumulatingBatch); blocked { + unmuteCh = ch + } + } + select { case msg, ok := <-bp.input: if !ok { @@ -1169,6 +1190,7 @@ func (bp *brokerProducer) run() { if ok { bp.handleResponse(response) } + case <-unmuteCh: } } } @@ -1186,10 +1208,29 @@ func (bp *brokerProducer) tryBuildFlushingBatch() bool { partial := bp.accumulatingBatch.takePartitions(func(topic string, partition int32) bool { return bp.parent.muter.tryMutePartition(topic, partition) }) - if partial == nil { + if partial != nil { + bp.flushingBatch = partial + if bp.accumulatingBatch.empty() { + bp.rollOver() + } + return true + } + + reused := bp.accumulatingBatch.takePartitions(func(topic string, partition int32) bool { + partitions := bp.accumulatingBatch.msgs[topic] + if partitions == nil { + return false + } + set := partitions[partition] + return set.canReusePartitionMute() && bp.parent.muter.isMutedPartition(topic, partition) + }) + if reused == nil { return false } - bp.flushingBatch = partial + reused.eachPartition(func(_ string, _ int32, pSet *partitionSet) { + pSet.clearPartitionMuteReuse() + }) + bp.flushingBatch = reused if bp.accumulatingBatch.empty() { bp.rollOver() } @@ -1354,7 +1395,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo bp.parent.returnErrors(pSet.msgs, block.Err) } else { retryTopics = append(retryTopics, topic) - if bp.parent.conf.Producer.Idempotent { + if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) { if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } @@ -1397,6 +1438,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo if bp.parent.conf.Producer.Idempotent { go bp.parent.retryBatch(topic, partition, pSet, block.Err, true) } else { + pSet.markPartitionMuteReusable() bp.parent.retryMessages(pSet.msgs, block.Err) } // dropping the following messages has the side effect of incrementing their retry count @@ -1492,13 +1534,16 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { bp.currentRetries[topic] = make(map[int32]error) } bp.currentRetries[topic][partition] = err - if bp.parent.conf.Producer.Idempotent { + if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) { if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } keepMuted[topic][partition] = struct{}{} + } + if bp.parent.conf.Producer.Idempotent { go bp.parent.retryBatch(topic, partition, pSet, err, true) } else { + pSet.markPartitionMuteReusable() bp.parent.retryMessages(pSet.msgs, err) } }) diff --git a/async_producer_test.go b/async_producer_test.go index c32ac3ef5..ed5e6beeb 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1724,6 +1724,53 @@ func TestBrokerProducerFlushSkipsMutedPartitions(t *testing.T) { } } +func TestBrokerProducerRunFlushesAfterExternalUnmute(t *testing.T) { + config := NewTestConfig() + config.Producer.Flush.Messages = 1 + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + txnmgr: &transactionManager{}, + } + + blockedSet := newProduceSet(parent) + safeAddMessage(t, blockedSet, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("held")}) + if !parent.muter.tryMute(blockedSet) { + t.Fatal("expected to mute partition externally") + } + + input := make(chan *ProducerMessage) + output := make(chan *produceSet, 1) + responses := make(chan *brokerProducerResponse) + bp := &brokerProducer{ + parent: parent, + broker: &Broker{id: 1}, + input: input, + output: output, + responses: responses, + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + retryMsg := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("waiting")} + safeAddMessage(t, bp.accumulatingBatch, retryMsg) + + done := make(chan struct{}) + go func() { + bp.run() + close(done) + }() + + assertNotDone(t, output, 50*time.Millisecond) + parent.muter.unmute(blockedSet) + + flushed := assertDoneWithin(t, output, 2*time.Second) + require.Equal(t, retryMsg, flushed.msgs["topic"][0].msgs[0]) + + close(input) + close(responses) + assertDoneWithin(t, done, 2*time.Second) +} + // TestBrokerProducerWaitForSpaceAllPartitionsMuted verifies that waitForSpace unblocks // when all partitions in the accumulating batch are externally muted and later unmuted. func TestBrokerProducerWaitForSpaceAllPartitionsMuted(t *testing.T) { @@ -1910,6 +1957,69 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) { } } +func TestHandleSuccessNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + + txnMgr := &transactionManager{ + producerID: 0, + producerEpoch: 0, + sequenceNumbers: make(map[string]int32), + } + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + retries: make(chan *ProducerMessage, 1), + txnmgr: txnMgr, + } + leader := &Broker{id: 1} + parent.client = &stubLeaderClient{leader: leader, cfg: config} + + bp := &brokerProducer{ + parent: parent, + broker: leader, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + parent.brokers[leader] = bp + parent.brokerRefs[bp] = 1 + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + retryPartitionSet := sent.msgs["topic"][0] + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + defer parent.muter.unmute(sent) + + response := new(ProduceResponse) + response.AddTopicPartition("topic", 0, ErrNotLeaderForPartition) + bp.handleSuccess(sent, response) + + retried := assertDoneWithin(t, parent.retries, 2*time.Second) + require.Equal(t, retryPartitionSet.msgs[0], retried) + require.True(t, retried.reusePartitionMute) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if parent.muter.tryMute(contender) { + parent.muter.unmute(contender) + t.Fatal("expected partition to remain muted by the retrying batch") + } + + safeAddMessage(t, bp.accumulatingBatch, retried) + if !bp.tryBuildFlushingBatch() { + t.Fatal("expected retry batch to reuse the existing partition mute") + } + require.Equal(t, retryPartitionSet.msgs[0], bp.flushingBatch.msgs["topic"][0].msgs[0]) + require.False(t, retryPartitionSet.msgs[0].reusePartitionMute) +} + type stubLeaderClient struct { cfg *Config leader *Broker diff --git a/produce_set.go b/produce_set.go index 380f7f5ab..a86235b9e 100644 --- a/produce_set.go +++ b/produce_set.go @@ -12,6 +12,42 @@ type partitionSet struct { bufferBytes int } +// markPartitionMuteReusable lets the first message carry the retry batch's +// right to reuse the partition mute that is still held by its failed send. +// Only the first message is marked because one held mute represents one +// in-flight retry permission; marking every message could let split retry +// fragments bypass each other under the same reservation. +func (ps *partitionSet) markPartitionMuteReusable() { + if len(ps.msgs) > 0 { + ps.msgs[0].reusePartitionMute = true + } +} + +// canReusePartitionMute reports whether this retry batch may be flushed while +// its partition is already muted by the previous send attempt. +func (ps *partitionSet) canReusePartitionMute() bool { + return len(ps.msgs) > 0 && ps.msgs[0].reusePartitionMute +} + +// clearPartitionMuteReuse consumes the one-shot reuse marker once the retry +// batch is selected for flushing. +func (ps *partitionSet) clearPartitionMuteReuse() { + for _, msg := range ps.msgs { + msg.reusePartitionMute = false + } +} + +// canRetry prevents holding a partition mute when every message in the set will +// be returned as an error instead of being retried. +func (ps *partitionSet) canRetry(maxRetries int) bool { + for _, msg := range ps.msgs { + if msg.retries < maxRetries { + return true + } + } + return false +} + type produceSet struct { parent *asyncProducer msgs map[string]map[int32]*partitionSet From efe69a113018b9eb61132ca2f628b3f5914ffd56 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 22 Jun 2026 22:53:35 +0800 Subject: [PATCH 2/3] fix more review comments, and simplify the code Signed-off-by: 3AceShowHand --- async_producer.go | 188 +++++++++++++++++------------------------ async_producer_test.go | 113 +++++++++++++++++++------ 2 files changed, 164 insertions(+), 137 deletions(-) diff --git a/async_producer.go b/async_producer.go index c9d5f7d44..bcd7382e7 100644 --- a/async_producer.go +++ b/async_producer.go @@ -817,10 +817,8 @@ func (p *asyncProducer) backoffOrDone(retries int) bool { if backoff <= 0 { return !p.shuttingDown() } - timer := time.NewTimer(backoff) - defer timer.Stop() select { - case <-timer.C: + case <-time.After(backoff): return true case <-p.done: return false @@ -1244,7 +1242,7 @@ func (bp *brokerProducer) tryBuildFlushingBatch() bool { } func (bp *brokerProducer) shutdown() { - if bp.closed.Swap(true) { + if !bp.closed.CompareAndSwap(false, true) { return } if bp.done != nil { @@ -1476,10 +1474,6 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio produceSet := newProduceSet(p) produceSet.addPartitionSet(topic, partition, pSet) bufferMessages, bufferBytes := int64(produceSet.bufferCount), int64(produceSet.bufferBytes) - var reservedMessages, reservedBytes int64 - defer func() { - p.retryBufferQuota.release(reservedMessages, reservedBytes) - }() muted := alreadyMuted failBatch := func(err error) { @@ -1502,7 +1496,7 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio failBatch(ErrProducerRetryBufferOverflow) return } - reservedMessages, reservedBytes = bufferMessages, bufferBytes + defer p.retryBufferQuota.release(bufferMessages, bufferBytes) // Honor Producer.Retry.Backoff between retry attempts (#2469). retryBatch // dispatches the produceSet directly to the broker, bypassing partitionProducer.dispatch. @@ -1532,23 +1526,11 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio } } -type partitionBatchRetry struct { - topic string - partition int32 - pSet *partitionSet -} - -func (p *asyncProducer) failMutedBatch(batch partitionBatchRetry, err error) { - produceSet := newProduceSet(p) - produceSet.addPartitionSet(batch.topic, batch.partition, batch.pSet) - p.muter.unmute(produceSet) - p.returnErrors(batch.pSet.msgs, err) -} - -func (p *asyncProducer) failMutedBatches(batches []partitionBatchRetry, err error) { - for _, batch := range batches { - p.failMutedBatch(batch, err) - } +func (p *asyncProducer) failMutedSet(set *produceSet, err error) { + p.muter.unmute(set) + set.eachPartition(func(_ string, _ int32, pSet *partitionSet) { + p.returnErrors(pSet.msgs, err) + }) } func newRetryBufferQuota(conf *Config) messageQuota { @@ -1577,99 +1559,88 @@ func producerMessageByteSizeVersion(conf *Config) int { return 1 } -// retryBatchesAfterRefresh keeps metadata refresh out of brokerProducer.handleError. -// Connection errors already hold partition mutes; doing the refresh in the -// response loop can block all progress for that broker while the cluster is -// unstable. Refresh once for the failed request so multi-partition batches do -// not amplify controller-fail metadata traffic. Group retry partitions by their -// refreshed leader broker, but hand them off from this retry worker so large -// clusters do not create one temporary goroutine per target broker. -// -// This is intentionally not implemented as a loop over retryBatch. retryBatch -// is the single-partition direct retry primitive; this path owns all muted -// partitions from one failed ProduceRequest and must retry, fail, reserve -// buffer, back off, and refresh metadata at that failed-request boundary. -func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []partitionBatchRetry, retryErr error) { +// retryBatchesAfterRefresh retries muted batches from one failed ProduceRequest +// after a single metadata refresh, preserving that failed-request boundary. +func (p *asyncProducer) retryBatchesAfterRefresh(batches *produceSet, retryErr error) { if p.shuttingDown() { - p.failMutedBatches(batches, ErrShuttingDown) + p.failMutedSet(batches, ErrShuttingDown) return } - var retryable []partitionBatchRetry - maxRetryAttempt := 0 - for _, batch := range batches { - if !batch.pSet.shouldKeepMuted(p.conf.Producer.Retry.Max) { - p.failMutedBatch(batch, retryErr) - continue + var ( + maxRetryAttempt int + bufferMessages, bufferBytes int64 + ) + retryable := newProduceSet(p) + batches.eachPartition(func(topic string, partition int32, pSet *partitionSet) { + if !pSet.shouldKeepMuted(p.conf.Producer.Retry.Max) { + failed := newProduceSet(p) + failed.addPartitionSet(topic, partition, pSet) + p.failMutedSet(failed, retryErr) + return } - for _, msg := range batch.pSet.msgs { + for _, msg := range pSet.msgs { msg.retries++ if msg.retries > maxRetryAttempt { maxRetryAttempt = msg.retries } } - retryable = append(retryable, batch) - } - if len(retryable) == 0 { + retryable.addPartitionSet(topic, partition, pSet) + bufferMessages += int64(len(pSet.msgs)) + bufferBytes += int64(pSet.bufferBytes) + }) + if retryable.empty() { return } - // Reserve before metadata refresh so refresh-blocked direct retries still - // count against the producer-level retry buffer budget. - var bufferMessages, bufferBytes int64 - for _, batch := range retryable { - bufferMessages += int64(len(batch.pSet.msgs)) - bufferBytes += int64(batch.pSet.bufferBytes) - } + // Reserve before waiting so pending direct retries still count against the + // producer-level retry buffer budget. if !p.retryBufferQuota.tryReserve(bufferMessages, bufferBytes) { - p.failMutedBatches(retryable, ErrProducerRetryBufferOverflow) + p.failMutedSet(retryable, ErrProducerRetryBufferOverflow) return } - if p.shuttingDown() { + if !p.backoffOrDone(maxRetryAttempt) { p.retryBufferQuota.release(bufferMessages, bufferBytes) - p.failMutedBatches(retryable, ErrShuttingDown) + p.failMutedSet(retryable, ErrShuttingDown) return } - if len(topics) > 0 { - if err := p.client.RefreshMetadata(topics...); err != nil { - Logger.Printf("Failed refreshing metadata because of %v\n", err) - } + topics := make([]string, 0, len(retryable.msgs)) + for topic := range retryable.msgs { + topics = append(topics, topic) } - if p.shuttingDown() { - p.retryBufferQuota.release(bufferMessages, bufferBytes) - p.failMutedBatches(retryable, ErrShuttingDown) - return - } - - if !p.backoffOrDone(maxRetryAttempt) { - p.retryBufferQuota.release(bufferMessages, bufferBytes) - p.failMutedBatches(retryable, ErrShuttingDown) - return + if err := p.client.RefreshMetadata(topics...); err != nil { + Logger.Printf("Failed refreshing metadata because of %v\n", err) } brokerSets := make(map[*Broker]*produceSet) - for _, batch := range retryable { - if p.shuttingDown() { - batchMessages, batchBytes := int64(len(batch.pSet.msgs)), int64(batch.pSet.bufferBytes) - p.retryBufferQuota.release(batchMessages, batchBytes) - p.failMutedBatch(batch, ErrShuttingDown) - continue - } - leader, err := p.client.Leader(batch.topic, batch.partition) + retryable.eachPartition(func(topic string, partition int32, pSet *partitionSet) { + leader, err := p.client.Leader(topic, partition) if err != nil { - Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", batch.topic, batch.partition, err) - batchMessages, batchBytes := int64(len(batch.pSet.msgs)), int64(batch.pSet.bufferBytes) + Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err) + batchMessages, batchBytes := int64(len(pSet.msgs)), int64(pSet.bufferBytes) p.retryBufferQuota.release(batchMessages, batchBytes) - p.failMutedBatch(batch, retryErr) - continue + failed := newProduceSet(p) + failed.addPartitionSet(topic, partition, pSet) + p.failMutedSet(failed, retryErr) + return } set := brokerSets[leader] if set == nil { set = newProduceSet(p) brokerSets[leader] = set } - set.addPartitionSet(batch.topic, batch.partition, batch.pSet) + set.addPartitionSet(topic, partition, pSet) + }) + + // handoffRetryBatch also checks shutdown, but doing it here avoids creating + // or ref-counting brokerProducers just to reject the retry. + if p.shuttingDown() { + for _, set := range brokerSets { + p.retryBufferQuota.release(int64(set.bufferCount), int64(set.bufferBytes)) + p.failMutedSet(set, ErrShuttingDown) + } + return } for leader, set := range brokerSets { @@ -1678,10 +1649,7 @@ func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []part p.unrefBrokerProducer(leader, bp) p.retryBufferQuota.release(int64(set.bufferCount), int64(set.bufferBytes)) if !accepted { - p.muter.unmute(set) - set.eachPartition(func(_ string, _ int32, pSet *partitionSet) { - p.returnErrors(pSet.msgs, ErrShuttingDown) - }) + p.failMutedSet(set, ErrShuttingDown) } } } @@ -1689,10 +1657,16 @@ func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []part // handoffRetryBatch transfers ownership of a muted retry batch to the broker // bridge. On false, the caller still owns the mute and must fail the batch. func (p *asyncProducer) handoffRetryBatch(bp *brokerProducer, set *produceSet) bool { - if bp.closed.Load() { + // If shutdown is already visible, do not let select randomly choose a ready + // send on bp.output. The second select still handles shutdown racing with + // the handoff. + select { + case <-bp.done: + return false + case <-p.done: return false + default: } - select { case bp.output <- set: return true @@ -1715,10 +1689,9 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { bp.parent.abandonBrokerConnection(bp.broker) _ = bp.broker.Close() bp.closing = err + + var retrySet *produceSet keepMuted := make(map[string]map[int32]struct{}) - var retryTopics []string - retryTopicSeen := make(map[string]struct{}) - var retryBatches []partitionBatchRetry sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { // Connection failures have no ProduceResponse to feed back through // partitionProducer. Keep the sent batch muted and retry it asynchronously @@ -1731,23 +1704,18 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { if !(bp.parent.conf.Producer.Idempotent || pSet.shouldKeepMuted(bp.parent.conf.Producer.Retry.Max)) { bp.parent.returnErrors(pSet.msgs, err) } else { + if retrySet == nil { + retrySet = newProduceSet(bp.parent) + } if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } keepMuted[topic][partition] = struct{}{} - if _, ok := retryTopicSeen[topic]; !ok { - retryTopicSeen[topic] = struct{}{} - retryTopics = append(retryTopics, topic) - } - retryBatches = append(retryBatches, partitionBatchRetry{ - topic: topic, - partition: partition, - pSet: pSet, - }) + retrySet.addPartitionSet(topic, partition, pSet) } }) - if len(retryBatches) > 0 { - go bp.parent.retryBatchesAfterRefresh(retryTopics, retryBatches, err) + if retrySet != nil { + go bp.parent.retryBatchesAfterRefresh(retrySet, err) } bp.accumulatingBatch.eachPartition(func(topic string, partition int32, pSet *partitionSet) { bp.parent.retryMessages(pSet.msgs, err) @@ -1991,7 +1959,7 @@ func (q *messageQuota) tryReserve(messages, bytes int64) bool { nextMessages := q.messages + messages nextBytes := q.bytes + bytes - if q.wouldOverflow(nextMessages, nextBytes) { + if q.overflows(nextMessages, nextBytes) { return false } q.messages = nextMessages @@ -2009,7 +1977,7 @@ func (q *messageQuota) addAndCheckOverflow(messages, bytes int64) bool { q.messages += messages q.bytes += bytes - return q.wouldOverflow(q.messages, q.bytes) + return q.overflows(q.messages, q.bytes) } func (q *messageQuota) release(messages, bytes int64) { @@ -2028,7 +1996,7 @@ func (q *messageQuota) shouldTrack(messages, bytes int64) bool { return (messages != 0 || bytes != 0) && (q.maxMessages > 0 || q.maxBytes > 0) } -func (q *messageQuota) wouldOverflow(messages, bytes int64) bool { +func (q *messageQuota) overflows(messages, bytes int64) bool { return (q.maxMessages > 0 && messages >= q.maxMessages) || (q.maxBytes > 0 && bytes >= q.maxBytes) } diff --git a/async_producer_test.go b/async_producer_test.go index b16e0bf61..bbc68976d 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1571,7 +1571,7 @@ func TestBrokerProducerShutdown(t *testing.T) { // TestBrokerProducerWaitForSpaceEmptyBufferRollover ensures forced rollovers with an empty buffer // do not deadlock waiting for responses when no partitions are muted. -func TestBrokerProducerWaitForSpaceEmptyBufferRollover(t *testing.T) { +func TestBrokerProducerWaitForSpaceEmptyRollover(t *testing.T) { config := NewTestConfig() parent := &asyncProducer{ conf: config, @@ -1707,9 +1707,9 @@ func holdFirstRetryAfterReserve(parent *asyncProducer, retryBP *brokerProducer, return firstRetryReserved, release } -// TestBrokerProducerWaitForSpaceRespectsExternalUnmute ensures waitForSpace does not +// TestBrokerProducerWaitForSpaceExternalUnmute ensures waitForSpace does not // deadlock when partitions are muted by another producer and are unmuted elsewhere. -func TestBrokerProducerWaitForSpaceRespectsExternalUnmute(t *testing.T) { +func TestBrokerProducerWaitForSpaceExternalUnmute(t *testing.T) { config := NewTestConfig() txnMgr := &transactionManager{ producerID: 0, @@ -1796,7 +1796,7 @@ func TestBrokerProducerFlushSkipsMutedPartitions(t *testing.T) { } } -func TestBrokerProducerRunFlushesAfterExternalUnmute(t *testing.T) { +func TestBrokerProducerRunExternalUnmute(t *testing.T) { config := NewTestConfig() config.Producer.Flush.Messages = 1 parent := &asyncProducer{ @@ -1843,9 +1843,9 @@ func TestBrokerProducerRunFlushesAfterExternalUnmute(t *testing.T) { assertDoneWithin(t, done, 2*time.Second) } -// TestBrokerProducerWaitForSpaceAllPartitionsMuted verifies that waitForSpace unblocks +// TestBrokerProducerWaitForSpaceAllMuted verifies that waitForSpace unblocks // when all partitions in the accumulating batch are externally muted and later unmuted. -func TestBrokerProducerWaitForSpaceAllPartitionsMuted(t *testing.T) { +func TestBrokerProducerWaitForSpaceAllMuted(t *testing.T) { config := NewTestConfig() parent := &asyncProducer{ conf: config, @@ -1880,9 +1880,9 @@ func TestBrokerProducerWaitForSpaceAllPartitionsMuted(t *testing.T) { } } -// TestPartitionMuterCloseWakesWaitUntilMuted verifies that closing the muter wakes +// TestPartitionMuterCloseWakesWait verifies that closing the muter wakes // goroutines blocked in waitUntilMuted. -func TestPartitionMuterCloseWakesWaitUntilMuted(t *testing.T) { +func TestPartitionMuterCloseWakesWait(t *testing.T) { config := NewTestConfig() parent := &asyncProducer{ conf: config, @@ -2031,7 +2031,7 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) { } } -func TestHandleErrorNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { +func TestHandleErrorRetryKeepsMute(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2092,7 +2092,7 @@ func TestHandleErrorNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { } } -func TestHandleErrorNonIdempotentRetryFailsWholeBatch(t *testing.T) { +func TestHandleErrorRetryFailsBatch(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2141,7 +2141,7 @@ func TestHandleErrorNonIdempotentRetryFailsWholeBatch(t *testing.T) { parent.muter.unmute(contender) } -func TestHandleErrorNonIdempotentRetryDoesNotWaitForMetadataRefresh(t *testing.T) { +func TestHandleErrorRetryAsyncRefresh(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2219,7 +2219,7 @@ func TestHandleErrorNonIdempotentRetryDoesNotWaitForMetadataRefresh(t *testing.T require.Equal(t, retryPartitionSet, retrySet.msgs["topic"][0]) } -func TestHandleErrorNonIdempotentRetryRefreshesMetadataOncePerFailedRequest(t *testing.T) { +func TestHandleErrorRetryRefreshOnce(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2288,7 +2288,7 @@ func TestHandleErrorNonIdempotentRetryRefreshesMetadataOncePerFailedRequest(t *t require.Same(t, retryPartitionSet1, retriedPartitions[int32(1)]) } -func TestHandleErrorNonIdempotentRetryGroupsBatchesByLeader(t *testing.T) { +func TestHandleErrorRetryGroupsByLeader(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2385,7 +2385,7 @@ func TestHandleErrorNonIdempotentRetryGroupsBatchesByLeader(t *testing.T) { assertNotDone(t, outputB, 50*time.Millisecond) } -func TestHandleSuccessNonIdempotentRetryUsesPartitionProducer(t *testing.T) { +func TestHandleSuccessRetryUsesPartitionProducer(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2429,7 +2429,7 @@ func TestHandleSuccessNonIdempotentRetryUsesPartitionProducer(t *testing.T) { parent.muter.unmute(contender) } -func TestRetryBatchReleasesMuteWhenHandoffAbortedByShutdown(t *testing.T) { +func TestRetryBatchShutdownReleasesMute(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2542,11 +2542,9 @@ func TestRetryUsesSharedBufferBudget(t *testing.T) { retry := func(partition int32, pSet *partitionSet) { if tt.afterRefresh { - parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ - topic: "topic", - partition: partition, - pSet: pSet, - }}, ErrOutOfBrokers) + retrySet := newProduceSet(parent) + retrySet.addPartitionSet("topic", partition, pSet) + parent.retryBatchesAfterRefresh(retrySet, ErrOutOfBrokers) return } parent.retryBatch("topic", partition, pSet, ErrNotEnoughReplicas, true) @@ -2600,7 +2598,7 @@ func TestRetryUsesSharedBufferBudget(t *testing.T) { } } -func TestRetryBatchesAfterRefreshReleasesMuteWhenBrokerProducerDone(t *testing.T) { +func TestRetryBatchesAfterRefreshBrokerDone(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2637,11 +2635,9 @@ func TestRetryBatchesAfterRefreshReleasesMuteWhenBrokerProducerDone(t *testing.T done := make(chan struct{}) go func() { - parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ - topic: "topic", - partition: 0, - pSet: retryPartitionSet, - }}, ErrOutOfBrokers) + retrySet := newProduceSet(parent) + retrySet.addPartitionSet("topic", 0, retryPartitionSet) + parent.retryBatchesAfterRefresh(retrySet, ErrOutOfBrokers) close(done) }() @@ -2658,7 +2654,70 @@ func TestRetryBatchesAfterRefreshReleasesMuteWhenBrokerProducerDone(t *testing.T parent.muter.unmute(contender) } -func TestHandleErrorNonIdempotentRetryReleasesMuteOnLeaderError(t *testing.T) { +func TestRetryBatchesAfterRefreshShutdown(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + errors: make(chan *ProducerError, 2), + done: make(chan struct{}), + txnmgr: &transactionManager{}, + } + leader := &Broker{id: 1} + output := make(chan *produceSet, 1) + parent.brokers[leader] = &brokerProducer{ + parent: parent, + broker: leader, + output: output, + input: make(chan *ProducerMessage), + done: make(chan struct{}), + } + shutdownStarted := false + parent.client = &stubLeaderClient{ + cfg: config, + leaderFunc: func(string, int32) (*Broker, error) { + if !shutdownStarted { + close(parent.done) + shutdownStarted = true + } + return leader, nil + }, + } + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry-0")}) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("retry-1")}) + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + parent.inFlight.Add(2) + + parent.retryBatchesAfterRefresh(sent, ErrOutOfBrokers) + + firstErr := assertDoneWithin(t, parent.errors, 2*time.Second) + secondErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrShuttingDown, firstErr.Err) + require.Equal(t, ErrShuttingDown, secondErr.Err) + require.ElementsMatch(t, []int32{0, 1}, []int32{firstErr.Msg.Partition, secondErr.Msg.Partition}) + assertNotDone(t, output, 50*time.Millisecond) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next-0")}) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next-1")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected partition mutes to be released after shutdown") + } + parent.muter.unmute(contender) +} + +func TestHandleErrorRetryLeaderError(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 From 45abfc7122cbfa0fa6b51f584b09af2c50e15660 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Mon, 22 Jun 2026 22:56:50 +0800 Subject: [PATCH 3/3] adjust 2 misleading log to debug level --- client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client.go b/client.go index 71b57b15c..6dbf0d009 100644 --- a/client.go +++ b/client.go @@ -732,7 +732,7 @@ func (client *client) checkSeedBrokersHealth(brokers []*Broker) { for _, broker := range brokers { if err := broker.getSockError(); err != nil { - Logger.Printf("client/seedbrokers close seed broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) + DebugLogger.Printf("client/seedbrokers close seed broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) safeAsyncClose(broker) } } @@ -741,7 +741,7 @@ func (client *client) checkSeedBrokersHealth(brokers []*Broker) { func (client *client) checkBrokersHealth() { for id, broker := range client.brokers { if err := broker.getSockError(); err != nil { - Logger.Printf("client/brokers close broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) + DebugLogger.Printf("client/brokers close broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) safeAsyncClose(broker) delete(client.brokers, id) }