diff --git a/async_producer.go b/async_producer.go index 99821e5b4..29bc3a114 100644 --- a/async_producer.go +++ b/async_producer.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "sync" + "sync/atomic" "time" "github.com/eapache/go-resiliency/breaker" @@ -92,6 +93,10 @@ type asyncProducer struct { errors chan *ProducerError input, successes, retries chan *ProducerMessage inFlight sync.WaitGroup + // shutdownCh is closed before waiting on in-flight messages so retryBatch + // goroutines can stop waiting on broker handoff and release partition mutes. + shutdownCh chan struct{} + shutdownChClosed atomic.Bool brokers map[*Broker]*brokerProducer brokerRefs map[*brokerProducer]int @@ -184,13 +189,6 @@ func (m *partitionMuter) tryMutePartition(topic string, partition int32) bool { return true } -func (m *partitionMuter) isMutedPartition(topic string, partition int32) bool { - m.mu.Lock() - defer m.mu.Unlock() - - return m.isMuted(topic, partition) -} - // waitUntilMuted blocks until all partitions in the set can be muted, then mutes them. // Returns false if the muter was closed before all partitions could be muted. func (m *partitionMuter) waitUntilMuted(set *produceSet) bool { @@ -310,6 +308,7 @@ func newAsyncProducer(client Client) (AsyncProducer, error) { input: make(chan *ProducerMessage), successes: make(chan *ProducerMessage), retries: make(chan *ProducerMessage), + shutdownCh: make(chan struct{}), brokers: make(map[*Broker]*brokerProducer), brokerRefs: make(map[*brokerProducer]int), txnmgr: txnmgr, @@ -383,11 +382,6 @@ type ProducerMessage struct { sequenceNumber int32 producerEpoch int16 hasSequence bool - // reusePartitionMute is a one-shot marker for non-idempotent retries. The - // first retried message from a failed, still-muted batch may reuse that - // existing in-flight reservation so later same-partition messages cannot - // slip in between the failed send and its retry. - reusePartitionMute bool } const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. @@ -414,7 +408,6 @@ func (m *ProducerMessage) ByteSize(version int) int { func (m *ProducerMessage) clear() { m.flags = 0 m.retries = 0 - m.reusePartitionMute = false m.sequenceNumber = 0 m.producerEpoch = 0 m.hasSequence = false @@ -794,12 +787,16 @@ func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan } func (pp *partitionProducer) backoff(retries int) { + pp.parent.backoff(retries) +} + +func (p *asyncProducer) backoff(retries int) { var backoff time.Duration - if pp.parent.conf.Producer.Retry.BackoffFunc != nil { - maxRetries := pp.parent.conf.Producer.Retry.Max - backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries) + if p.conf.Producer.Retry.BackoffFunc != nil { + maxRetries := p.conf.Producer.Retry.Max + backoff = p.conf.Producer.Retry.BackoffFunc(retries, maxRetries) } else { - backoff = pp.parent.conf.Producer.Retry.Backoff + backoff = p.conf.Producer.Retry.Backoff } if backoff > 0 { time.Sleep(backoff) @@ -969,6 +966,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { input: input, output: bridge, responses: responses, + done: make(chan struct{}), accumulatingBatch: newProduceSet(p), currentRetries: make(map[string]map[int32]error), } @@ -1082,6 +1080,11 @@ type brokerProducer struct { output chan<- *produceSet responses <-chan *brokerProducerResponse abandoned chan struct{} + // done is closed before output is closed. Direct retry handoff waits on + // output while the batch still owns its partition mute; if the brokerProducer + // is shutting down, that send may never be received and the mute would block + // later same-partition messages and producer shutdown. + done chan struct{} accumulatingBatch *produceSet flushingBatch *produceSet // batch that has been muted and is ready to send @@ -1214,28 +1217,13 @@ func (bp *brokerProducer) tryBuildFlushingBatch() bool { return true } - reused := bp.accumulatingBatch.takePartitions(func(topic string, partition int32) bool { - partitions := bp.accumulatingBatch.msgs[topic] - if partitions == nil { - return false - } - set := partitions[partition] - return set.canReusePartitionMute() && bp.parent.muter.isMutedPartition(topic, partition) - }) - if reused == nil { - return false - } - reused.eachPartition(func(_ string, _ int32, pSet *partitionSet) { - pSet.clearPartitionMuteReuse() - }) - bp.flushingBatch = reused - if bp.accumulatingBatch.empty() { - bp.rollOver() - } - return true + return false } func (bp *brokerProducer) shutdown() { + if bp.done != nil { + close(bp.done) + } // flush any ready buffer for bp.flushingBatch != nil { select { @@ -1393,7 +1381,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo bp.parent.returnErrors(pSet.msgs, block.Err) } else { retryTopics = append(retryTopics, topic) - if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) { + if bp.parent.conf.Producer.Idempotent { if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } @@ -1436,7 +1424,9 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo if bp.parent.conf.Producer.Idempotent { go bp.parent.retryBatch(topic, partition, pSet, block.Err, true) } else { - pSet.markPartitionMuteReusable() + // Non-idempotent response retries must go back through partitionProducer. + // That path advances the retry high watermark and refreshes the partition + // leader before sending the retry; retryBatch would bypass that state. bp.parent.retryMessages(pSet.msgs, block.Err) } // dropping the following messages has the side effect of incrementing their retry count @@ -1462,40 +1452,119 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio produceSet.msgs[topic][partition] = pSet produceSet.bufferBytes += pSet.bufferBytes produceSet.bufferCount += len(pSet.msgs) + muted := alreadyMuted + failBatch := func(err error) { + // Release the partition before reporting errors so a blocked Errors + // consumer cannot keep later same-partition messages muted. + if muted { + p.muter.unmute(produceSet) + muted = false + } + for _, msg := range pSet.msgs { + p.returnError(msg, err) + } + } for _, msg := range pSet.msgs { if msg.retries >= p.conf.Producer.Retry.Max { - p.returnErrors(pSet.msgs, retryErr) - if alreadyMuted { - p.muter.unmute(produceSet) - } + failBatch(retryErr) return } msg.retries++ } + // honor Producer.Retry.Backoff between retry attempts (#2469); the + // non-idempotent path gets this from partitionProducer.dispatch, but + // retryBatch dispatches the produceSet directly to the broker + if len(pSet.msgs) > 0 { + p.backoff(pSet.msgs[0].retries) + } + // it's expected that a metadata refresh has been requested prior to calling retryBatch leader, leaderErr := p.client.Leader(topic, partition) if leaderErr != nil { Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, leaderErr) - for _, msg := range pSet.msgs { - p.returnError(msg, retryErr) - } - if alreadyMuted { - p.muter.unmute(produceSet) - } + failBatch(retryErr) return } if !alreadyMuted { if !p.muter.waitUntilMuted(produceSet) { - for _, msg := range pSet.msgs { - p.returnError(msg, retryErr) - } + failBatch(retryErr) return } + muted = true } bp := p.getBrokerProducer(leader) - bp.output <- produceSet - p.unrefBrokerProducer(leader, bp) + defer p.unrefBrokerProducer(leader, bp) + if !p.sendRetryBatch(bp, produceSet) { + failBatch(ErrShuttingDown) + return + } +} + +type partitionBatchRetry struct { + topic string + partition int32 + pSet *partitionSet +} + +// retryBatchesAfterRefresh keeps metadata refresh out of brokerProducer.handleError. +// Connection errors already hold partition mutes; doing the refresh in the +// response loop can block all progress for that broker while the cluster is +// unstable. Refresh once for the failed request so multi-partition batches do +// not amplify controller-fail metadata traffic, then retry partitions +// independently so one blocked broker handoff cannot hold up the rest. +func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []partitionBatchRetry, retryErr error) { + if p.shutdownCh != nil { + select { + case <-p.shutdownCh: + for _, batch := range batches { + go p.retryBatch(batch.topic, batch.partition, batch.pSet, retryErr, true) + } + return + default: + } + } + if len(topics) > 0 { + if err := p.client.RefreshMetadata(topics...); err != nil { + Logger.Printf("Failed refreshing metadata because of %v\n", err) + } + } + for _, batch := range batches { + go p.retryBatch(batch.topic, batch.partition, batch.pSet, retryErr, true) + } +} + +// sendRetryBatch transfers ownership of a muted retry batch to the broker +// bridge. A false result means no owner accepted the batch, so the caller must +// release the mute and fail the messages instead of leaving a retry goroutine +// blocked while it still owns the partition mute. +func (p *asyncProducer) sendRetryBatch(bp *brokerProducer, set *produceSet) bool { + var done <-chan struct{} + if bp.done != nil { + done = bp.done + } + select { + case <-done: + return false + default: + } + select { + case bp.output <- set: + return true + default: + } + var shutdownCh <-chan struct{} + if p.shutdownCh != nil { + shutdownCh = p.shutdownCh + } + select { + case bp.output <- set: + return true + case <-done: + return false + case <-shutdownCh: + return false + } } func (bp *brokerProducer) handleError(sent *produceSet, err error) { @@ -1510,41 +1579,40 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { bp.parent.abandonBrokerConnection(bp.broker) _ = bp.broker.Close() bp.closing = err + keepMuted := make(map[string]map[int32]struct{}) var retryTopics []string retryTopicSeen := make(map[string]struct{}) + var retryBatches []partitionBatchRetry sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { - if _, ok := retryTopicSeen[topic]; ok { - return - } - retryTopicSeen[topic] = struct{}{} - retryTopics = append(retryTopics, topic) - }) - if bp.parent.conf.Producer.Idempotent && len(retryTopics) > 0 { - refreshErr := bp.parent.client.RefreshMetadata(retryTopics...) - if refreshErr != nil { - Logger.Printf("Failed refreshing metadata because of %v\n", refreshErr) - } - } - keepMuted := make(map[string]map[int32]struct{}) - sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { - // keep partition marked as in-flight during retry (connection error) + // Connection failures have no ProduceResponse to feed back through + // partitionProducer. Keep the sent batch muted and retry it asynchronously + // so later same-partition batches cannot pass it, without blocking the + // brokerProducer response loop (#1203). if bp.currentRetries[topic] == nil { bp.currentRetries[topic] = make(map[int32]error) } bp.currentRetries[topic][partition] = err - if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) { + if !(bp.parent.conf.Producer.Idempotent || pSet.shouldKeepMuted(bp.parent.conf.Producer.Retry.Max)) { + bp.parent.returnErrors(pSet.msgs, err) + } else { if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } keepMuted[topic][partition] = struct{}{} - } - if bp.parent.conf.Producer.Idempotent { - go bp.parent.retryBatch(topic, partition, pSet, err, true) - } else { - pSet.markPartitionMuteReusable() - bp.parent.retryMessages(pSet.msgs, err) + if _, ok := retryTopicSeen[topic]; !ok { + retryTopicSeen[topic] = struct{}{} + retryTopics = append(retryTopics, topic) + } + retryBatches = append(retryBatches, partitionBatchRetry{ + topic: topic, + partition: partition, + pSet: pSet, + }) } }) + if len(retryBatches) > 0 { + go bp.parent.retryBatchesAfterRefresh(retryTopics, retryBatches, err) + } bp.accumulatingBatch.eachPartition(func(topic string, partition int32, pSet *partitionSet) { bp.parent.retryMessages(pSet.msgs, err) }) @@ -1626,6 +1694,10 @@ func (p *asyncProducer) retryHandler() { // utility functions func (p *asyncProducer) shutdown() { + Logger.Println("Producer shutting down.") + if p.shutdownCh != nil && p.shutdownChClosed.CompareAndSwap(false, true) { + close(p.shutdownCh) + } p.inFlight.Add(1) p.input <- &ProducerMessage{flags: shutdown} diff --git a/async_producer_test.go b/async_producer_test.go index ed5e6beeb..4bb9cd87a 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1957,37 +1957,105 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) { } } -func TestHandleSuccessNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { +func TestHandleErrorNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 config.Producer.Retry.Backoff = 0 - txnMgr := &transactionManager{ - producerID: 0, - producerEpoch: 0, - sequenceNumbers: make(map[string]int32), + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + txnmgr: &transactionManager{}, + } + failedBroker := &Broker{id: 1} + retryLeader := &Broker{id: 2} + parent.client = &stubLeaderClient{leader: retryLeader, cfg: config} + + output := make(chan *produceSet) + retryBP := &brokerProducer{ + parent: parent, + broker: retryLeader, + output: output, + input: make(chan *ProducerMessage), + } + parent.brokers[retryLeader] = retryBP + + bp := &brokerProducer{ + parent: parent, + broker: failedBroker, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + retryPartitionSet := sent.msgs["topic"][0] + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") } + + done := make(chan struct{}) + go func() { + bp.handleError(sent, ErrOutOfBrokers) + close(done) + }() + assertDoneWithin(t, done, 2*time.Second) + + retrySet := assertDoneWithin(t, output, 2*time.Second) + defer parent.muter.unmute(retrySet) + require.Equal(t, retryPartitionSet, retrySet.msgs["topic"][0]) + require.Equal(t, 1, retryPartitionSet.msgs[0].retries) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if parent.muter.tryMute(contender) { + parent.muter.unmute(contender) + t.Fatal("expected partition to remain muted by the retrying batch") + } +} + +func TestHandleErrorNonIdempotentRetryDoesNotWaitForMetadataRefresh(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + parent := &asyncProducer{ conf: config, muter: newPartitionMuter(), brokers: make(map[*Broker]*brokerProducer), brokerRefs: make(map[*brokerProducer]int), - retries: make(chan *ProducerMessage, 1), - txnmgr: txnMgr, + txnmgr: &transactionManager{}, } - leader := &Broker{id: 1} - parent.client = &stubLeaderClient{leader: leader, cfg: config} + failedBroker := &Broker{id: 1} + retryLeader := &Broker{id: 2} + client := &blockingRefreshClient{ + stubLeaderClient: stubLeaderClient{leader: retryLeader, cfg: config}, + started: make(chan struct{}, 1), + release: make(chan struct{}), + } + parent.client = client + + output := make(chan *produceSet) + retryBP := &brokerProducer{ + parent: parent, + broker: retryLeader, + output: output, + input: make(chan *ProducerMessage), + } + parent.brokers[retryLeader] = retryBP bp := &brokerProducer{ parent: parent, - broker: leader, + broker: failedBroker, input: make(chan *ProducerMessage), accumulatingBatch: newProduceSet(parent), currentRetries: make(map[string]map[int32]error), } - parent.brokers[leader] = bp - parent.brokerRefs[bp] = 1 sent := newProduceSet(parent) safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) @@ -1995,29 +2063,242 @@ func TestHandleSuccessNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { if !parent.muter.tryMute(sent) { t.Fatal("expected sent batch to mute partitions") } - defer parent.muter.unmute(sent) + + done := make(chan struct{}) + go func() { + bp.handleError(sent, ErrOutOfBrokers) + close(done) + }() + assertDoneWithin(t, done, 2*time.Second) + assertDoneWithin(t, client.started, 2*time.Second) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if parent.muter.tryMute(contender) { + parent.muter.unmute(contender) + t.Fatal("expected partition to remain muted while async metadata refresh is pending") + } + + close(client.release) + retrySet := assertDoneWithin(t, output, 2*time.Second) + defer parent.muter.unmute(retrySet) + require.Equal(t, retryPartitionSet, retrySet.msgs["topic"][0]) +} + +func TestHandleErrorNonIdempotentRetryRefreshesMetadataOncePerFailedRequest(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + txnmgr: &transactionManager{}, + } + failedBroker := &Broker{id: 1} + retryLeader := &Broker{id: 2} + client := &countingRefreshClient{ + stubLeaderClient: stubLeaderClient{leader: retryLeader, cfg: config}, + calls: make(chan []string, 1), + } + parent.client = client + + output := make(chan *produceSet, 2) + retryBP := &brokerProducer{ + parent: parent, + broker: retryLeader, + output: output, + input: make(chan *ProducerMessage), + } + parent.brokers[retryLeader] = retryBP + parent.brokerRefs[retryBP] = 1 + + bp := &brokerProducer{ + parent: parent, + broker: failedBroker, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry-0")}) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("retry-1")}) + retryPartitionSet0 := sent.msgs["topic"][0] + retryPartitionSet1 := sent.msgs["topic"][1] + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + + bp.handleError(sent, ErrOutOfBrokers) + + require.Equal(t, []string{"topic"}, assertDoneWithin(t, client.calls, 2*time.Second)) + retrySet0 := assertDoneWithin(t, output, 2*time.Second) + defer parent.muter.unmute(retrySet0) + retrySet1 := assertDoneWithin(t, output, 2*time.Second) + defer parent.muter.unmute(retrySet1) + assertNotDone(t, client.calls, 50*time.Millisecond) + + retriedPartitions := make(map[int32]*partitionSet) + retrySet0.eachPartition(func(_ string, partition int32, pSet *partitionSet) { + retriedPartitions[partition] = pSet + }) + retrySet1.eachPartition(func(_ string, partition int32, pSet *partitionSet) { + retriedPartitions[partition] = pSet + }) + require.Same(t, retryPartitionSet0, retriedPartitions[int32(0)]) + require.Same(t, retryPartitionSet1, retriedPartitions[int32(1)]) +} + +func TestHandleSuccessNonIdempotentRetryUsesPartitionProducer(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + retries: make(chan *ProducerMessage, 1), + txnmgr: &transactionManager{}, + } + + bp := &brokerProducer{ + parent: parent, + broker: &Broker{id: 1}, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + + sent := newProduceSet(parent) + msg := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")} + safeAddMessage(t, sent, msg) + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } response := new(ProduceResponse) response.AddTopicPartition("topic", 0, ErrNotLeaderForPartition) bp.handleSuccess(sent, response) retried := assertDoneWithin(t, parent.retries, 2*time.Second) - require.Equal(t, retryPartitionSet.msgs[0], retried) - require.True(t, retried.reusePartitionMute) + require.Equal(t, msg, retried) + require.Equal(t, 1, retried.retries) contender := newProduceSet(parent) safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) - if parent.muter.tryMute(contender) { - parent.muter.unmute(contender) - t.Fatal("expected partition to remain muted by the retrying batch") + if !parent.muter.tryMute(contender) { + t.Fatal("expected response retry to release sent mute before partitionProducer retry") } + parent.muter.unmute(contender) +} - safeAddMessage(t, bp.accumulatingBatch, retried) - if !bp.tryBuildFlushingBatch() { - t.Fatal("expected retry batch to reuse the existing partition mute") +func TestRetryBatchReleasesMuteWhenHandoffAbortedByShutdown(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + errors: make(chan *ProducerError, 1), + shutdownCh: make(chan struct{}), + txnmgr: &transactionManager{}, + } + leader := &Broker{id: 1} + parent.client = &stubLeaderClient{leader: leader, cfg: config} + retryBP := &brokerProducer{ + parent: parent, + broker: leader, + output: make(chan *produceSet), + input: make(chan *ProducerMessage), + done: make(chan struct{}), + } + parent.brokers[leader] = retryBP + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + retryPartitionSet := sent.msgs["topic"][0] + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + parent.inFlight.Add(1) + if parent.shutdownChClosed.CompareAndSwap(false, true) { + close(parent.shutdownCh) + } + + done := make(chan struct{}) + go func() { + parent.retryBatch("topic", 0, retryPartitionSet, ErrOutOfBrokers, true) + close(done) + }() + + producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrShuttingDown, producerErr.Err) + assertDoneWithin(t, done, 2*time.Second) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected partition mute to be released after aborted retry handoff") + } + parent.muter.unmute(contender) +} + +func TestHandleErrorNonIdempotentRetryReleasesMuteOnLeaderError(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + errors: make(chan *ProducerError, 1), + txnmgr: &transactionManager{}, + } + parent.client = &failingLeaderClient{ + stubLeaderClient: stubLeaderClient{cfg: config}, + err: ErrOutOfBrokers, + } + + bp := &brokerProducer{ + parent: parent, + broker: &Broker{id: 1}, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + parent.inFlight.Add(1) + + bp.handleError(sent, ErrOutOfBrokers) + + producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrOutOfBrokers, producerErr.Err) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected partition mute to be released after retry leader lookup failure") } - require.Equal(t, retryPartitionSet.msgs[0], bp.flushingBatch.msgs["topic"][0].msgs[0]) - require.False(t, retryPartitionSet.msgs[0].reusePartitionMute) + parent.muter.unmute(contender) } type stubLeaderClient struct { @@ -2055,6 +2336,44 @@ func (c *stubLeaderClient) PartitionNotReadable(string, int32) bool { r func (c *stubLeaderClient) Close() error { return nil } func (c *stubLeaderClient) Closed() bool { return false } +type failingLeaderClient struct { + stubLeaderClient + err error +} + +func (c *failingLeaderClient) Leader(string, int32) (*Broker, error) { + return nil, c.err +} + +func (c *failingLeaderClient) LeaderAndEpoch(string, int32) (*Broker, int32, error) { + return nil, 0, c.err +} + +type blockingRefreshClient struct { + stubLeaderClient + started chan struct{} + release chan struct{} +} + +func (c *blockingRefreshClient) RefreshMetadata(...string) error { + select { + case c.started <- struct{}{}: + default: + } + <-c.release + return nil +} + +type countingRefreshClient struct { + stubLeaderClient + calls chan []string +} + +func (c *countingRefreshClient) RefreshMetadata(topics ...string) error { + c.calls <- append([]string(nil), topics...) + return nil +} + func testProducerInterceptor( t *testing.T, interceptors []ProducerInterceptor, diff --git a/produce_set.go b/produce_set.go index a86235b9e..121a272ac 100644 --- a/produce_set.go +++ b/produce_set.go @@ -12,40 +12,18 @@ type partitionSet struct { bufferBytes int } -// markPartitionMuteReusable lets the first message carry the retry batch's -// right to reuse the partition mute that is still held by its failed send. -// Only the first message is marked because one held mute represents one -// in-flight retry permission; marking every message could let split retry -// fragments bypass each other under the same reservation. -func (ps *partitionSet) markPartitionMuteReusable() { - if len(ps.msgs) > 0 { - ps.msgs[0].reusePartitionMute = true - } -} - -// canReusePartitionMute reports whether this retry batch may be flushed while -// its partition is already muted by the previous send attempt. -func (ps *partitionSet) canReusePartitionMute() bool { - return len(ps.msgs) > 0 && ps.msgs[0].reusePartitionMute -} - -// clearPartitionMuteReuse consumes the one-shot reuse marker once the retry -// batch is selected for flushing. -func (ps *partitionSet) clearPartitionMuteReuse() { - for _, msg := range ps.msgs { - msg.reusePartitionMute = false +// shouldKeepMuted matches retryBatch's whole-batch retry rule: if any message has +// exhausted retries, the batch will be failed instead of retried. +func (ps *partitionSet) shouldKeepMuted(maxRetries int) bool { + if len(ps.msgs) == 0 { + return false } -} - -// canRetry prevents holding a partition mute when every message in the set will -// be returned as an error instead of being retried. -func (ps *partitionSet) canRetry(maxRetries int) bool { for _, msg := range ps.msgs { - if msg.retries < maxRetries { - return true + if msg.retries >= maxRetries { + return false } } - return false + return true } type produceSet struct { diff --git a/produce_set_test.go b/produce_set_test.go index 9ab964949..359bfd47b 100644 --- a/produce_set_test.go +++ b/produce_set_test.go @@ -25,6 +25,24 @@ func safeAddMessage(t *testing.T, ps *produceSet, msg *ProducerMessage) { } } +func TestShouldKeepMuted(t *testing.T) { + if (&partitionSet{}).shouldKeepMuted(1) { + t.Fatal("empty partition set should not be retryable") + } + if !(&partitionSet{msgs: []*ProducerMessage{ + {retries: 0}, + {retries: 0}, + }}).shouldKeepMuted(1) { + t.Fatal("expected batch to be retryable when every message is below retry max") + } + if (&partitionSet{msgs: []*ProducerMessage{ + {retries: 0}, + {retries: 1}, + }}).shouldKeepMuted(1) { + t.Fatal("expected batch not to be retryable when any message has exhausted retries") + } +} + func TestProduceSetInitial(t *testing.T) { _, ps := makeProduceSet()