diff --git a/async_producer.go b/async_producer.go index 29bc3a114..d6f02bd45 100644 --- a/async_producer.go +++ b/async_producer.go @@ -14,7 +14,7 @@ import ( "github.com/rcrowley/go-metrics" ) -// ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied. +// ErrProducerRetryBufferOverflow is returned when producer retry buffering is full and OOM prevention needs to be applied. var ErrProducerRetryBufferOverflow = errors.New("retry buffer full: message discarded to prevent buffer overflow") const ( @@ -102,6 +102,8 @@ type asyncProducer struct { brokerRefs map[*brokerProducer]int brokerLock sync.Mutex + retryBuffer retryBufferQuota + txnmgr *transactionManager txLock sync.Mutex @@ -112,6 +114,14 @@ type asyncProducer struct { metricsRegistry metrics.Registry } +type retryBufferQuota struct { + // messages and bytes track producer-level retry buffer occupancy when + // Producer.Retry.MaxBufferLength or MaxBufferBytes is bounded. + messages int64 + bytes int64 + mu sync.Mutex +} + type partitionMuter struct { mu sync.Mutex cond *sync.Cond @@ -791,6 +801,13 @@ func (pp *partitionProducer) backoff(retries int) { } func (p *asyncProducer) backoff(retries int) { + backoff := p.retryBackoff(retries) + if backoff > 0 { + time.Sleep(backoff) + } +} + +func (p *asyncProducer) retryBackoff(retries int) time.Duration { var backoff time.Duration if p.conf.Producer.Retry.BackoffFunc != nil { maxRetries := p.conf.Producer.Retry.Max @@ -798,8 +815,37 @@ func (p *asyncProducer) backoff(retries int) { } else { backoff = p.conf.Producer.Retry.Backoff } - if backoff > 0 { - time.Sleep(backoff) + return backoff +} + +func (p *asyncProducer) shuttingDown() bool { + if p.shutdownCh == nil { + return false + } + select { + case <-p.shutdownCh: + return true + default: + return false + } +} + +func (p *asyncProducer) backoffOrDone(retries int) bool { + backoff := p.retryBackoff(retries) + if backoff <= 0 { + return !p.shuttingDown() + } + timer := time.NewTimer(backoff) + defer timer.Stop() + if p.shutdownCh == nil { + <-timer.C + return true + } + select { + case <-timer.C: + return true + case <-p.shutdownCh: + return false } } @@ -1448,12 +1494,18 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, retryErr error, alreadyMuted bool) { Logger.Printf("Retrying batch for %v-%d because of %v\n", topic, partition, retryErr) produceSet := newProduceSet(p) - produceSet.msgs[topic] = make(map[int32]*partitionSet) - produceSet.msgs[topic][partition] = pSet - produceSet.bufferBytes += pSet.bufferBytes - produceSet.bufferCount += len(pSet.msgs) + produceSet.addPartitionSet(topic, partition, pSet) + bufferMessages, bufferBytes := int64(produceSet.bufferCount), int64(produceSet.bufferBytes) + bufferReserved := false + releaseBuffer := func() { + if bufferReserved { + p.releaseBuffer(bufferMessages, bufferBytes) + bufferReserved = false + } + } muted := alreadyMuted failBatch := func(err error) { + releaseBuffer() // Release the partition before reporting errors so a blocked Errors // consumer cannot keep later same-partition messages muted. if muted { @@ -1471,12 +1523,17 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio } msg.retries++ } + if !p.reserveBuffer(bufferMessages, bufferBytes) { + failBatch(ErrProducerRetryBufferOverflow) + return + } + bufferReserved = p.retryBufferBounded() && (bufferMessages != 0 || bufferBytes != 0) - // honor Producer.Retry.Backoff between retry attempts (#2469); the - // non-idempotent path gets this from partitionProducer.dispatch, but - // retryBatch dispatches the produceSet directly to the broker - if len(pSet.msgs) > 0 { - p.backoff(pSet.msgs[0].retries) + // Honor Producer.Retry.Backoff between retry attempts (#2469). retryBatch + // dispatches the produceSet directly to the broker, bypassing partitionProducer.dispatch. + if len(pSet.msgs) > 0 && !p.backoffOrDone(pSet.msgs[0].retries) { + failBatch(ErrShuttingDown) + return } // it's expected that a metadata refresh has been requested prior to calling retryBatch @@ -1499,6 +1556,7 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio failBatch(ErrShuttingDown) return } + releaseBuffer() } type partitionBatchRetry struct { @@ -1507,30 +1565,213 @@ type partitionBatchRetry struct { pSet *partitionSet } +func (p *asyncProducer) failMutedBatch(batch partitionBatchRetry, err error) { + produceSet := newProduceSet(p) + produceSet.addPartitionSet(batch.topic, batch.partition, batch.pSet) + p.muter.unmute(produceSet) + for _, msg := range batch.pSet.msgs { + p.returnError(msg, err) + } +} + +func (p *asyncProducer) failMutedBatches(batches []partitionBatchRetry, err error) { + for _, batch := range batches { + p.failMutedBatch(batch, err) + } +} + +func (p *asyncProducer) retryBufferLimits() (int64, int64) { + maxBufferLength := p.conf.Producer.Retry.MaxBufferLength + if 0 < maxBufferLength && maxBufferLength < minFunctionalRetryBufferLength { + maxBufferLength = minFunctionalRetryBufferLength + } + + maxBufferBytes := p.conf.Producer.Retry.MaxBufferBytes + if 0 < maxBufferBytes && maxBufferBytes < minFunctionalRetryBufferBytes { + maxBufferBytes = minFunctionalRetryBufferBytes + } + + return int64(maxBufferLength), maxBufferBytes +} + +func (p *asyncProducer) retryBufferBounded() bool { + maxBufferLength, maxBufferBytes := p.retryBufferLimits() + return maxBufferLength > 0 || maxBufferBytes > 0 +} + +func (p *asyncProducer) producerMessageByteSizeVersion() int { + if p.conf.Version.IsAtLeast(V0_11_0_0) { + return 2 + } + return 1 +} + +func (p *asyncProducer) bufferWouldOverflow(messages, bytes int64) bool { + maxBufferLength, maxBufferBytes := p.retryBufferLimits() + return (maxBufferLength > 0 && messages >= maxBufferLength) || + (maxBufferBytes > 0 && bytes >= maxBufferBytes) +} + +func (p *asyncProducer) reserveBuffer(messages, bytes int64) bool { + if messages == 0 && bytes == 0 { + return true + } + if !p.retryBufferBounded() { + return true + } + p.retryBuffer.mu.Lock() + defer p.retryBuffer.mu.Unlock() + + if p.bufferWouldOverflow(p.retryBuffer.messages+messages, p.retryBuffer.bytes+bytes) { + return false + } + p.retryBuffer.messages += messages + p.retryBuffer.bytes += bytes + return true +} + +func (p *asyncProducer) addToBuffer(messages, bytes int64) bool { + if messages == 0 && bytes == 0 { + return false + } + if !p.retryBufferBounded() { + return false + } + p.retryBuffer.mu.Lock() + defer p.retryBuffer.mu.Unlock() + + p.retryBuffer.messages += messages + p.retryBuffer.bytes += bytes + return p.bufferWouldOverflow(p.retryBuffer.messages, p.retryBuffer.bytes) +} + +func (p *asyncProducer) releaseBuffer(messages, bytes int64) { + if messages == 0 && bytes == 0 { + return + } + if !p.retryBufferBounded() { + return + } + p.retryBuffer.mu.Lock() + defer p.retryBuffer.mu.Unlock() + + p.retryBuffer.messages -= messages + p.retryBuffer.bytes -= bytes +} + // retryBatchesAfterRefresh keeps metadata refresh out of brokerProducer.handleError. // Connection errors already hold partition mutes; doing the refresh in the // response loop can block all progress for that broker while the cluster is // unstable. Refresh once for the failed request so multi-partition batches do -// not amplify controller-fail metadata traffic, then retry partitions -// independently so one blocked broker handoff cannot hold up the rest. +// not amplify controller-fail metadata traffic. Group retry partitions by their +// refreshed leader broker, but hand them off from this retry worker so large +// clusters do not create one temporary goroutine per target broker. +// +// This is intentionally not implemented as a loop over retryBatch. retryBatch +// is the single-partition direct retry primitive; this path owns all muted +// partitions from one failed ProduceRequest and must retry, fail, reserve +// buffer, back off, and refresh metadata at that failed-request boundary. func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []partitionBatchRetry, retryErr error) { - if p.shutdownCh != nil { - select { - case <-p.shutdownCh: - for _, batch := range batches { - go p.retryBatch(batch.topic, batch.partition, batch.pSet, retryErr, true) + if p.shuttingDown() { + p.failMutedBatches(batches, ErrShuttingDown) + return + } + + var retryable []partitionBatchRetry + maxRetryAttempt := 0 + for _, batch := range batches { + if !batch.pSet.shouldKeepMuted(p.conf.Producer.Retry.Max) { + p.failMutedBatch(batch, retryErr) + continue + } + for _, msg := range batch.pSet.msgs { + msg.retries++ + if msg.retries > maxRetryAttempt { + maxRetryAttempt = msg.retries } - return - default: } + retryable = append(retryable, batch) + } + if len(retryable) == 0 { + return + } + + // Reserve before metadata refresh so refresh-blocked direct retries still + // count against the producer-level retry buffer budget. + var bufferMessages, bufferBytes int64 + for _, batch := range retryable { + bufferMessages += int64(len(batch.pSet.msgs)) + bufferBytes += int64(batch.pSet.bufferBytes) + } + if !p.reserveBuffer(bufferMessages, bufferBytes) { + p.failMutedBatches(retryable, ErrProducerRetryBufferOverflow) + return + } + + if p.shuttingDown() { + p.releaseBuffer(bufferMessages, bufferBytes) + p.failMutedBatches(retryable, ErrShuttingDown) + return } if len(topics) > 0 { if err := p.client.RefreshMetadata(topics...); err != nil { Logger.Printf("Failed refreshing metadata because of %v\n", err) } } - for _, batch := range batches { - go p.retryBatch(batch.topic, batch.partition, batch.pSet, retryErr, true) + if p.shuttingDown() { + p.releaseBuffer(bufferMessages, bufferBytes) + p.failMutedBatches(retryable, ErrShuttingDown) + return + } + + if !p.backoffOrDone(maxRetryAttempt) { + p.releaseBuffer(bufferMessages, bufferBytes) + p.failMutedBatches(retryable, ErrShuttingDown) + return + } + + brokerSets := make(map[*Broker]*produceSet) + for _, batch := range retryable { + if p.shuttingDown() { + batchMessages, batchBytes := int64(len(batch.pSet.msgs)), int64(batch.pSet.bufferBytes) + p.releaseBuffer(batchMessages, batchBytes) + p.failMutedBatch(batch, ErrShuttingDown) + continue + } + leader, leaderErr := p.client.Leader(batch.topic, batch.partition) + if leaderErr != nil { + Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", batch.topic, batch.partition, leaderErr) + batchMessages, batchBytes := int64(len(batch.pSet.msgs)), int64(batch.pSet.bufferBytes) + p.releaseBuffer(batchMessages, batchBytes) + p.failMutedBatch(batch, retryErr) + continue + } + set := brokerSets[leader] + if set == nil { + set = newProduceSet(p) + brokerSets[leader] = set + } + set.addPartitionSet(batch.topic, batch.partition, batch.pSet) + } + + for leader, set := range brokerSets { + p.handoffRetrySet(leader, set) + } +} + +func (p *asyncProducer) handoffRetrySet(leader *Broker, set *produceSet) { + bp := p.getBrokerProducer(leader) + accepted := p.sendRetryBatch(bp, set) + p.unrefBrokerProducer(leader, bp) + setMessages, setBytes := int64(set.bufferCount), int64(set.bufferBytes) + p.releaseBuffer(setMessages, setBytes) + if !accepted { + p.muter.unmute(set) + set.eachPartition(func(_ string, _ int32, pSet *partitionSet) { + for _, msg := range pSet.msgs { + p.returnError(msg, ErrShuttingDown) + } + }) } } @@ -1633,22 +1874,8 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel func (p *asyncProducer) retryHandler() { - maxBufferLength := p.conf.Producer.Retry.MaxBufferLength - if 0 < maxBufferLength && maxBufferLength < minFunctionalRetryBufferLength { - maxBufferLength = minFunctionalRetryBufferLength - } - - maxBufferBytes := p.conf.Producer.Retry.MaxBufferBytes - if 0 < maxBufferBytes && maxBufferBytes < minFunctionalRetryBufferBytes { - maxBufferBytes = minFunctionalRetryBufferBytes - } - - version := 1 - if p.conf.Version.IsAtLeast(V0_11_0_0) { - version = 2 - } + version := p.producerMessageByteSizeVersion() - var currentByteSize int64 var msg *ProducerMessage buf := queue.New() @@ -1660,7 +1887,7 @@ func (p *asyncProducer) retryHandler() { case msg = <-p.retries: case p.input <- buf.Peek().(*ProducerMessage): msgToRemove := buf.Remove().(*ProducerMessage) - currentByteSize -= int64(msgToRemove.ByteSize(version)) + p.releaseBuffer(1, int64(msgToRemove.ByteSize(version))) continue } } @@ -1670,9 +1897,9 @@ func (p *asyncProducer) retryHandler() { } buf.Add(msg) - currentByteSize += int64(msg.ByteSize(version)) + bufferOverflow := p.addToBuffer(1, int64(msg.ByteSize(version))) - if (maxBufferLength <= 0 || buf.Length() < maxBufferLength) && (maxBufferBytes <= 0 || currentByteSize < maxBufferBytes) { + if !bufferOverflow { continue } @@ -1681,10 +1908,10 @@ func (p *asyncProducer) retryHandler() { select { case p.input <- msgToHandle: buf.Remove() - currentByteSize -= int64(msgToHandle.ByteSize(version)) + p.releaseBuffer(1, int64(msgToHandle.ByteSize(version))) default: buf.Remove() - currentByteSize -= int64(msgToHandle.ByteSize(version)) + p.releaseBuffer(1, int64(msgToHandle.ByteSize(version))) p.returnError(msgToHandle, ErrProducerRetryBufferOverflow) } } diff --git a/async_producer_test.go b/async_producer_test.go index 4bb9cd87a..a3c220f63 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1635,6 +1635,51 @@ func assertDoneWithin[T any](t *testing.T, ch <-chan T, timeout time.Duration) T } } +func waitForProducerBuffer(t *testing.T, parent *asyncProducer, messages, bytes int64) { + t.Helper() + deadline := time.After(2 * time.Second) + ticker := time.NewTicker(time.Millisecond) + defer ticker.Stop() + for { + parent.retryBuffer.mu.Lock() + gotMessages := parent.retryBuffer.messages + gotBytes := parent.retryBuffer.bytes + parent.retryBuffer.mu.Unlock() + if (messages < 0 || gotMessages == messages) && (bytes < 0 || gotBytes == bytes) { + return + } + select { + case <-deadline: + t.Fatalf("timed out waiting for buffer budget messages=%d bytes=%d, got messages=%d bytes=%d", + messages, bytes, gotMessages, gotBytes) + case <-ticker.C: + } + } +} + +func newBlockingRetryProducer(config *Config, errorBuffer int) (*asyncProducer, *brokerProducer) { + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + errors: make(chan *ProducerError, errorBuffer), + shutdownCh: make(chan struct{}), + txnmgr: &transactionManager{}, + } + leader := &Broker{id: 1} + parent.client = &stubLeaderClient{leader: leader, cfg: config} + retryBP := &brokerProducer{ + parent: parent, + broker: leader, + output: make(chan *produceSet), + input: make(chan *ProducerMessage), + done: make(chan struct{}), + } + parent.brokers[leader] = retryBP + return parent, retryBP +} + // TestBrokerProducerWaitForSpaceRespectsExternalUnmute ensures waitForSpace does not // deadlock when partitions are muted by another producer and are unmuted elsewhere. func TestBrokerProducerWaitForSpaceRespectsExternalUnmute(t *testing.T) { @@ -1907,6 +1952,7 @@ func TestBrokerProducerRollOverClearsTimer(t *testing.T) { func TestRetryBatchRespectsPartitionMuter(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = true + config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength txnMgr := &transactionManager{ producerID: 0, producerEpoch: 0, @@ -1949,6 +1995,7 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) { default: t.Fatal("expected retry batch to be dispatched") } + waitForProducerBuffer(t, parent, 0, 0) contender := newProduceSet(parent) safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) @@ -2018,11 +2065,61 @@ func TestHandleErrorNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { } } +func TestHandleErrorNonIdempotentRetryFailsWholeBatch(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Return.Errors = true + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + errors: make(chan *ProducerError, 2), + retries: make(chan *ProducerMessage, 2), + txnmgr: &transactionManager{}, + } + + bp := &brokerProducer{ + parent: parent, + broker: &Broker{id: 1}, + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + + exhausted := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("exhausted"), retries: 1} + retryable := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retryable")} + sent := newProduceSet(parent) + safeAddMessage(t, sent, exhausted) + safeAddMessage(t, sent, retryable) + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + parent.inFlight.Add(2) + + bp.handleError(sent, ErrOutOfBrokers) + + firstErr := assertDoneWithin(t, parent.errors, 2*time.Second) + secondErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, exhausted, firstErr.Msg) + require.Equal(t, ErrOutOfBrokers, firstErr.Err) + require.Equal(t, retryable, secondErr.Msg) + require.Equal(t, ErrOutOfBrokers, secondErr.Err) + assertNotDone(t, parent.retries, 50*time.Millisecond) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected partition mute to be released after whole-batch failure") + } + parent.muter.unmute(contender) +} + func TestHandleErrorNonIdempotentRetryDoesNotWaitForMetadataRefresh(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 config.Producer.Retry.Backoff = 0 + config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength parent := &asyncProducer{ conf: config, @@ -2071,6 +2168,7 @@ func TestHandleErrorNonIdempotentRetryDoesNotWaitForMetadataRefresh(t *testing.T }() assertDoneWithin(t, done, 2*time.Second) assertDoneWithin(t, client.started, 2*time.Second) + waitForProducerBuffer(t, parent, 1, -1) contender := newProduceSet(parent) safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) @@ -2083,6 +2181,7 @@ func TestHandleErrorNonIdempotentRetryDoesNotWaitForMetadataRefresh(t *testing.T retrySet := assertDoneWithin(t, output, 2*time.Second) defer parent.muter.unmute(retrySet) require.Equal(t, retryPartitionSet, retrySet.msgs["topic"][0]) + waitForProducerBuffer(t, parent, 0, 0) } func TestHandleErrorNonIdempotentRetryRefreshesMetadataOncePerFailedRequest(t *testing.T) { @@ -2136,23 +2235,112 @@ func TestHandleErrorNonIdempotentRetryRefreshesMetadataOncePerFailedRequest(t *t bp.handleError(sent, ErrOutOfBrokers) require.Equal(t, []string{"topic"}, assertDoneWithin(t, client.calls, 2*time.Second)) - retrySet0 := assertDoneWithin(t, output, 2*time.Second) - defer parent.muter.unmute(retrySet0) - retrySet1 := assertDoneWithin(t, output, 2*time.Second) - defer parent.muter.unmute(retrySet1) + retrySet := assertDoneWithin(t, output, 2*time.Second) + defer parent.muter.unmute(retrySet) assertNotDone(t, client.calls, 50*time.Millisecond) + assertNotDone(t, output, 50*time.Millisecond) retriedPartitions := make(map[int32]*partitionSet) - retrySet0.eachPartition(func(_ string, partition int32, pSet *partitionSet) { - retriedPartitions[partition] = pSet - }) - retrySet1.eachPartition(func(_ string, partition int32, pSet *partitionSet) { + retrySet.eachPartition(func(_ string, partition int32, pSet *partitionSet) { retriedPartitions[partition] = pSet }) require.Same(t, retryPartitionSet0, retriedPartitions[int32(0)]) require.Same(t, retryPartitionSet1, retriedPartitions[int32(1)]) } +func TestHandleErrorNonIdempotentRetryGroupsBatchesByLeader(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + txnmgr: &transactionManager{}, + } + failedBroker := &Broker{id: 1} + leaderA := &Broker{id: 2} + leaderB := &Broker{id: 3} + client := &routingLeaderClient{ + stubLeaderClient: stubLeaderClient{cfg: config}, + leaders: map[string]map[int32]*Broker{ + "topic-a": { + 0: leaderA, + 1: leaderA, + }, + "topic-b": { + 0: leaderB, + 1: leaderB, + }, + }, + calls: make(chan []string, 1), + } + parent.client = client + + outputA := make(chan *produceSet, 1) + outputB := make(chan *produceSet, 1) + retryBPA := &brokerProducer{ + parent: parent, + broker: leaderA, + output: outputA, + input: make(chan *ProducerMessage), + } + retryBPB := &brokerProducer{ + parent: parent, + broker: leaderB, + output: outputB, + input: make(chan *ProducerMessage), + } + parent.brokers[leaderA] = retryBPA + parent.brokers[leaderB] = retryBPB + + bp := &brokerProducer{ + parent: parent, + broker: failedBroker, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic-a", Partition: 0, Value: StringEncoder("a-0")}) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic-a", Partition: 1, Value: StringEncoder("a-1")}) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic-b", Partition: 0, Value: StringEncoder("b-0")}) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic-b", Partition: 1, Value: StringEncoder("b-1")}) + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + + bp.handleError(sent, ErrOutOfBrokers) + + require.ElementsMatch(t, []string{"topic-a", "topic-b"}, assertDoneWithin(t, client.calls, 2*time.Second)) + retrySetA := assertDoneWithin(t, outputA, 2*time.Second) + defer parent.muter.unmute(retrySetA) + retrySetB := assertDoneWithin(t, outputB, 2*time.Second) + defer parent.muter.unmute(retrySetB) + + require.Len(t, retrySetA.msgs, 1) + require.Len(t, retrySetA.msgs["topic-a"], 2) + require.Len(t, retrySetB.msgs, 1) + require.Len(t, retrySetB.msgs["topic-b"], 2) + for _, partitions := range retrySetA.msgs { + for _, pSet := range partitions { + require.Equal(t, 1, pSet.msgs[0].retries) + } + } + for _, partitions := range retrySetB.msgs { + for _, pSet := range partitions { + require.Equal(t, 1, pSet.msgs[0].retries) + } + } + + assertNotDone(t, outputA, 50*time.Millisecond) + assertNotDone(t, outputB, 50*time.Millisecond) +} + func TestHandleSuccessNonIdempotentRetryUsesPartitionProducer(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false @@ -2203,6 +2391,7 @@ func TestRetryBatchReleasesMuteWhenHandoffAbortedByShutdown(t *testing.T) { config.Producer.Retry.Max = 1 config.Producer.Retry.Backoff = 0 config.Producer.Return.Errors = true + config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength parent := &asyncProducer{ conf: config, @@ -2231,16 +2420,187 @@ func TestRetryBatchReleasesMuteWhenHandoffAbortedByShutdown(t *testing.T) { t.Fatal("expected sent batch to mute partitions") } parent.inFlight.Add(1) + + done := make(chan struct{}) + go func() { + parent.retryBatch("topic", 0, retryPartitionSet, ErrOutOfBrokers, true) + close(done) + }() + waitForProducerBuffer(t, parent, 1, -1) + if parent.shutdownChClosed.CompareAndSwap(false, true) { close(parent.shutdownCh) } + producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrShuttingDown, producerErr.Err) + assertDoneWithin(t, done, 2*time.Second) + waitForProducerBuffer(t, parent, 0, 0) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected partition mute to be released after aborted retry handoff") + } + parent.muter.unmute(contender) +} + +func TestRetryBatchUsesSharedBufferLengthBudget(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = true + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength + 1 + + parent, retryBP := newBlockingRetryProducer(config, minFunctionalRetryBufferLength+1) + + first := newProduceSet(parent) + for i := 0; i < minFunctionalRetryBufferLength; i++ { + safeAddMessage(t, first, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + } + firstPartitionSet := first.msgs["topic"][0] + if !parent.muter.tryMute(first) { + t.Fatal("expected first retry batch to mute partitions") + } + parent.inFlight.Add(minFunctionalRetryBufferLength) + + firstDone := make(chan struct{}) + go func() { + parent.retryBatch("topic", 0, firstPartitionSet, ErrNotEnoughReplicas, true) + close(firstDone) + }() + waitForProducerBuffer(t, parent, minFunctionalRetryBufferLength, -1) + + second := newProduceSet(parent) + safeAddMessage(t, second, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("overflow")}) + secondPartitionSet := second.msgs["topic"][1] + if !parent.muter.tryMute(second) { + t.Fatal("expected second retry batch to mute partitions") + } + parent.inFlight.Add(1) + parent.retryBatch("topic", 1, secondPartitionSet, ErrNotEnoughReplicas, true) + + producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrProducerRetryBufferOverflow, producerErr.Err) + require.Equal(t, int32(1), producerErr.Msg.Partition) + waitForProducerBuffer(t, parent, minFunctionalRetryBufferLength, -1) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected overflowed retry batch to release partition mute") + } + parent.muter.unmute(contender) + + close(retryBP.done) + assertDoneWithin(t, firstDone, 2*time.Second) + waitForProducerBuffer(t, parent, 0, 0) +} + +func TestRetryBatchUsesSharedBufferBytesBudget(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = true + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + + parent, retryBP := newBlockingRetryProducer(config, 2) + + firstMsg := &ProducerMessage{Topic: "topic", Partition: 0, Value: ByteEncoder(make([]byte, minFunctionalRetryBufferBytes))} + secondMsg := &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("overflow")} + version := parent.producerMessageByteSizeVersion() + firstBytes := int64(firstMsg.ByteSize(version)) + secondBytes := int64(secondMsg.ByteSize(version)) + config.Producer.Retry.MaxBufferBytes = firstBytes + secondBytes + + first := newProduceSet(parent) + safeAddMessage(t, first, firstMsg) + firstPartitionSet := first.msgs["topic"][0] + if !parent.muter.tryMute(first) { + t.Fatal("expected first retry batch to mute partitions") + } + parent.inFlight.Add(1) + + firstDone := make(chan struct{}) + go func() { + parent.retryBatch("topic", 0, firstPartitionSet, ErrNotEnoughReplicas, true) + close(firstDone) + }() + waitForProducerBuffer(t, parent, -1, firstBytes) + + second := newProduceSet(parent) + safeAddMessage(t, second, secondMsg) + secondPartitionSet := second.msgs["topic"][1] + if !parent.muter.tryMute(second) { + t.Fatal("expected second retry batch to mute partitions") + } + parent.inFlight.Add(1) + parent.retryBatch("topic", 1, secondPartitionSet, ErrNotEnoughReplicas, true) + + producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrProducerRetryBufferOverflow, producerErr.Err) + require.Equal(t, int32(1), producerErr.Msg.Partition) + waitForProducerBuffer(t, parent, -1, firstBytes) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected overflowed retry batch to release partition mute") + } + parent.muter.unmute(contender) + + close(retryBP.done) + assertDoneWithin(t, firstDone, 2*time.Second) + waitForProducerBuffer(t, parent, 0, 0) +} + +func TestRetryBatchesAfterRefreshReleasesMuteWhenBrokerProducerDone(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + errors: make(chan *ProducerError, 1), + shutdownCh: make(chan struct{}), + txnmgr: &transactionManager{}, + } + leader := &Broker{id: 1} + parent.client = &stubLeaderClient{leader: leader, cfg: config} + retryBP := &brokerProducer{ + parent: parent, + broker: leader, + output: make(chan *produceSet), + input: make(chan *ProducerMessage), + done: make(chan struct{}), + } + parent.brokers[leader] = retryBP + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + retryPartitionSet := sent.msgs["topic"][0] + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + parent.inFlight.Add(1) + done := make(chan struct{}) go func() { - parent.retryBatch("topic", 0, retryPartitionSet, ErrOutOfBrokers, true) + parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ + topic: "topic", + partition: 0, + pSet: retryPartitionSet, + }}, ErrOutOfBrokers) close(done) }() + close(retryBP.done) producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) require.Equal(t, ErrShuttingDown, producerErr.Err) assertDoneWithin(t, done, 2*time.Second) @@ -2248,9 +2608,131 @@ func TestRetryBatchReleasesMuteWhenHandoffAbortedByShutdown(t *testing.T) { contender := newProduceSet(parent) safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) if !parent.muter.tryMute(contender) { - t.Fatal("expected partition mute to be released after aborted retry handoff") + t.Fatal("expected partition mute to be released after brokerProducer done") + } + parent.muter.unmute(contender) +} + +func TestRetryBatchesAfterRefreshUsesSharedBufferLengthBudget(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength + 1 + + parent, retryBP := newBlockingRetryProducer(config, minFunctionalRetryBufferLength+1) + + first := newProduceSet(parent) + for i := 0; i < minFunctionalRetryBufferLength; i++ { + safeAddMessage(t, first, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + } + if !parent.muter.tryMute(first) { + t.Fatal("expected first retry batch to mute partitions") + } + parent.inFlight.Add(minFunctionalRetryBufferLength) + + firstDone := make(chan struct{}) + go func() { + parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ + topic: "topic", + partition: 0, + pSet: first.msgs["topic"][0], + }}, ErrOutOfBrokers) + close(firstDone) + }() + waitForProducerBuffer(t, parent, minFunctionalRetryBufferLength, -1) + + second := newProduceSet(parent) + safeAddMessage(t, second, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("overflow")}) + if !parent.muter.tryMute(second) { + t.Fatal("expected second retry batch to mute partitions") + } + parent.inFlight.Add(1) + parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ + topic: "topic", + partition: 1, + pSet: second.msgs["topic"][1], + }}, ErrOutOfBrokers) + + producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrProducerRetryBufferOverflow, producerErr.Err) + require.Equal(t, int32(1), producerErr.Msg.Partition) + waitForProducerBuffer(t, parent, minFunctionalRetryBufferLength, -1) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected overflowed retry batch to release partition mute") } parent.muter.unmute(contender) + + close(retryBP.done) + assertDoneWithin(t, firstDone, 2*time.Second) + waitForProducerBuffer(t, parent, 0, 0) +} + +func TestRetryBatchesAfterRefreshUsesSharedBufferBytesBudget(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + + parent, retryBP := newBlockingRetryProducer(config, 2) + + firstMsg := &ProducerMessage{Topic: "topic", Partition: 0, Value: ByteEncoder(make([]byte, minFunctionalRetryBufferBytes))} + secondMsg := &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("overflow")} + version := parent.producerMessageByteSizeVersion() + firstBytes := int64(firstMsg.ByteSize(version)) + secondBytes := int64(secondMsg.ByteSize(version)) + config.Producer.Retry.MaxBufferBytes = firstBytes + secondBytes + + first := newProduceSet(parent) + safeAddMessage(t, first, firstMsg) + if !parent.muter.tryMute(first) { + t.Fatal("expected first retry batch to mute partitions") + } + parent.inFlight.Add(1) + + firstDone := make(chan struct{}) + go func() { + parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ + topic: "topic", + partition: 0, + pSet: first.msgs["topic"][0], + }}, ErrOutOfBrokers) + close(firstDone) + }() + waitForProducerBuffer(t, parent, -1, firstBytes) + + second := newProduceSet(parent) + safeAddMessage(t, second, secondMsg) + if !parent.muter.tryMute(second) { + t.Fatal("expected second retry batch to mute partitions") + } + parent.inFlight.Add(1) + parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ + topic: "topic", + partition: 1, + pSet: second.msgs["topic"][1], + }}, ErrOutOfBrokers) + + producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrProducerRetryBufferOverflow, producerErr.Err) + require.Equal(t, int32(1), producerErr.Msg.Partition) + waitForProducerBuffer(t, parent, -1, firstBytes) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected overflowed retry batch to release partition mute") + } + parent.muter.unmute(contender) + + close(retryBP.done) + assertDoneWithin(t, firstDone, 2*time.Second) + waitForProducerBuffer(t, parent, 0, 0) } func TestHandleErrorNonIdempotentRetryReleasesMuteOnLeaderError(t *testing.T) { @@ -2374,6 +2856,31 @@ func (c *countingRefreshClient) RefreshMetadata(topics ...string) error { return nil } +type routingLeaderClient struct { + stubLeaderClient + leaders map[string]map[int32]*Broker + calls chan []string +} + +func (c *routingLeaderClient) Leader(topic string, partition int32) (*Broker, error) { + if partitions := c.leaders[topic]; partitions != nil { + if leader := partitions[partition]; leader != nil { + return leader, nil + } + } + return nil, ErrLeaderNotAvailable +} + +func (c *routingLeaderClient) LeaderAndEpoch(topic string, partition int32) (*Broker, int32, error) { + leader, err := c.Leader(topic, partition) + return leader, 0, err +} + +func (c *routingLeaderClient) RefreshMetadata(topics ...string) error { + c.calls <- append([]string(nil), topics...) + return nil +} + func testProducerInterceptor( t *testing.T, interceptors []ProducerInterceptor, diff --git a/config.go b/config.go index 8b87a6cd2..9e837bbc8 100644 --- a/config.go +++ b/config.go @@ -276,16 +276,20 @@ type Config struct { // more sophisticated backoff strategies. This takes precedence over // `Backoff` if set. BackoffFunc func(retries, maxRetries int) time.Duration - // The maximum length of the bridging buffer between `input` and `retries` channels - // in AsyncProducer#retryHandler. - // The limit is to prevent this buffer from overflowing or causing OOM. + // The maximum number of messages in producer retry buffering, including + // the bridging buffer between `input` and `retries` channels in + // AsyncProducer#retryHandler and direct retries waiting for metadata + // refresh, retry backoff, leader lookup, partition mute, or broker handoff. + // The limit is to prevent retry buffering from overflowing or causing OOM. // Defaults to 0 for unlimited. // Any value between 0 and 4096 is pushed to 4096. // A zero or negative value indicates unlimited. MaxBufferLength int - // The maximum total byte size of messages in the bridging buffer between `input` - // and `retries` channels in AsyncProducer#retryHandler. - // This limit prevents the buffer from consuming excessive memory. + // The maximum total byte size of messages in producer retry buffering, + // including the bridging buffer between `input` and `retries` channels + // in AsyncProducer#retryHandler and direct retries waiting for metadata + // refresh, retry backoff, leader lookup, partition mute, or broker handoff. + // This limit prevents retry buffering from consuming excessive memory. // Defaults to 0 for unlimited. // Any value between 0 and 32 MB is pushed to 32 MB. // A zero or negative value indicates unlimited. diff --git a/functional_producer_test.go b/functional_producer_test.go index df8a5d0ca..bbb0e27dc 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -430,8 +430,8 @@ func TestFuncTxnProduceAndCommitOffset(t *testing.T) { handler.started.Add(4) go func() { - err = cg.Consume(ctx, []string{"test.4"}, handler) - require.NoError(t, err) + consumeErr := cg.Consume(ctx, []string{"test.4"}, handler) + require.NoError(t, consumeErr) }() handler.started.Wait() diff --git a/produce_set.go b/produce_set.go index 121a272ac..0fbb7097a 100644 --- a/produce_set.go +++ b/produce_set.go @@ -50,6 +50,15 @@ func newProduceSetWithMeta(parent *asyncProducer, producerID int64, producerEpoc } } +func (ps *produceSet) addPartitionSet(topic string, partition int32, set *partitionSet) { + if ps.msgs[topic] == nil { + ps.msgs[topic] = make(map[int32]*partitionSet) + } + ps.msgs[topic][partition] = set + ps.bufferBytes += set.bufferBytes + ps.bufferCount += len(set.msgs) +} + func (ps *produceSet) add(msg *ProducerMessage) error { var err error var key, val []byte