diff --git a/async_producer.go b/async_producer.go index bcd7382e7..a4b8bac29 100644 --- a/async_producer.go +++ b/async_producer.go @@ -94,8 +94,6 @@ type asyncProducer struct { input, successes, retries chan *ProducerMessage inFlight sync.WaitGroup - // done is closed before waiting on in-flight messages so retryBatch - // goroutines can stop waiting on broker handoff and release partition mutes. done chan struct{} closed atomic.Bool @@ -119,19 +117,16 @@ type asyncProducer struct { type partitionMuter struct { mu sync.Mutex - cond *sync.Cond closed bool inFlightCounts map[string]map[int32]int // topic -> partition -> in-flight count unmuteSignal chan struct{} } func newPartitionMuter() *partitionMuter { - m := &partitionMuter{ + return &partitionMuter{ inFlightCounts: make(map[string]map[int32]int), unmuteSignal: make(chan struct{}), } - m.cond = sync.NewCond(&m.mu) - return m } // isMuted reports whether the partition has an in-flight batch. @@ -194,30 +189,6 @@ func (m *partitionMuter) tryMutePartition(topic string, partition int32) bool { return true } -// waitUntilMuted blocks until all partitions in the set can be muted, then mutes them. -// Returns false if the muter was closed before all partitions could be muted. -func (m *partitionMuter) waitUntilMuted(set *produceSet) bool { - if set == nil || set.empty() { - return false - } - - m.mu.Lock() - defer m.mu.Unlock() - - for { - if m.closed { - return false - } - if !m.isAnyMuted(set) { - break - } - m.cond.Wait() - } - - m.muteSet(set) - return true -} - func (m *partitionMuter) awaitUnmuteChan(set *produceSet) (<-chan struct{}, bool) { if set == nil || set.empty() { return nil, false @@ -261,10 +232,9 @@ func (m *partitionMuter) unmute(set *produceSet) { }) close(m.unmuteSignal) m.unmuteSignal = make(chan struct{}) - m.cond.Broadcast() } -// close shuts down the muter, waking any goroutines blocked in waitUntilMuted. +// close shuts down the muter, waking goroutines waiting for an unmute signal. func (m *partitionMuter) close() { m.mu.Lock() defer m.mu.Unlock() @@ -274,7 +244,6 @@ func (m *partitionMuter) close() { } m.closed = true close(m.unmuteSignal) - m.cond.Broadcast() } // NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration. @@ -803,28 +772,6 @@ func (p *asyncProducer) calcBackoff(retries int) time.Duration { return p.conf.Producer.Retry.Backoff } -func (p *asyncProducer) shuttingDown() bool { - select { - case <-p.done: - return true - default: - return false - } -} - -func (p *asyncProducer) backoffOrDone(retries int) bool { - backoff := p.calcBackoff(retries) - if backoff <= 0 { - return !p.shuttingDown() - } - select { - case <-time.After(backoff): - return true - case <-p.done: - return false - } -} - func (pp *partitionProducer) updateLeaderIfBrokerProducerIsNil(msg *ProducerMessage) error { if pp.brokerProducer == nil { if err := pp.updateLeader(); err != nil { @@ -857,6 +804,7 @@ func (pp *partitionProducer) dispatch() { select { case <-pp.brokerProducer.abandoned: // a message on the abandoned channel means that our current broker selection is out of date + Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID()) pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer) pp.brokerProducer = nil time.Sleep(pp.parent.conf.Producer.Retry.Backoff) @@ -988,6 +936,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { input: input, output: bridge, responses: responses, + abandoned: make(chan struct{}), done: make(chan struct{}), accumulatingBatch: newProduceSet(p), currentRetries: make(map[string]map[int32]error), @@ -1079,10 +1028,6 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { } }) - if p.conf.Producer.Retry.Max <= 0 { - bp.abandoned = make(chan struct{}) - } - return bp } @@ -1164,9 +1109,10 @@ func (bp *brokerProducer) run() { } if reason := bp.needsRetry(msg); reason != nil { + flags := msg.flags bp.parent.retryMessage(msg, reason) - if bp.closing == nil && msg.flags&fin == fin { + if bp.closing == nil && flags&fin == fin { // we were retrying this partition but we can start processing again delete(bp.currentRetries[msg.Topic], msg.Partition) } @@ -1262,12 +1208,15 @@ func (bp *brokerProducer) shutdown() { if bp.flushingBatch == nil { bp.tryBuildFlushingBatch() } - var unmuteCh <-chan struct{} var outputCh chan<- *produceSet if bp.flushingBatch != nil { outputCh = bp.output - } else if ch, blocked := bp.parent.muter.awaitUnmuteChan(bp.accumulatingBatch); blocked { - unmuteCh = ch + } else if _, blocked := bp.parent.muter.awaitUnmuteChan(bp.accumulatingBatch); blocked { + bp.accumulatingBatch.eachPartition(func(_ string, _ int32, pSet *partitionSet) { + bp.parent.returnErrors(pSet.msgs, ErrShuttingDown) + }) + bp.rollOver() + continue } select { case response, ok := <-bp.responses: @@ -1276,7 +1225,6 @@ func (bp *brokerProducer) shutdown() { } case outputCh <- bp.flushingBatch: bp.flushingBatch = nil - case <-unmuteCh: } } close(bp.output) @@ -1367,8 +1315,33 @@ func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) { func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) { // we iterate through the blocks in the request set, not the response, so that we notice // if the response is missing a block completely - var retryTopics []string - keepMuted := make(map[string]map[int32]struct{}) + var ( + shouldAbandonBroker bool + retryTopics []string + keepMuted map[string]map[int32]struct{} + // Group by broker error so failed retries report the original partition error. + responseRetries map[KError]*produceSet + ) + markKeepMuted := func(topic string, partition int32) { + if keepMuted == nil { + keepMuted = make(map[string]map[int32]struct{}) + } + if keepMuted[topic] == nil { + keepMuted[topic] = make(map[int32]struct{}) + } + keepMuted[topic][partition] = struct{}{} + } + addResponseRetry := func(retryErr KError, topic string, partition int32, pSet *partitionSet) { + if responseRetries == nil { + responseRetries = make(map[KError]*produceSet) + } + retrySet := responseRetries[retryErr] + if retrySet == nil { + retrySet = newProduceSet(bp.parent) + responseRetries[retryErr] = retrySet + } + retrySet.addPartitionSet(topic, partition, pSet) + } sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { if response == nil { // this only happens when RequiredAcks is NoResponse, so we have to assume success @@ -1397,20 +1370,16 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo // Duplicate case ErrDuplicateSequenceNumber: bp.parent.returnSuccesses(pSet.msgs) - // Retriable errors case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition, ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError: + // Retriable errors + shouldAbandonBroker = true if bp.parent.conf.Producer.Retry.Max <= 0 { - bp.parent.abandonBrokerConnection(bp.broker) bp.parent.returnErrors(pSet.msgs, block.Err) } else { retryTopics = append(retryTopics, topic) - if bp.parent.conf.Producer.Idempotent { - if keepMuted[topic] == nil { - keepMuted[topic] = make(map[int32]struct{}) - } - keepMuted[topic][partition] = struct{}{} - } + markKeepMuted(topic, partition) + addResponseRetry(block.Err, topic, partition, pSet) } // Other non-retriable errors default: @@ -1421,14 +1390,11 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo } }) - if len(retryTopics) > 0 { - if bp.parent.conf.Producer.Idempotent { - err := bp.parent.client.RefreshMetadata(retryTopics...) - if err != nil { - Logger.Printf("Failed refreshing metadata because of %v\n", err) - } - } + if shouldAbandonBroker { + bp.parent.abandonBrokerConnection(bp.broker) + } + if len(retryTopics) > 0 { sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { block := response.GetBlock(topic, partition) if block == nil { @@ -1445,20 +1411,16 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo bp.currentRetries[topic] = make(map[int32]error) } bp.currentRetries[topic][partition] = block.Err - if bp.parent.conf.Producer.Idempotent { - go bp.parent.retryBatch(topic, partition, pSet, block.Err, true) - } else { - // Non-idempotent response retries must go back through partitionProducer. - // That path advances the retry high watermark and refreshes the partition - // leader before sending the retry; retryBatch would bypass that state. - bp.parent.retryMessages(pSet.msgs, block.Err) - } // dropping the following messages has the side effect of incrementing their retry count bp.parent.retryMessages(bp.accumulatingBatch.dropPartition(topic, partition), block.Err) } }) } + for retryErr, retrySet := range responseRetries { + go bp.parent.retryBatchesAfterRefresh(retrySet, retryErr) + } + unmuteSet := sent.copyFunc(func(topic string, partition int32) bool { if partitions := keepMuted[topic]; partitions != nil { _, kept := partitions[partition] @@ -1469,63 +1431,6 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo bp.parent.muter.unmute(unmuteSet) } -func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, retryErr error, alreadyMuted bool) { - Logger.Printf("Retrying batch for %v-%d because of %v\n", topic, partition, retryErr) - produceSet := newProduceSet(p) - produceSet.addPartitionSet(topic, partition, pSet) - bufferMessages, bufferBytes := int64(produceSet.bufferCount), int64(produceSet.bufferBytes) - - muted := alreadyMuted - failBatch := func(err error) { - // Release the partition before reporting errors so a blocked Errors - // consumer cannot keep later same-partition messages muted. - if muted { - p.muter.unmute(produceSet) - muted = false - } - p.returnErrors(pSet.msgs, err) - } - for _, msg := range pSet.msgs { - if msg.retries >= p.conf.Producer.Retry.Max { - failBatch(retryErr) - return - } - msg.retries++ - } - if !p.retryBufferQuota.tryReserve(bufferMessages, bufferBytes) { - failBatch(ErrProducerRetryBufferOverflow) - return - } - defer p.retryBufferQuota.release(bufferMessages, bufferBytes) - - // Honor Producer.Retry.Backoff between retry attempts (#2469). retryBatch - // dispatches the produceSet directly to the broker, bypassing partitionProducer.dispatch. - if len(pSet.msgs) > 0 && !p.backoffOrDone(pSet.msgs[0].retries) { - failBatch(ErrShuttingDown) - return - } - - // it's expected that a metadata refresh has been requested prior to calling retryBatch - leader, leaderErr := p.client.Leader(topic, partition) - if leaderErr != nil { - Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, leaderErr) - failBatch(retryErr) - return - } - if !alreadyMuted { - if !p.muter.waitUntilMuted(produceSet) { - failBatch(retryErr) - return - } - muted = true - } - bp := p.getBrokerProducer(leader) - defer p.unrefBrokerProducer(leader, bp) - if !p.handoffRetryBatch(bp, produceSet) { - failBatch(ErrShuttingDown) - } -} - func (p *asyncProducer) failMutedSet(set *produceSet, err error) { p.muter.unmute(set) set.eachPartition(func(_ string, _ int32, pSet *partitionSet) { @@ -1562,11 +1467,6 @@ func producerMessageByteSizeVersion(conf *Config) int { // retryBatchesAfterRefresh retries muted batches from one failed ProduceRequest // after a single metadata refresh, preserving that failed-request boundary. func (p *asyncProducer) retryBatchesAfterRefresh(batches *produceSet, retryErr error) { - if p.shuttingDown() { - p.failMutedSet(batches, ErrShuttingDown) - return - } - var ( maxRetryAttempt int bufferMessages, bufferBytes int64 @@ -1600,10 +1500,8 @@ func (p *asyncProducer) retryBatchesAfterRefresh(batches *produceSet, retryErr e return } - if !p.backoffOrDone(maxRetryAttempt) { - p.retryBufferQuota.release(bufferMessages, bufferBytes) - p.failMutedSet(retryable, ErrShuttingDown) - return + if backoff := p.calcBackoff(maxRetryAttempt); backoff > 0 { + time.Sleep(backoff) } topics := make([]string, 0, len(retryable.msgs)) for topic := range retryable.msgs { @@ -1633,16 +1531,6 @@ func (p *asyncProducer) retryBatchesAfterRefresh(batches *produceSet, retryErr e set.addPartitionSet(topic, partition, pSet) }) - // handoffRetryBatch also checks shutdown, but doing it here avoids creating - // or ref-counting brokerProducers just to reject the retry. - if p.shuttingDown() { - for _, set := range brokerSets { - p.retryBufferQuota.release(int64(set.bufferCount), int64(set.bufferBytes)) - p.failMutedSet(set, ErrShuttingDown) - } - return - } - for leader, set := range brokerSets { bp := p.getBrokerProducer(leader) accepted := p.handoffRetryBatch(bp, set) @@ -1657,14 +1545,9 @@ func (p *asyncProducer) retryBatchesAfterRefresh(batches *produceSet, retryErr e // handoffRetryBatch transfers ownership of a muted retry batch to the broker // bridge. On false, the caller still owns the mute and must fail the batch. func (p *asyncProducer) handoffRetryBatch(bp *brokerProducer, set *produceSet) bool { - // If shutdown is already visible, do not let select randomly choose a ready - // send on bp.output. The second select still handles shutdown racing with - // the handoff. select { case <-bp.done: return false - case <-p.done: - return false default: } select { @@ -1672,8 +1555,6 @@ func (p *asyncProducer) handoffRetryBatch(bp *brokerProducer, set *produceSet) b return true case <-bp.done: return false - case <-p.done: - return false } } diff --git a/async_producer_test.go b/async_producer_test.go index bbc68976d..f1c07d75c 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -300,6 +300,7 @@ func TestAsyncProducerFailureRetry(t *testing.T) { for i := 0; i < 10; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} } + leader2.Returns(metadataLeader2) leader2.Returns(prodSuccess) expectResults(t, producer, 10, 0) @@ -562,6 +563,7 @@ func TestAsyncProducerMultipleRetries(t *testing.T) { for i := 0; i < 10; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} } + leader2.Returns(metadataLeader2) leader2.Returns(prodSuccess) expectResults(t, producer, 10, 0) @@ -620,6 +622,7 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { expectResults(t, producer, 1, 0) producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + leader2.Returns(metadataLeader2) leader2.Returns(prodSuccess) expectResults(t, producer, 1, 0) @@ -973,6 +976,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { for i := 0; i < 5; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0} } + leader.Returns(metadataResponse) prodSuccess = new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) leader.Returns(prodSuccess) @@ -1297,7 +1301,7 @@ func TestAsyncProducerIdempotentRetryCheckBatch_2378(t *testing.T) { return metadataResponse case 22: return initProducerIDResponse - case 0: // for msg, always return error to trigger retryBatch + case 0: // for msg, always return error to trigger response retry return prodNotLeaderResponse } return nil @@ -1337,6 +1341,73 @@ func TestAsyncProducerIdempotentRetryCheckBatch_2378(t *testing.T) { closeProducer(t, producer) } +// test case for https://github.com/IBM/sarama/issues/2469: idempotent producer +// retries must honor Retry.Backoff / Retry.BackoffFunc when the broker repeatedly +// returns a retriable produce response. +func TestAsyncProducerIdempotentResponseRetryBackoff(t *testing.T) { + broker := NewMockBroker(t, 1) + + metadataResponse := &MetadataResponse{ + Version: 4, + ControllerID: 1, + } + metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError) + + initProducerIDResponse := &InitProducerIDResponse{ + ThrottleTime: 0, + ProducerID: 1000, + ProducerEpoch: 1, + } + + prodNotLeaderResponse := &ProduceResponse{ + Version: 3, + ThrottleTime: 0, + } + prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas) + + handler := func(req *request) (res encoderWithHeader) { + switch req.body.key() { + case 3: + return metadataResponse + case 22: + return initProducerIDResponse + case 0: + return prodNotLeaderResponse + } + return nil + } + + config := NewTestConfig() + config.Version = V0_11_0_0 + config.Producer.Idempotent = true + config.Net.MaxOpenRequests = 1 + config.Producer.Retry.Max = 3 + config.Producer.RequiredAcks = WaitForAll + config.Producer.Return.Successes = true + config.Producer.Flush.Frequency = 50 * time.Millisecond + + backoffCalls := new(atomic.Int32) + config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration { + backoffCalls.Add(1) + return 0 + } + + broker.setHandler(handler) + producer, err := NewAsyncProducer([]string{broker.Addr()}, config) + require.NoError(t, err) + + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + + expectResults(t, producer, 0, 1) + + broker.Close() + closeProducer(t, producer) + + assert.GreaterOrEqual(t, int(backoffCalls.Load()), config.Producer.Retry.Max, + "BackoffFunc should be called at least Retry.Max times during idempotent response retry") +} + func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) { broker := NewMockBroker(t, 1) @@ -1662,7 +1733,7 @@ func newBlockingRetryProducer(config *Config, errorBuffer int) (*asyncProducer, } // holdFirstRetryAfterReserve lets the test prove a second retry sees quota held by the first. -func holdFirstRetryAfterReserve(parent *asyncProducer, retryBP *brokerProducer, afterRefresh bool) (<-chan struct{}, func()) { +func holdFirstRetryAfterReserve(parent *asyncProducer, retryBP *brokerProducer) (<-chan struct{}, func()) { firstRetryReserved := make(chan struct{}, 1) blockFirstRetry := make(chan struct{}, 1) releaseFirstRetry := make(chan struct{}, 1) @@ -1683,25 +1754,13 @@ func holdFirstRetryAfterReserve(parent *asyncProducer, retryBP *brokerProducer, } } - if afterRefresh { - parent.client = &stubLeaderClient{ - leader: retryBP.broker, - cfg: parent.conf, - refreshMetadata: func(...string) error { - hold() - return nil - }, - } - } else { - parent.client = &stubLeaderClient{ - cfg: parent.conf, - leaderFunc: func(_ string, partition int32) (*Broker, error) { - if partition == 0 { - hold() - } - return retryBP.broker, nil - }, - } + parent.client = &stubLeaderClient{ + leader: retryBP.broker, + cfg: parent.conf, + refreshMetadata: func(...string) error { + hold() + return nil + }, } return firstRetryReserved, release @@ -1880,43 +1939,6 @@ func TestBrokerProducerWaitForSpaceAllMuted(t *testing.T) { } } -// TestPartitionMuterCloseWakesWait verifies that closing the muter wakes -// goroutines blocked in waitUntilMuted. -func TestPartitionMuterCloseWakesWait(t *testing.T) { - config := NewTestConfig() - parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - txnmgr: &transactionManager{}, - } - - blockedSet := newProduceSet(parent) - safeAddMessage(t, blockedSet, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("held")}) - if !parent.muter.tryMute(blockedSet) { - t.Fatal("expected to mute partition") - } - - waitSet := newProduceSet(parent) - safeAddMessage(t, waitSet, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("waiting")}) - - done := make(chan bool, 1) - go func() { - done <- parent.muter.waitUntilMuted(waitSet) - }() - - assertNotDone(t, done, 50*time.Millisecond) - parent.muter.close() - - select { - case result := <-done: - if result { - t.Fatal("expected waitUntilMuted to return false after close") - } - case <-time.After(2 * time.Second): - t.Fatal("timed out") - } -} - // TestBrokerProducerRollOverClearsTimer ensures timer events from a previous batch // do not cause a flush of a fresh empty batch after rollOver. func TestBrokerProducerRollOverClearsTimer(t *testing.T) { @@ -1976,61 +1998,6 @@ func TestBrokerProducerRollOverClearsTimer(t *testing.T) { } } -func TestRetryBatchRespectsPartitionMuter(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = true - config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength - txnMgr := &transactionManager{ - producerID: 0, - producerEpoch: 0, - sequenceNumbers: make(map[string]int32), - } - - parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - brokers: make(map[*Broker]*brokerProducer), - brokerRefs: make(map[*brokerProducer]int), - txnmgr: txnMgr, - retryBufferQuota: newRetryBufferQuota(config), - } - leader := &Broker{} - parent.client = &stubLeaderClient{leader: leader, cfg: config} - - output := make(chan *produceSet, 1) - bp := &brokerProducer{ - parent: parent, - broker: leader, - output: output, - input: make(chan *ProducerMessage), - } - parent.brokers[leader] = bp - - retrySet := newProduceSet(parent) - safeAddMessage(t, retrySet, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) - retryPartitionSet := retrySet.msgs["topic"][0] - if !parent.muter.tryMute(retrySet) { - t.Fatal("expected retry set to mute partitions") - } - parent.muter.unmute(retrySet) - - parent.retryBatch("topic", 0, retryPartitionSet, ErrNotEnoughReplicas, false) - - select { - case sent := <-output: - set := sent.msgs["topic"][0] - require.Equal(t, retryPartitionSet, set) - default: - t.Fatal("expected retry batch to be dispatched") - } - - contender := newProduceSet(parent) - safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) - if parent.muter.tryMute(contender) { - t.Fatal("expected partition to remain muted by retry batch") - } -} - func TestHandleErrorRetryKeepsMute(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false @@ -2385,136 +2352,106 @@ func TestHandleErrorRetryGroupsByLeader(t *testing.T) { assertNotDone(t, outputB, 50*time.Millisecond) } -func TestHandleSuccessRetryUsesPartitionProducer(t *testing.T) { +func TestHandleSuccessRetryKeepsMute(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 config.Producer.Retry.Backoff = 0 - parent := &asyncProducer{ - conf: config, - muter: newPartitionMuter(), - retries: make(chan *ProducerMessage, 1), - txnmgr: &transactionManager{}, - } - - bp := &brokerProducer{ - parent: parent, - broker: &Broker{id: 1}, - input: make(chan *ProducerMessage), - accumulatingBatch: newProduceSet(parent), - currentRetries: make(map[string]map[int32]error), - } - - sent := newProduceSet(parent) - msg := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")} - safeAddMessage(t, sent, msg) - if !parent.muter.tryMute(sent) { - t.Fatal("expected sent batch to mute partitions") - } - - response := new(ProduceResponse) - response.AddTopicPartition("topic", 0, ErrNotLeaderForPartition) - bp.handleSuccess(sent, response) - - retried := assertDoneWithin(t, parent.retries, 2*time.Second) - require.Equal(t, msg, retried) - require.Equal(t, 1, retried.retries) - - contender := newProduceSet(parent) - safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) - if !parent.muter.tryMute(contender) { - t.Fatal("expected response retry to release sent mute before partitionProducer retry") + retryLeader := &Broker{id: 2} + refreshStarted := make(chan struct{}, 1) + releaseRefresh := make(chan struct{}) + var releaseOnce sync.Once + releaseRetry := func() { + releaseOnce.Do(func() { + close(releaseRefresh) + }) } - parent.muter.unmute(contender) -} - -func TestRetryBatchShutdownReleasesMute(t *testing.T) { - config := NewTestConfig() - config.Producer.Idempotent = false - config.Producer.Retry.Max = 1 - config.Producer.Retry.Backoff = 0 - config.Producer.Return.Errors = true - config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength + defer releaseRetry() parent := &asyncProducer{ conf: config, muter: newPartitionMuter(), brokers: make(map[*Broker]*brokerProducer), brokerRefs: make(map[*brokerProducer]int), - errors: make(chan *ProducerError, 1), + retries: make(chan *ProducerMessage, 1), done: make(chan struct{}), txnmgr: &transactionManager{}, retryBufferQuota: newRetryBufferQuota(config), } - leader := &Broker{id: 1} - leaderLookup := make(chan struct{}, 1) parent.client = &stubLeaderClient{ - cfg: config, - leaderFunc: func(string, int32) (*Broker, error) { + leader: retryLeader, + cfg: config, + refreshMetadata: func(...string) error { select { - case leaderLookup <- struct{}{}: + case refreshStarted <- struct{}{}: default: } - return leader, nil + <-releaseRefresh + return nil }, } + + output := make(chan *produceSet) retryBP := &brokerProducer{ parent: parent, - broker: leader, - output: make(chan *produceSet), + broker: retryLeader, + output: output, input: make(chan *ProducerMessage), done: make(chan struct{}), } - parent.brokers[leader] = retryBP + parent.brokers[retryLeader] = retryBP + + bp := &brokerProducer{ + parent: parent, + broker: &Broker{id: 1}, + input: make(chan *ProducerMessage), + accumulatingBatch: newProduceSet(parent), + currentRetries: make(map[string]map[int32]error), + } sent := newProduceSet(parent) - safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")}) + msg := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")} + safeAddMessage(t, sent, msg) retryPartitionSet := sent.msgs["topic"][0] if !parent.muter.tryMute(sent) { t.Fatal("expected sent batch to mute partitions") } - parent.inFlight.Add(1) - done := make(chan struct{}) - go func() { - parent.retryBatch("topic", 0, retryPartitionSet, ErrOutOfBrokers, true) - close(done) - }() - assertDoneWithin(t, leaderLookup, 2*time.Second) - - if parent.closed.CompareAndSwap(false, true) { - close(parent.done) - } + response := new(ProduceResponse) + response.AddTopicPartition("topic", 0, ErrNotLeaderForPartition) + bp.handleSuccess(sent, response) - producerErr := assertDoneWithin(t, parent.errors, 2*time.Second) - require.Equal(t, ErrShuttingDown, producerErr.Err) - assertDoneWithin(t, done, 2*time.Second) + assertDoneWithin(t, refreshStarted, 2*time.Second) + assertNotDone(t, parent.retries, 50*time.Millisecond) contender := newProduceSet(parent) safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")}) - if !parent.muter.tryMute(contender) { - t.Fatal("expected partition mute to be released after aborted retry handoff") + if parent.muter.tryMute(contender) { + parent.muter.unmute(contender) + t.Fatal("expected response retry to keep sent partition muted") } - parent.muter.unmute(contender) + + releaseRetry() + retrySet := assertDoneWithin(t, output, 2*time.Second) + defer parent.muter.unmute(retrySet) + require.Same(t, retryPartitionSet, retrySet.msgs["topic"][0]) + require.Equal(t, 1, msg.retries) } func TestRetryUsesSharedBufferBudget(t *testing.T) { tests := []struct { - name string - afterRefresh bool - bytesLimit bool + name string + bytesLimit bool }{ - {name: "retryBatch length"}, - {name: "retryBatch bytes", bytesLimit: true}, - {name: "retryBatchesAfterRefresh length", afterRefresh: true}, - {name: "retryBatchesAfterRefresh bytes", afterRefresh: true, bytesLimit: true}, + {name: "length"}, + {name: "bytes", bytesLimit: true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { config := NewTestConfig() - config.Producer.Idempotent = !tt.afterRefresh + config.Producer.Idempotent = false config.Producer.Retry.Max = 1 config.Producer.Retry.Backoff = 0 config.Producer.Return.Errors = true @@ -2537,17 +2474,13 @@ func TestRetryUsesSharedBufferBudget(t *testing.T) { } parent, retryBP := newBlockingRetryProducer(config, errorBuffer) - firstRetryReserved, releaseFirstRetry := holdFirstRetryAfterReserve(parent, retryBP, tt.afterRefresh) + firstRetryReserved, releaseFirstRetry := holdFirstRetryAfterReserve(parent, retryBP) defer releaseFirstRetry() retry := func(partition int32, pSet *partitionSet) { - if tt.afterRefresh { - retrySet := newProduceSet(parent) - retrySet.addPartitionSet("topic", partition, pSet) - parent.retryBatchesAfterRefresh(retrySet, ErrOutOfBrokers) - return - } - parent.retryBatch("topic", partition, pSet, ErrNotEnoughReplicas, true) + retrySet := newProduceSet(parent) + retrySet.addPartitionSet("topic", partition, pSet) + parent.retryBatchesAfterRefresh(retrySet, ErrOutOfBrokers) } first := newProduceSet(parent) @@ -2654,7 +2587,7 @@ func TestRetryBatchesAfterRefreshBrokerDone(t *testing.T) { parent.muter.unmute(contender) } -func TestRetryBatchesAfterRefreshShutdown(t *testing.T) { +func TestRetryBatchesAfterRefreshDrainsDuringProducerShutdown(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2701,18 +2634,18 @@ func TestRetryBatchesAfterRefreshShutdown(t *testing.T) { parent.retryBatchesAfterRefresh(sent, ErrOutOfBrokers) - firstErr := assertDoneWithin(t, parent.errors, 2*time.Second) - secondErr := assertDoneWithin(t, parent.errors, 2*time.Second) - require.Equal(t, ErrShuttingDown, firstErr.Err) - require.Equal(t, ErrShuttingDown, secondErr.Err) - require.ElementsMatch(t, []int32{0, 1}, []int32{firstErr.Msg.Partition, secondErr.Msg.Partition}) - assertNotDone(t, output, 50*time.Millisecond) + retrySet := assertDoneWithin(t, output, 2*time.Second) + require.Contains(t, retrySet.msgs["topic"], int32(0)) + require.Contains(t, retrySet.msgs["topic"], int32(1)) + assertNotDone(t, parent.errors, 50*time.Millisecond) + + parent.muter.unmute(retrySet) contender := newProduceSet(parent) safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next-0")}) safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next-1")}) if !parent.muter.tryMute(contender) { - t.Fatal("expected partition mutes to be released after shutdown") + t.Fatal("expected partition mutes to be released after retried batch completion") } parent.muter.unmute(contender) } diff --git a/broker.go b/broker.go index ad74ae7da..e56d66dc5 100644 --- a/broker.go +++ b/broker.go @@ -253,7 +253,7 @@ func (b *Broker) Open(conf *Config) error { return } - Logger.Printf("Error while sending ApiVersionsRequest V3 to broker %s: %s\n", b.addr, err) + DebugLogger.Printf("Error while sending ApiVersionsRequest V3 to broker %s: %s\n", b.addr, err) // send a lower version request in case remote cluster is <= 2.4.0.0 maxVersion := int16(0) if apiVersionsResponse != nil { @@ -269,7 +269,7 @@ func (b *Broker) Open(conf *Config) error { if b.maybeCloseLocked(err) { return } - Logger.Printf("Error while sending ApiVersionsRequest V%d to broker %s: %s\n", maxVersion, b.addr, err) + DebugLogger.Printf("Error while sending ApiVersionsRequest V%d to broker %s: %s\n", maxVersion, b.addr, err) } } if apiVersionsResponse != nil { diff --git a/produce_set.go b/produce_set.go index 0fbb7097a..764a725d0 100644 --- a/produce_set.go +++ b/produce_set.go @@ -12,7 +12,7 @@ type partitionSet struct { bufferBytes int } -// shouldKeepMuted matches retryBatch's whole-batch retry rule: if any message has +// shouldKeepMuted matches the whole-batch retry rule: if any message has // exhausted retries, the batch will be failed instead of retried. func (ps *partitionSet) shouldKeepMuted(maxRetries int) bool { if len(ps.msgs) == 0 {