diff --git a/async_producer.go b/async_producer.go index bcd7382e7..a9af20b1b 100644 --- a/async_producer.go +++ b/async_producer.go @@ -6,7 +6,6 @@ import ( "fmt" "math" "sync" - "sync/atomic" "time" "github.com/eapache/go-resiliency/breaker" @@ -14,7 +13,7 @@ import ( "github.com/rcrowley/go-metrics" ) -// ErrProducerRetryBufferOverflow is returned when producer retry buffering is full and OOM prevention needs to be applied. +// ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied. var ErrProducerRetryBufferOverflow = errors.New("retry buffer full: message discarded to prevent buffer overflow") const ( @@ -94,11 +93,6 @@ type asyncProducer struct { input, successes, retries chan *ProducerMessage inFlight sync.WaitGroup - // done is closed before waiting on in-flight messages so retryBatch - // goroutines can stop waiting on broker handoff and release partition mutes. - done chan struct{} - closed atomic.Bool - brokers map[*Broker]*brokerProducer brokerRefs map[*brokerProducer]int brokerLock sync.Mutex @@ -110,10 +104,6 @@ type asyncProducer struct { // mirroring Kafka's RecordAccumulator. muter *partitionMuter - // retryBufferQuota tracks producer-level retry buffer occupancy when - // Producer.Retry.MaxBufferLength or MaxBufferBytes is bounded. - retryBufferQuota messageQuota - metricsRegistry metrics.Registry } @@ -307,19 +297,17 @@ func newAsyncProducer(client Client) (AsyncProducer, error) { } p := &asyncProducer{ - client: client, - conf: client.Config(), - errors: make(chan *ProducerError), - input: make(chan *ProducerMessage), - successes: make(chan *ProducerMessage), - retries: make(chan *ProducerMessage), - done: make(chan struct{}), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - txnmgr: txnmgr, - muter: newPartitionMuter(), - retryBufferQuota: newRetryBufferQuota(client.Config()), - metricsRegistry: newCleanupRegistry(client.Config().MetricRegistry), + client: client, + conf: client.Config(), + errors: make(chan *ProducerError), + input: make(chan *ProducerMessage), + successes: make(chan *ProducerMessage), + retries: make(chan *ProducerMessage), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + txnmgr: txnmgr, + muter: newPartitionMuter(), + metricsRegistry: newCleanupRegistry(client.Config().MetricRegistry), } // launch our singleton dispatchers @@ -793,35 +781,15 @@ func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan } func (pp *partitionProducer) backoff(retries int) { - time.Sleep(pp.parent.calcBackoff(retries)) -} - -func (p *asyncProducer) calcBackoff(retries int) time.Duration { - if backoffFunc := p.conf.Producer.Retry.BackoffFunc; backoffFunc != nil { - return backoffFunc(retries, p.conf.Producer.Retry.Max) - } - return p.conf.Producer.Retry.Backoff -} - -func (p *asyncProducer) shuttingDown() bool { - select { - case <-p.done: - return true - default: - return false + var backoff time.Duration + if pp.parent.conf.Producer.Retry.BackoffFunc != nil { + maxRetries := pp.parent.conf.Producer.Retry.Max + backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries) + } else { + backoff = pp.parent.conf.Producer.Retry.Backoff } -} - -func (p *asyncProducer) backoffOrDone(retries int) bool { - backoff := p.calcBackoff(retries) - if backoff <= 0 { - return !p.shuttingDown() - } - select { - case <-time.After(backoff): - return true - case <-p.done: - return false + if backoff > 0 { + time.Sleep(backoff) } } @@ -988,7 +956,6 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { input: input, output: bridge, responses: responses, - done: make(chan struct{}), accumulatingBatch: newProduceSet(p), currentRetries: make(map[string]map[int32]error), } @@ -1102,12 +1069,6 @@ type brokerProducer struct { output chan<- *produceSet responses <-chan *brokerProducerResponse abandoned chan struct{} - // done is closed before output is closed. Direct retry handoff waits on - // output while the batch still owns its partition mute; if the brokerProducer - // is shutting down, that send may never be received and the mute would block - // later same-partition messages and producer shutdown. - done chan struct{} - closed atomic.Bool accumulatingBatch *produceSet flushingBatch *produceSet // batch that has been muted and is ready to send @@ -1122,14 +1083,8 @@ func (bp *brokerProducer) run() { var output chan<- *produceSet for { - var unmuteCh <-chan struct{} if bp.flushingBatch == nil && (bp.timerFired || bp.accumulatingBatch.readyToFlush()) { bp.tryBuildFlushingBatch() - if bp.flushingBatch == nil { - if ch, blocked := bp.parent.muter.awaitUnmuteChan(bp.accumulatingBatch); blocked { - unmuteCh = ch - } - } } var timerChan <-chan time.Time @@ -1183,6 +1138,7 @@ func (bp *brokerProducer) run() { } if bp.accumulatingBatch.wouldOverflow(msg) { + DebugLogger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID()) if err := bp.waitForSpace(msg, false); err != nil { bp.parent.retryMessage(msg, err) continue @@ -1213,7 +1169,6 @@ func (bp *brokerProducer) run() { if ok { bp.handleResponse(response) } - case <-unmuteCh: } } } @@ -1242,12 +1197,6 @@ func (bp *brokerProducer) tryBuildFlushingBatch() bool { } func (bp *brokerProducer) shutdown() { - if !bp.closed.CompareAndSwap(false, true) { - return - } - if bp.done != nil { - close(bp.done) - } // flush any ready buffer for bp.flushingBatch != nil { select { @@ -1448,9 +1397,6 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo if bp.parent.conf.Producer.Idempotent { go bp.parent.retryBatch(topic, partition, pSet, block.Err, true) } else { - // Non-idempotent response retries must go back through partitionProducer. - // That path advances the retry high watermark and refreshes the partition - // leader before sending the retry; retryBatch would bypass that state. bp.parent.retryMessages(pSet.msgs, block.Err) } // dropping the following messages has the side effect of incrementing their retry count @@ -1472,209 +1418,44 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, retryErr error, alreadyMuted bool) { Logger.Printf("Retrying batch for %v-%d because of %v\n", topic, partition, retryErr) produceSet := newProduceSet(p) - produceSet.addPartitionSet(topic, partition, pSet) - bufferMessages, bufferBytes := int64(produceSet.bufferCount), int64(produceSet.bufferBytes) - - muted := alreadyMuted - failBatch := func(err error) { - // Release the partition before reporting errors so a blocked Errors - // consumer cannot keep later same-partition messages muted. - if muted { - p.muter.unmute(produceSet) - muted = false - } - p.returnErrors(pSet.msgs, err) - } + produceSet.msgs[topic] = make(map[int32]*partitionSet) + produceSet.msgs[topic][partition] = pSet + produceSet.bufferBytes += pSet.bufferBytes + produceSet.bufferCount += len(pSet.msgs) for _, msg := range pSet.msgs { if msg.retries >= p.conf.Producer.Retry.Max { - failBatch(retryErr) + p.returnErrors(pSet.msgs, retryErr) + if alreadyMuted { + p.muter.unmute(produceSet) + } return } msg.retries++ } - if !p.retryBufferQuota.tryReserve(bufferMessages, bufferBytes) { - failBatch(ErrProducerRetryBufferOverflow) - return - } - defer p.retryBufferQuota.release(bufferMessages, bufferBytes) - - // Honor Producer.Retry.Backoff between retry attempts (#2469). retryBatch - // dispatches the produceSet directly to the broker, bypassing partitionProducer.dispatch. - if len(pSet.msgs) > 0 && !p.backoffOrDone(pSet.msgs[0].retries) { - failBatch(ErrShuttingDown) - return - } // it's expected that a metadata refresh has been requested prior to calling retryBatch leader, leaderErr := p.client.Leader(topic, partition) if leaderErr != nil { Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, leaderErr) - failBatch(retryErr) + for _, msg := range pSet.msgs { + p.returnError(msg, retryErr) + } + if alreadyMuted { + p.muter.unmute(produceSet) + } return } if !alreadyMuted { if !p.muter.waitUntilMuted(produceSet) { - failBatch(retryErr) - return - } - muted = true - } - bp := p.getBrokerProducer(leader) - defer p.unrefBrokerProducer(leader, bp) - if !p.handoffRetryBatch(bp, produceSet) { - failBatch(ErrShuttingDown) - } -} - -func (p *asyncProducer) failMutedSet(set *produceSet, err error) { - p.muter.unmute(set) - set.eachPartition(func(_ string, _ int32, pSet *partitionSet) { - p.returnErrors(pSet.msgs, err) - }) -} - -func newRetryBufferQuota(conf *Config) messageQuota { - maxBufferLength, maxBufferBytes := retryBufferLimits(conf) - return newMessageQuota(maxBufferLength, maxBufferBytes) -} - -func retryBufferLimits(conf *Config) (int64, int64) { - maxBufferLength := conf.Producer.Retry.MaxBufferLength - if maxBufferLength > 0 { - maxBufferLength = max(maxBufferLength, minFunctionalRetryBufferLength) - } - - maxBufferBytes := conf.Producer.Retry.MaxBufferBytes - if maxBufferBytes > 0 { - maxBufferBytes = max(maxBufferBytes, minFunctionalRetryBufferBytes) - } - - return int64(maxBufferLength), maxBufferBytes -} - -func producerMessageByteSizeVersion(conf *Config) int { - if conf.Version.IsAtLeast(V0_11_0_0) { - return 2 - } - return 1 -} - -// retryBatchesAfterRefresh retries muted batches from one failed ProduceRequest -// after a single metadata refresh, preserving that failed-request boundary. -func (p *asyncProducer) retryBatchesAfterRefresh(batches *produceSet, retryErr error) { - if p.shuttingDown() { - p.failMutedSet(batches, ErrShuttingDown) - return - } - - var ( - maxRetryAttempt int - bufferMessages, bufferBytes int64 - ) - retryable := newProduceSet(p) - batches.eachPartition(func(topic string, partition int32, pSet *partitionSet) { - if !pSet.shouldKeepMuted(p.conf.Producer.Retry.Max) { - failed := newProduceSet(p) - failed.addPartitionSet(topic, partition, pSet) - p.failMutedSet(failed, retryErr) - return - } - for _, msg := range pSet.msgs { - msg.retries++ - if msg.retries > maxRetryAttempt { - maxRetryAttempt = msg.retries + for _, msg := range pSet.msgs { + p.returnError(msg, retryErr) } - } - retryable.addPartitionSet(topic, partition, pSet) - bufferMessages += int64(len(pSet.msgs)) - bufferBytes += int64(pSet.bufferBytes) - }) - if retryable.empty() { - return - } - - // Reserve before waiting so pending direct retries still count against the - // producer-level retry buffer budget. - if !p.retryBufferQuota.tryReserve(bufferMessages, bufferBytes) { - p.failMutedSet(retryable, ErrProducerRetryBufferOverflow) - return - } - - if !p.backoffOrDone(maxRetryAttempt) { - p.retryBufferQuota.release(bufferMessages, bufferBytes) - p.failMutedSet(retryable, ErrShuttingDown) - return - } - topics := make([]string, 0, len(retryable.msgs)) - for topic := range retryable.msgs { - topics = append(topics, topic) - } - if err := p.client.RefreshMetadata(topics...); err != nil { - Logger.Printf("Failed refreshing metadata because of %v\n", err) - } - - brokerSets := make(map[*Broker]*produceSet) - retryable.eachPartition(func(topic string, partition int32, pSet *partitionSet) { - leader, err := p.client.Leader(topic, partition) - if err != nil { - Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err) - batchMessages, batchBytes := int64(len(pSet.msgs)), int64(pSet.bufferBytes) - p.retryBufferQuota.release(batchMessages, batchBytes) - failed := newProduceSet(p) - failed.addPartitionSet(topic, partition, pSet) - p.failMutedSet(failed, retryErr) return } - set := brokerSets[leader] - if set == nil { - set = newProduceSet(p) - brokerSets[leader] = set - } - set.addPartitionSet(topic, partition, pSet) - }) - - // handoffRetryBatch also checks shutdown, but doing it here avoids creating - // or ref-counting brokerProducers just to reject the retry. - if p.shuttingDown() { - for _, set := range brokerSets { - p.retryBufferQuota.release(int64(set.bufferCount), int64(set.bufferBytes)) - p.failMutedSet(set, ErrShuttingDown) - } - return - } - - for leader, set := range brokerSets { - bp := p.getBrokerProducer(leader) - accepted := p.handoffRetryBatch(bp, set) - p.unrefBrokerProducer(leader, bp) - p.retryBufferQuota.release(int64(set.bufferCount), int64(set.bufferBytes)) - if !accepted { - p.failMutedSet(set, ErrShuttingDown) - } - } -} - -// handoffRetryBatch transfers ownership of a muted retry batch to the broker -// bridge. On false, the caller still owns the mute and must fail the batch. -func (p *asyncProducer) handoffRetryBatch(bp *brokerProducer, set *produceSet) bool { - // If shutdown is already visible, do not let select randomly choose a ready - // send on bp.output. The second select still handles shutdown racing with - // the handoff. - select { - case <-bp.done: - return false - case <-p.done: - return false - default: - } - select { - case bp.output <- set: - return true - case <-bp.done: - return false - case <-p.done: - return false } + bp := p.getBrokerProducer(leader) + bp.output <- produceSet + p.unrefBrokerProducer(leader, bp) } func (bp *brokerProducer) handleError(sent *produceSet, err error) { @@ -1689,34 +1470,38 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { bp.parent.abandonBrokerConnection(bp.broker) _ = bp.broker.Close() bp.closing = err - - var retrySet *produceSet + var retryTopics []string + retryTopicSeen := make(map[string]struct{}) + sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { + if _, ok := retryTopicSeen[topic]; ok { + return + } + retryTopicSeen[topic] = struct{}{} + retryTopics = append(retryTopics, topic) + }) + if bp.parent.conf.Producer.Idempotent && len(retryTopics) > 0 { + refreshErr := bp.parent.client.RefreshMetadata(retryTopics...) + if refreshErr != nil { + Logger.Printf("Failed refreshing metadata because of %v\n", refreshErr) + } + } keepMuted := make(map[string]map[int32]struct{}) sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { - // Connection failures have no ProduceResponse to feed back through - // partitionProducer. Keep the sent batch muted and retry it asynchronously - // so later same-partition batches cannot pass it, without blocking the - // brokerProducer response loop (#1203). + // keep partition marked as in-flight during retry (connection error) if bp.currentRetries[topic] == nil { bp.currentRetries[topic] = make(map[int32]error) } bp.currentRetries[topic][partition] = err - if !(bp.parent.conf.Producer.Idempotent || pSet.shouldKeepMuted(bp.parent.conf.Producer.Retry.Max)) { - bp.parent.returnErrors(pSet.msgs, err) - } else { - if retrySet == nil { - retrySet = newProduceSet(bp.parent) - } + if bp.parent.conf.Producer.Idempotent { if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } keepMuted[topic][partition] = struct{}{} - retrySet.addPartitionSet(topic, partition, pSet) + go bp.parent.retryBatch(topic, partition, pSet, err, true) + } else { + bp.parent.retryMessages(pSet.msgs, err) } }) - if retrySet != nil { - go bp.parent.retryBatchesAfterRefresh(retrySet, err) - } bp.accumulatingBatch.eachPartition(func(topic string, partition int32, pSet *partitionSet) { bp.parent.retryMessages(pSet.msgs, err) }) @@ -1737,8 +1522,22 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel func (p *asyncProducer) retryHandler() { - version := producerMessageByteSizeVersion(p.conf) + maxBufferLength := p.conf.Producer.Retry.MaxBufferLength + if 0 < maxBufferLength && maxBufferLength < minFunctionalRetryBufferLength { + maxBufferLength = minFunctionalRetryBufferLength + } + maxBufferBytes := p.conf.Producer.Retry.MaxBufferBytes + if 0 < maxBufferBytes && maxBufferBytes < minFunctionalRetryBufferBytes { + maxBufferBytes = minFunctionalRetryBufferBytes + } + + version := 1 + if p.conf.Version.IsAtLeast(V0_11_0_0) { + version = 2 + } + + var currentByteSize int64 var msg *ProducerMessage buf := queue.New() @@ -1750,7 +1549,7 @@ func (p *asyncProducer) retryHandler() { case msg = <-p.retries: case p.input <- buf.Peek().(*ProducerMessage): msgToRemove := buf.Remove().(*ProducerMessage) - p.retryBufferQuota.release(1, int64(msgToRemove.ByteSize(version))) + currentByteSize -= int64(msgToRemove.ByteSize(version)) continue } } @@ -1760,9 +1559,9 @@ func (p *asyncProducer) retryHandler() { } buf.Add(msg) - bufferOverflow := p.retryBufferQuota.addAndCheckOverflow(1, int64(msg.ByteSize(version))) + currentByteSize += int64(msg.ByteSize(version)) - if !bufferOverflow { + if (maxBufferLength <= 0 || buf.Length() < maxBufferLength) && (maxBufferBytes <= 0 || currentByteSize < maxBufferBytes) { continue } @@ -1771,10 +1570,10 @@ func (p *asyncProducer) retryHandler() { select { case p.input <- msgToHandle: buf.Remove() - p.retryBufferQuota.release(1, int64(msgToHandle.ByteSize(version))) + currentByteSize -= int64(msgToHandle.ByteSize(version)) default: buf.Remove() - p.retryBufferQuota.release(1, int64(msgToHandle.ByteSize(version))) + currentByteSize -= int64(msgToHandle.ByteSize(version)) p.returnError(msgToHandle, ErrProducerRetryBufferOverflow) } } @@ -1784,10 +1583,6 @@ func (p *asyncProducer) retryHandler() { // utility functions func (p *asyncProducer) shutdown() { - Logger.Println("Producer shutting down.") - if p.done != nil && p.closed.CompareAndSwap(false, true) { - close(p.done) - } p.inFlight.Add(1) p.input <- &ProducerMessage{flags: shutdown} @@ -1932,71 +1727,3 @@ func (p *asyncProducer) abandonBrokerConnection(broker *Broker) { delete(p.brokers, broker) } - -// messageQuota tracks message and byte counts against optional maximums. -type messageQuota struct { - mu sync.Mutex - messages int64 - bytes int64 - maxMessages int64 - maxBytes int64 -} - -func newMessageQuota(maxMessages, maxBytes int64) messageQuota { - return messageQuota{ - maxMessages: maxMessages, - maxBytes: maxBytes, - } -} - -func (q *messageQuota) tryReserve(messages, bytes int64) bool { - if !q.shouldTrack(messages, bytes) { - return true - } - - q.mu.Lock() - defer q.mu.Unlock() - - nextMessages := q.messages + messages - nextBytes := q.bytes + bytes - if q.overflows(nextMessages, nextBytes) { - return false - } - q.messages = nextMessages - q.bytes = nextBytes - return true -} - -func (q *messageQuota) addAndCheckOverflow(messages, bytes int64) bool { - if !q.shouldTrack(messages, bytes) { - return false - } - - q.mu.Lock() - defer q.mu.Unlock() - - q.messages += messages - q.bytes += bytes - return q.overflows(q.messages, q.bytes) -} - -func (q *messageQuota) release(messages, bytes int64) { - if !q.shouldTrack(messages, bytes) { - return - } - - q.mu.Lock() - defer q.mu.Unlock() - - q.messages -= messages - q.bytes -= bytes -} - -func (q *messageQuota) shouldTrack(messages, bytes int64) bool { - return (messages != 0 || bytes != 0) && (q.maxMessages > 0 || q.maxBytes > 0) -} - -func (q *messageQuota) overflows(messages, bytes int64) bool { - return (q.maxMessages > 0 && messages >= q.maxMessages) || - (q.maxBytes > 0 && bytes >= q.maxBytes) -} diff --git a/async_producer_test.go b/async_producer_test.go index bbc68976d..c32ac3ef5 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1562,8 +1562,6 @@ func TestBrokerProducerShutdown(t *testing.T) { bp := producer.(*asyncProducer).getBrokerProducer(broker) // Initiate the shutdown of all of them producer.(*asyncProducer).unrefBrokerProducer(broker, bp) - assertDoneWithin(t, bp.done, time.Second) - require.NotPanics(t, bp.shutdown) _ = producer.Close() mockBroker.Close() @@ -1571,7 +1569,7 @@ func TestBrokerProducerShutdown(t *testing.T) { // TestBrokerProducerWaitForSpaceEmptyBufferRollover ensures forced rollovers with an empty buffer // do not deadlock waiting for responses when no partitions are muted. -func TestBrokerProducerWaitForSpaceEmptyRollover(t *testing.T) { +func TestBrokerProducerWaitForSpaceEmptyBufferRollover(t *testing.T) { config := NewTestConfig() parent := &asyncProducer{ conf: config, @@ -1637,79 +1635,9 @@ func assertDoneWithin[T any](t *testing.T, ch <-chan T, timeout time.Duration) T } } -func newBlockingRetryProducer(config *Config, errorBuffer int) (*asyncProducer, *brokerProducer) { - parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - errors: make(chan *ProducerError, errorBuffer), - done: make(chan struct{}), - txnmgr: &transactionManager{}, - retryBufferQuota: newRetryBufferQuota(config), - } - leader := &Broker{id: 1} - parent.client = &stubLeaderClient{leader: leader, cfg: config} - retryBP := &brokerProducer{ - parent: parent, - broker: leader, - output: make(chan *produceSet), - input: make(chan *ProducerMessage), - done: make(chan struct{}), - } - parent.brokers[leader] = retryBP - return parent, retryBP -} - -// holdFirstRetryAfterReserve lets the test prove a second retry sees quota held by the first. -func holdFirstRetryAfterReserve(parent *asyncProducer, retryBP *brokerProducer, afterRefresh bool) (<-chan struct{}, func()) { - firstRetryReserved := make(chan struct{}, 1) - blockFirstRetry := make(chan struct{}, 1) - releaseFirstRetry := make(chan struct{}, 1) - blockFirstRetry <- struct{}{} - - hold := func() { - select { - case <-blockFirstRetry: - firstRetryReserved <- struct{}{} - <-releaseFirstRetry - default: - } - } - release := func() { - select { - case releaseFirstRetry <- struct{}{}: - default: - } - } - - if afterRefresh { - parent.client = &stubLeaderClient{ - leader: retryBP.broker, - cfg: parent.conf, - refreshMetadata: func(...string) error { - hold() - return nil - }, - } - } else { - parent.client = &stubLeaderClient{ - cfg: parent.conf, - leaderFunc: func(_ string, partition int32) (*Broker, error) { - if partition == 0 { - hold() - } - return retryBP.broker, nil - }, - } - } - - return firstRetryReserved, release -} - -// TestBrokerProducerWaitForSpaceExternalUnmute ensures waitForSpace does not +// TestBrokerProducerWaitForSpaceRespectsExternalUnmute ensures waitForSpace does not // deadlock when partitions are muted by another producer and are unmuted elsewhere. -func TestBrokerProducerWaitForSpaceExternalUnmute(t *testing.T) { +func TestBrokerProducerWaitForSpaceRespectsExternalUnmute(t *testing.T) { config := NewTestConfig() txnMgr := &transactionManager{ producerID: 0, @@ -1796,56 +1724,9 @@ func TestBrokerProducerFlushSkipsMutedPartitions(t *testing.T) { } } -func TestBrokerProducerRunExternalUnmute(t *testing.T) { - config := NewTestConfig() - config.Producer.Flush.Messages = 1 - parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - txnmgr: &transactionManager{}, - } - - blockedSet := newProduceSet(parent) - safeAddMessage(t, blockedSet, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("held")}) - if !parent.muter.tryMute(blockedSet) { - t.Fatal("expected to mute partition externally") - } - - input := make(chan *ProducerMessage) - output := make(chan *produceSet, 1) - responses := make(chan *brokerProducerResponse) - bp := &brokerProducer{ - parent: parent, - broker: &Broker{id: 1}, - input: input, - output: output, - responses: responses, - accumulatingBatch: newProduceSet(parent), - currentRetries: make(map[string]map[int32]error), - } - retryMsg := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("waiting")} - safeAddMessage(t, bp.accumulatingBatch, retryMsg) - - done := make(chan struct{}) - go func() { - bp.run() - close(done) - }() - - assertNotDone(t, output, 50*time.Millisecond) - parent.muter.unmute(blockedSet) - - flushed := assertDoneWithin(t, output, 2*time.Second) - require.Equal(t, retryMsg, flushed.msgs["topic"][0].msgs[0]) - - close(input) - close(responses) - assertDoneWithin(t, done, 2*time.Second) -} - -// TestBrokerProducerWaitForSpaceAllMuted verifies that waitForSpace unblocks +// TestBrokerProducerWaitForSpaceAllPartitionsMuted verifies that waitForSpace unblocks // when all partitions in the accumulating batch are externally muted and later unmuted. -func TestBrokerProducerWaitForSpaceAllMuted(t *testing.T) { +func TestBrokerProducerWaitForSpaceAllPartitionsMuted(t *testing.T) { config := NewTestConfig() parent := &asyncProducer{ conf: config, @@ -1880,9 +1761,9 @@ func TestBrokerProducerWaitForSpaceAllMuted(t *testing.T) { } } -// TestPartitionMuterCloseWakesWait verifies that closing the muter wakes +// TestPartitionMuterCloseWakesWaitUntilMuted verifies that closing the muter wakes // goroutines blocked in waitUntilMuted. -func TestPartitionMuterCloseWakesWait(t *testing.T) { +func TestPartitionMuterCloseWakesWaitUntilMuted(t *testing.T) { config := NewTestConfig() parent := &asyncProducer{ conf: config, @@ -1979,7 +1860,6 @@ func TestBrokerProducerRollOverClearsTimer(t *testing.T) { func TestRetryBatchRespectsPartitionMuter(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = true - config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength txnMgr := &transactionManager{ producerID: 0, producerEpoch: 0, @@ -1987,12 +1867,11 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) { } parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - txnmgr: txnMgr, - retryBufferQuota: newRetryBufferQuota(config), + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + txnmgr: txnMgr, } leader := &Broker{} parent.client = &stubLeaderClient{leader: leader, cfg: config} @@ -2031,744 +1910,9 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) { } } -func TestHandleErrorRetryKeepsMute(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = false - config.Producer.Retry.Max = 1 - config.Producer.Retry.Backoff = 0 - - parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - txnmgr: &transactionManager{}, - } - failedBroker := &Broker{id: 1} - retryLeader := &Broker{id: 2} - parent.client = &stubLeaderClient{leader: retryLeader, cfg: config} - - output := make(chan *produceSet) - retryBP := &brokerProducer{ - parent: parent, - broker: retryLeader, - output: output, - input: make(chan *ProducerMessage), - } - parent.brokers[retryLeader] = retryBP - - bp := &brokerProducer{ - parent: parent, - broker: failedBroker, - input: make(chan *ProducerMessage), - accumulatingBatch: newProduceSet(parent), - currentRetries: make(map[string]map[int32]error), - } - - sent := newProduceSet(parent) - safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) - retryPartitionSet := sent.msgs["topic"][0] - if !parent.muter.tryMute(sent) { - t.Fatal("expected sent batch to mute partitions") - } - - done := make(chan struct{}) - go func() { - bp.handleError(sent, ErrOutOfBrokers) - close(done) - }() - assertDoneWithin(t, done, 2*time.Second) - - retrySet := assertDoneWithin(t, output, 2*time.Second) - defer parent.muter.unmute(retrySet) - require.Equal(t, retryPartitionSet, retrySet.msgs["topic"][0]) - require.Equal(t, 1, retryPartitionSet.msgs[0].retries) - - contender := newProduceSet(parent) - safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) - if parent.muter.tryMute(contender) { - parent.muter.unmute(contender) - t.Fatal("expected partition to remain muted by the retrying batch") - } -} - -func TestHandleErrorRetryFailsBatch(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = false - config.Producer.Retry.Max = 1 - config.Producer.Return.Errors = true - - parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - errors: make(chan *ProducerError, 2), - retries: make(chan *ProducerMessage, 2), - txnmgr: &transactionManager{}, - } - - bp := &brokerProducer{ - parent: parent, - broker: &Broker{id: 1}, - accumulatingBatch: newProduceSet(parent), - currentRetries: make(map[string]map[int32]error), - } - - exhausted := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("exhausted"), retries: 1} - retryable := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retryable")} - sent := newProduceSet(parent) - safeAddMessage(t, sent, exhausted) - safeAddMessage(t, sent, retryable) - if !parent.muter.tryMute(sent) { - t.Fatal("expected sent batch to mute partitions") - } - parent.inFlight.Add(2) - - bp.handleError(sent, ErrOutOfBrokers) - - firstErr := assertDoneWithin(t, parent.errors, 2*time.Second) - secondErr := assertDoneWithin(t, parent.errors, 2*time.Second) - require.Equal(t, exhausted, firstErr.Msg) - require.Equal(t, ErrOutOfBrokers, firstErr.Err) - require.Equal(t, retryable, secondErr.Msg) - require.Equal(t, ErrOutOfBrokers, secondErr.Err) - assertNotDone(t, parent.retries, 50*time.Millisecond) - - contender := newProduceSet(parent) - safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) - if !parent.muter.tryMute(contender) { - t.Fatal("expected partition mute to be released after whole-batch failure") - } - parent.muter.unmute(contender) -} - -func TestHandleErrorRetryAsyncRefresh(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = false - config.Producer.Retry.Max = 1 - config.Producer.Retry.Backoff = 0 - config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength - - parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - txnmgr: &transactionManager{}, - retryBufferQuota: newRetryBufferQuota(config), - } - failedBroker := &Broker{id: 1} - retryLeader := &Broker{id: 2} - started := make(chan struct{}, 1) - release := make(chan struct{}) - client := &stubLeaderClient{ - leader: retryLeader, - cfg: config, - refreshMetadata: func(...string) error { - select { - case started <- struct{}{}: - default: - } - <-release - return nil - }, - } - parent.client = client - - output := make(chan *produceSet) - retryBP := &brokerProducer{ - parent: parent, - broker: retryLeader, - output: output, - input: make(chan *ProducerMessage), - } - parent.brokers[retryLeader] = retryBP - - bp := &brokerProducer{ - parent: parent, - broker: failedBroker, - input: make(chan *ProducerMessage), - accumulatingBatch: newProduceSet(parent), - currentRetries: make(map[string]map[int32]error), - } - - sent := newProduceSet(parent) - safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) - retryPartitionSet := sent.msgs["topic"][0] - if !parent.muter.tryMute(sent) { - t.Fatal("expected sent batch to mute partitions") - } - - done := make(chan struct{}) - go func() { - bp.handleError(sent, ErrOutOfBrokers) - close(done) - }() - assertDoneWithin(t, done, 2*time.Second) - assertDoneWithin(t, started, 2*time.Second) - - contender := newProduceSet(parent) - safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) - if parent.muter.tryMute(contender) { - parent.muter.unmute(contender) - t.Fatal("expected partition to remain muted while async metadata refresh is pending") - } - - close(release) - retrySet := assertDoneWithin(t, output, 2*time.Second) - defer parent.muter.unmute(retrySet) - require.Equal(t, retryPartitionSet, retrySet.msgs["topic"][0]) -} - -func TestHandleErrorRetryRefreshOnce(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = false - config.Producer.Retry.Max = 1 - config.Producer.Retry.Backoff = 0 - - parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - txnmgr: &transactionManager{}, - } - failedBroker := &Broker{id: 1} - retryLeader := &Broker{id: 2} - calls := make(chan []string, 1) - client := &stubLeaderClient{ - leader: retryLeader, - cfg: config, - refreshMetadata: func(topics ...string) error { - calls <- append([]string(nil), topics...) - return nil - }, - } - parent.client = client - - output := make(chan *produceSet, 2) - retryBP := &brokerProducer{ - parent: parent, - broker: retryLeader, - output: output, - input: make(chan *ProducerMessage), - } - parent.brokers[retryLeader] = retryBP - parent.brokerRefs[retryBP] = 1 - - bp := &brokerProducer{ - parent: parent, - broker: failedBroker, - input: make(chan *ProducerMessage), - accumulatingBatch: newProduceSet(parent), - currentRetries: make(map[string]map[int32]error), - } - - sent := newProduceSet(parent) - safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry-0")}) - safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("retry-1")}) - retryPartitionSet0 := sent.msgs["topic"][0] - retryPartitionSet1 := sent.msgs["topic"][1] - if !parent.muter.tryMute(sent) { - t.Fatal("expected sent batch to mute partitions") - } - - bp.handleError(sent, ErrOutOfBrokers) - - require.Equal(t, []string{"topic"}, assertDoneWithin(t, calls, 2*time.Second)) - retrySet := assertDoneWithin(t, output, 2*time.Second) - defer parent.muter.unmute(retrySet) - assertNotDone(t, calls, 50*time.Millisecond) - assertNotDone(t, output, 50*time.Millisecond) - - retriedPartitions := make(map[int32]*partitionSet) - retrySet.eachPartition(func(_ string, partition int32, pSet *partitionSet) { - retriedPartitions[partition] = pSet - }) - require.Same(t, retryPartitionSet0, retriedPartitions[int32(0)]) - require.Same(t, retryPartitionSet1, retriedPartitions[int32(1)]) -} - -func TestHandleErrorRetryGroupsByLeader(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = false - config.Producer.Retry.Max = 1 - config.Producer.Retry.Backoff = 0 - - parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - txnmgr: &transactionManager{}, - } - failedBroker := &Broker{id: 1} - leaderA := &Broker{id: 2} - leaderB := &Broker{id: 3} - calls := make(chan []string, 1) - client := &stubLeaderClient{ - cfg: config, - leaders: map[string]map[int32]*Broker{ - "topic-a": { - 0: leaderA, - 1: leaderA, - }, - "topic-b": { - 0: leaderB, - 1: leaderB, - }, - }, - refreshMetadata: func(topics ...string) error { - calls <- append([]string(nil), topics...) - return nil - }, - } - parent.client = client - - outputA := make(chan *produceSet, 1) - outputB := make(chan *produceSet, 1) - retryBPA := &brokerProducer{ - parent: parent, - broker: leaderA, - output: outputA, - input: make(chan *ProducerMessage), - } - retryBPB := &brokerProducer{ - parent: parent, - broker: leaderB, - output: outputB, - input: make(chan *ProducerMessage), - } - parent.brokers[leaderA] = retryBPA - parent.brokers[leaderB] = retryBPB - - bp := &brokerProducer{ - parent: parent, - broker: failedBroker, - input: make(chan *ProducerMessage), - accumulatingBatch: newProduceSet(parent), - currentRetries: make(map[string]map[int32]error), - } - - sent := newProduceSet(parent) - safeAddMessage(t, sent, &ProducerMessage{Topic: "topic-a", Partition: 0, Value: StringEncoder("a-0")}) - safeAddMessage(t, sent, &ProducerMessage{Topic: "topic-a", Partition: 1, Value: StringEncoder("a-1")}) - safeAddMessage(t, sent, &ProducerMessage{Topic: "topic-b", Partition: 0, Value: StringEncoder("b-0")}) - safeAddMessage(t, sent, &ProducerMessage{Topic: "topic-b", Partition: 1, Value: StringEncoder("b-1")}) - if !parent.muter.tryMute(sent) { - t.Fatal("expected sent batch to mute partitions") - } - - bp.handleError(sent, ErrOutOfBrokers) - - require.ElementsMatch(t, []string{"topic-a", "topic-b"}, assertDoneWithin(t, calls, 2*time.Second)) - retrySetA := assertDoneWithin(t, outputA, 2*time.Second) - defer parent.muter.unmute(retrySetA) - retrySetB := assertDoneWithin(t, outputB, 2*time.Second) - defer parent.muter.unmute(retrySetB) - - require.Len(t, retrySetA.msgs, 1) - require.Len(t, retrySetA.msgs["topic-a"], 2) - require.Len(t, retrySetB.msgs, 1) - require.Len(t, retrySetB.msgs["topic-b"], 2) - for _, partitions := range retrySetA.msgs { - for _, pSet := range partitions { - require.Equal(t, 1, pSet.msgs[0].retries) - } - } - for _, partitions := range retrySetB.msgs { - for _, pSet := range partitions { - require.Equal(t, 1, pSet.msgs[0].retries) - } - } - - assertNotDone(t, outputA, 50*time.Millisecond) - assertNotDone(t, outputB, 50*time.Millisecond) -} - -func TestHandleSuccessRetryUsesPartitionProducer(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = false - config.Producer.Retry.Max = 1 - config.Producer.Retry.Backoff = 0 - - parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - retries: make(chan *ProducerMessage, 1), - txnmgr: &transactionManager{}, - } - - bp := &brokerProducer{ - parent: parent, - broker: &Broker{id: 1}, - input: make(chan *ProducerMessage), - accumulatingBatch: newProduceSet(parent), - currentRetries: make(map[string]map[int32]error), - } - - sent := newProduceSet(parent) - msg := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")} - safeAddMessage(t, sent, msg) - if !parent.muter.tryMute(sent) { - t.Fatal("expected sent batch to mute partitions") - } - - response := new(ProduceResponse) - response.AddTopicPartition("topic", 0, ErrNotLeaderForPartition) - bp.handleSuccess(sent, response) - - retried := assertDoneWithin(t, parent.retries, 2*time.Second) - require.Equal(t, msg, retried) - require.Equal(t, 1, retried.retries) - - contender := newProduceSet(parent) - safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) - if !parent.muter.tryMute(contender) { - t.Fatal("expected response retry to release sent mute before partitionProducer retry") - } - parent.muter.unmute(contender) -} - -func TestRetryBatchShutdownReleasesMute(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = false - config.Producer.Retry.Max = 1 - config.Producer.Retry.Backoff = 0 - config.Producer.Return.Errors = true - config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength - - parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - errors: make(chan *ProducerError, 1), - done: make(chan struct{}), - txnmgr: &transactionManager{}, - retryBufferQuota: newRetryBufferQuota(config), - } - leader := &Broker{id: 1} - leaderLookup := make(chan struct{}, 1) - parent.client = &stubLeaderClient{ - cfg: config, - leaderFunc: func(string, int32) (*Broker, error) { - select { - case leaderLookup <- struct{}{}: - default: - } - return leader, nil - }, - } - retryBP := &brokerProducer{ - parent: parent, - broker: leader, - output: make(chan *produceSet), - input: make(chan *ProducerMessage), - done: make(chan struct{}), - } - parent.brokers[leader] = retryBP - - sent := newProduceSet(parent) - safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) - retryPartitionSet := sent.msgs["topic"][0] - if !parent.muter.tryMute(sent) { - t.Fatal("expected sent batch to mute partitions") - } - parent.inFlight.Add(1) - - done := make(chan struct{}) - go func() { - parent.retryBatch("topic", 0, retryPartitionSet, ErrOutOfBrokers, true) - close(done) - }() - assertDoneWithin(t, leaderLookup, 2*time.Second) - - if parent.closed.CompareAndSwap(false, true) { - close(parent.done) - } - - producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) - require.Equal(t, ErrShuttingDown, producerErr.Err) - assertDoneWithin(t, done, 2*time.Second) - - contender := newProduceSet(parent) - safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) - if !parent.muter.tryMute(contender) { - t.Fatal("expected partition mute to be released after aborted retry handoff") - } - parent.muter.unmute(contender) -} - -func TestRetryUsesSharedBufferBudget(t *testing.T) { - tests := []struct { - name string - afterRefresh bool - bytesLimit bool - }{ - {name: "retryBatch length"}, - {name: "retryBatch bytes", bytesLimit: true}, - {name: "retryBatchesAfterRefresh length", afterRefresh: true}, - {name: "retryBatchesAfterRefresh bytes", afterRefresh: true, bytesLimit: true}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = !tt.afterRefresh - config.Producer.Retry.Max = 1 - config.Producer.Retry.Backoff = 0 - config.Producer.Return.Errors = true - - firstCount := 1 - errorBuffer := 2 - var firstMsg *ProducerMessage - secondMsg := &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("overflow")} - - if tt.bytesLimit { - firstMsg = &ProducerMessage{Topic: "topic", Partition: 0, Value: ByteEncoder(make([]byte, minFunctionalRetryBufferBytes))} - version := producerMessageByteSizeVersion(config) - firstBytes := int64(firstMsg.ByteSize(version)) - secondBytes := int64(secondMsg.ByteSize(version)) - config.Producer.Retry.MaxBufferBytes = firstBytes + secondBytes - } else { - config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength + 1 - firstCount = minFunctionalRetryBufferLength - errorBuffer = minFunctionalRetryBufferLength + 1 - } - - parent, retryBP := newBlockingRetryProducer(config, errorBuffer) - firstRetryReserved, releaseFirstRetry := holdFirstRetryAfterReserve(parent, retryBP, tt.afterRefresh) - defer releaseFirstRetry() - - retry := func(partition int32, pSet *partitionSet) { - if tt.afterRefresh { - retrySet := newProduceSet(parent) - retrySet.addPartitionSet("topic", partition, pSet) - parent.retryBatchesAfterRefresh(retrySet, ErrOutOfBrokers) - return - } - parent.retryBatch("topic", partition, pSet, ErrNotEnoughReplicas, true) - } - - first := newProduceSet(parent) - if tt.bytesLimit { - safeAddMessage(t, first, firstMsg) - } else { - for i := 0; i < minFunctionalRetryBufferLength; i++ { - safeAddMessage(t, first, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) - } - } - firstPartitionSet := first.msgs["topic"][0] - if !parent.muter.tryMute(first) { - t.Fatal("expected first retry batch to mute partitions") - } - parent.inFlight.Add(firstCount) - - firstDone := make(chan struct{}) - go func() { - retry(0, firstPartitionSet) - close(firstDone) - }() - assertDoneWithin(t, firstRetryReserved, 2*time.Second) - - second := newProduceSet(parent) - safeAddMessage(t, second, secondMsg) - secondPartitionSet := second.msgs["topic"][1] - if !parent.muter.tryMute(second) { - t.Fatal("expected second retry batch to mute partitions") - } - parent.inFlight.Add(1) - retry(1, secondPartitionSet) - - producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) - require.Equal(t, ErrProducerRetryBufferOverflow, producerErr.Err) - require.Equal(t, int32(1), producerErr.Msg.Partition) - - contender := newProduceSet(parent) - safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next")}) - if !parent.muter.tryMute(contender) { - t.Fatal("expected overflowed retry batch to release partition mute") - } - parent.muter.unmute(contender) - - close(retryBP.done) - releaseFirstRetry() - assertDoneWithin(t, firstDone, 2*time.Second) - }) - } -} - -func TestRetryBatchesAfterRefreshBrokerDone(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = false - config.Producer.Retry.Max = 1 - config.Producer.Retry.Backoff = 0 - config.Producer.Return.Errors = true - - parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - errors: make(chan *ProducerError, 1), - done: make(chan struct{}), - txnmgr: &transactionManager{}, - } - leader := &Broker{id: 1} - parent.client = &stubLeaderClient{leader: leader, cfg: config} - retryBP := &brokerProducer{ - parent: parent, - broker: leader, - output: make(chan *produceSet), - input: make(chan *ProducerMessage), - done: make(chan struct{}), - } - parent.brokers[leader] = retryBP - - sent := newProduceSet(parent) - safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) - retryPartitionSet := sent.msgs["topic"][0] - if !parent.muter.tryMute(sent) { - t.Fatal("expected sent batch to mute partitions") - } - parent.inFlight.Add(1) - - done := make(chan struct{}) - go func() { - retrySet := newProduceSet(parent) - retrySet.addPartitionSet("topic", 0, retryPartitionSet) - parent.retryBatchesAfterRefresh(retrySet, ErrOutOfBrokers) - close(done) - }() - - close(retryBP.done) - producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) - require.Equal(t, ErrShuttingDown, producerErr.Err) - assertDoneWithin(t, done, 2*time.Second) - - contender := newProduceSet(parent) - safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) - if !parent.muter.tryMute(contender) { - t.Fatal("expected partition mute to be released after brokerProducer done") - } - parent.muter.unmute(contender) -} - -func TestRetryBatchesAfterRefreshShutdown(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = false - config.Producer.Retry.Max = 1 - config.Producer.Retry.Backoff = 0 - config.Producer.Return.Errors = true - - parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - errors: make(chan *ProducerError, 2), - done: make(chan struct{}), - txnmgr: &transactionManager{}, - } - leader := &Broker{id: 1} - output := make(chan *produceSet, 1) - parent.brokers[leader] = &brokerProducer{ - parent: parent, - broker: leader, - output: output, - input: make(chan *ProducerMessage), - done: make(chan struct{}), - } - shutdownStarted := false - parent.client = &stubLeaderClient{ - cfg: config, - leaderFunc: func(string, int32) (*Broker, error) { - if !shutdownStarted { - close(parent.done) - shutdownStarted = true - } - return leader, nil - }, - } - - sent := newProduceSet(parent) - safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry-0")}) - safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("retry-1")}) - if !parent.muter.tryMute(sent) { - t.Fatal("expected sent batch to mute partitions") - } - parent.inFlight.Add(2) - - parent.retryBatchesAfterRefresh(sent, ErrOutOfBrokers) - - firstErr := assertDoneWithin(t, parent.errors, 2*time.Second) - secondErr := assertDoneWithin(t, parent.errors, 2*time.Second) - require.Equal(t, ErrShuttingDown, firstErr.Err) - require.Equal(t, ErrShuttingDown, secondErr.Err) - require.ElementsMatch(t, []int32{0, 1}, []int32{firstErr.Msg.Partition, secondErr.Msg.Partition}) - assertNotDone(t, output, 50*time.Millisecond) - - contender := newProduceSet(parent) - safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next-0")}) - safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next-1")}) - if !parent.muter.tryMute(contender) { - t.Fatal("expected partition mutes to be released after shutdown") - } - parent.muter.unmute(contender) -} - -func TestHandleErrorRetryLeaderError(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = false - config.Producer.Retry.Max = 1 - config.Producer.Retry.Backoff = 0 - config.Producer.Return.Errors = true - - parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - errors: make(chan *ProducerError, 1), - txnmgr: &transactionManager{}, - } - parent.client = &stubLeaderClient{cfg: config, leaderErr: ErrOutOfBrokers} - - bp := &brokerProducer{ - parent: parent, - broker: &Broker{id: 1}, - input: make(chan *ProducerMessage), - accumulatingBatch: newProduceSet(parent), - currentRetries: make(map[string]map[int32]error), - } - - sent := newProduceSet(parent) - safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) - if !parent.muter.tryMute(sent) { - t.Fatal("expected sent batch to mute partitions") - } - parent.inFlight.Add(1) - - bp.handleError(sent, ErrOutOfBrokers) - - producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) - require.Equal(t, ErrOutOfBrokers, producerErr.Err) - - contender := newProduceSet(parent) - safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) - if !parent.muter.tryMute(contender) { - t.Fatal("expected partition mute to be released after retry leader lookup failure") - } - parent.muter.unmute(contender) -} - type stubLeaderClient struct { - cfg *Config - leader *Broker - leaderFunc func(string, int32) (*Broker, error) - leaderErr error - leaders map[string]map[int32]*Broker - refreshMetadata func(...string) error + cfg *Config + leader *Broker } func (c *stubLeaderClient) Config() *Config { return c.cfg } @@ -2780,36 +1924,16 @@ func (c *stubLeaderClient) Topics() ([]string, error) { return func (c *stubLeaderClient) Partitions(string) ([]int32, error) { return nil, nil } func (c *stubLeaderClient) WritablePartitions(string) ([]int32, error) { return nil, nil } func (c *stubLeaderClient) Leader(topic string, partitionID int32) (*Broker, error) { - if c.leaderFunc != nil { - return c.leaderFunc(topic, partitionID) - } - if c.leaderErr != nil { - return nil, c.leaderErr - } - if partitions := c.leaders[topic]; partitions != nil { - if leader := partitions[partitionID]; leader != nil { - return leader, nil - } - } - if c.leaders != nil { - return nil, ErrLeaderNotAvailable - } return c.leader, nil } -func (c *stubLeaderClient) LeaderAndEpoch(topic string, partition int32) (*Broker, int32, error) { - leader, err := c.Leader(topic, partition) - return leader, 0, err -} -func (c *stubLeaderClient) Replicas(string, int32) ([]int32, error) { return nil, nil } -func (c *stubLeaderClient) InSyncReplicas(string, int32) ([]int32, error) { return nil, nil } -func (c *stubLeaderClient) OfflineReplicas(string, int32) ([]int32, error) { return nil, nil } -func (c *stubLeaderClient) RefreshBrokers([]string) error { return nil } -func (c *stubLeaderClient) RefreshMetadata(topics ...string) error { - if c.refreshMetadata != nil { - return c.refreshMetadata(topics...) - } - return nil +func (c *stubLeaderClient) LeaderAndEpoch(string, int32) (*Broker, int32, error) { + return c.leader, 0, nil } +func (c *stubLeaderClient) Replicas(string, int32) ([]int32, error) { return nil, nil } +func (c *stubLeaderClient) InSyncReplicas(string, int32) ([]int32, error) { return nil, nil } +func (c *stubLeaderClient) OfflineReplicas(string, int32) ([]int32, error) { return nil, nil } +func (c *stubLeaderClient) RefreshBrokers([]string) error { return nil } +func (c *stubLeaderClient) RefreshMetadata(...string) error { return nil } func (c *stubLeaderClient) GetOffset(string, int32, int64) (int64, error) { return 0, nil } func (c *stubLeaderClient) Coordinator(string) (*Broker, error) { return nil, nil } func (c *stubLeaderClient) RefreshCoordinator(string) error { return nil } diff --git a/client.go b/client.go index 6dbf0d009..6641743ac 100644 --- a/client.go +++ b/client.go @@ -732,7 +732,7 @@ func (client *client) checkSeedBrokersHealth(brokers []*Broker) { for _, broker := range brokers { if err := broker.getSockError(); err != nil { - DebugLogger.Printf("client/seedbrokers close seed broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) + Logger.Printf("client/seedbrokers close seed broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) safeAsyncClose(broker) } } @@ -741,7 +741,7 @@ func (client *client) checkSeedBrokersHealth(brokers []*Broker) { func (client *client) checkBrokersHealth() { for id, broker := range client.brokers { if err := broker.getSockError(); err != nil { - DebugLogger.Printf("client/brokers close broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) + Logger.Printf("client/brokers close broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) safeAsyncClose(broker) delete(client.brokers, id) } @@ -1103,10 +1103,12 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, return err } // else remove that broker and try again + Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err) _ = broker.Close() client.deregisterBroker(broker) } else { // some other error, remove that broker and try again + Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err) brokerErrors = append(brokerErrors, err) _ = broker.Close() client.deregisterBroker(broker) diff --git a/config.go b/config.go index 9e837bbc8..8b87a6cd2 100644 --- a/config.go +++ b/config.go @@ -276,20 +276,16 @@ type Config struct { // more sophisticated backoff strategies. This takes precedence over // `Backoff` if set. BackoffFunc func(retries, maxRetries int) time.Duration - // The maximum number of messages in producer retry buffering, including - // the bridging buffer between `input` and `retries` channels in - // AsyncProducer#retryHandler and direct retries waiting for metadata - // refresh, retry backoff, leader lookup, partition mute, or broker handoff. - // The limit is to prevent retry buffering from overflowing or causing OOM. + // The maximum length of the bridging buffer between `input` and `retries` channels + // in AsyncProducer#retryHandler. + // The limit is to prevent this buffer from overflowing or causing OOM. // Defaults to 0 for unlimited. // Any value between 0 and 4096 is pushed to 4096. // A zero or negative value indicates unlimited. MaxBufferLength int - // The maximum total byte size of messages in producer retry buffering, - // including the bridging buffer between `input` and `retries` channels - // in AsyncProducer#retryHandler and direct retries waiting for metadata - // refresh, retry backoff, leader lookup, partition mute, or broker handoff. - // This limit prevents retry buffering from consuming excessive memory. + // The maximum total byte size of messages in the bridging buffer between `input` + // and `retries` channels in AsyncProducer#retryHandler. + // This limit prevents the buffer from consuming excessive memory. // Defaults to 0 for unlimited. // Any value between 0 and 32 MB is pushed to 32 MB. // A zero or negative value indicates unlimited. diff --git a/functional_producer_test.go b/functional_producer_test.go index bbb0e27dc..df8a5d0ca 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -430,8 +430,8 @@ func TestFuncTxnProduceAndCommitOffset(t *testing.T) { handler.started.Add(4) go func() { - consumeErr := cg.Consume(ctx, []string{"test.4"}, handler) - require.NoError(t, consumeErr) + err = cg.Consume(ctx, []string{"test.4"}, handler) + require.NoError(t, err) }() handler.started.Wait() diff --git a/produce_set.go b/produce_set.go index 0fbb7097a..380f7f5ab 100644 --- a/produce_set.go +++ b/produce_set.go @@ -12,20 +12,6 @@ type partitionSet struct { bufferBytes int } -// shouldKeepMuted matches retryBatch's whole-batch retry rule: if any message has -// exhausted retries, the batch will be failed instead of retried. -func (ps *partitionSet) shouldKeepMuted(maxRetries int) bool { - if len(ps.msgs) == 0 { - return false - } - for _, msg := range ps.msgs { - if msg.retries >= maxRetries { - return false - } - } - return true -} - type produceSet struct { parent *asyncProducer msgs map[string]map[int32]*partitionSet @@ -50,15 +36,6 @@ func newProduceSetWithMeta(parent *asyncProducer, producerID int64, producerEpoc } } -func (ps *produceSet) addPartitionSet(topic string, partition int32, set *partitionSet) { - if ps.msgs[topic] == nil { - ps.msgs[topic] = make(map[int32]*partitionSet) - } - ps.msgs[topic][partition] = set - ps.bufferBytes += set.bufferBytes - ps.bufferCount += len(set.msgs) -} - func (ps *produceSet) add(msg *ProducerMessage) error { var err error var key, val []byte diff --git a/produce_set_test.go b/produce_set_test.go index ea8086750..9ab964949 100644 --- a/produce_set_test.go +++ b/produce_set_test.go @@ -25,33 +25,6 @@ func safeAddMessage(t *testing.T, ps *produceSet, msg *ProducerMessage) { } } -func TestShouldKeepMuted(t *testing.T) { - pSet := &partitionSet{} - if pSet.shouldKeepMuted(1) { - t.Error("empty partition set should not be retryable") - } - - pSet = &partitionSet{ - msgs: []*ProducerMessage{ - {retries: 0}, - {retries: 0}, - }, - } - if !pSet.shouldKeepMuted(1) { - t.Error("expected batch to be retryable when every message is below retry max") - } - - pSet = &partitionSet{ - msgs: []*ProducerMessage{ - {retries: 0}, - {retries: 1}, - }, - } - if pSet.shouldKeepMuted(1) { - t.Error("expected batch not to be retryable when any message has exhausted retries") - } -} - func TestProduceSetInitial(t *testing.T) { _, ps := makeProduceSet()