diff --git a/async_producer.go b/async_producer.go index c9d5f7d44..bcd7382e7 100644 --- a/async_producer.go +++ b/async_producer.go @@ -817,10 +817,8 @@ func (p *asyncProducer) backoffOrDone(retries int) bool { if backoff <= 0 { return !p.shuttingDown() } - timer := time.NewTimer(backoff) - defer timer.Stop() select { - case <-timer.C: + case <-time.After(backoff): return true case <-p.done: return false @@ -1244,7 +1242,7 @@ func (bp *brokerProducer) tryBuildFlushingBatch() bool { } func (bp *brokerProducer) shutdown() { - if bp.closed.Swap(true) { + if !bp.closed.CompareAndSwap(false, true) { return } if bp.done != nil { @@ -1476,10 +1474,6 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio produceSet := newProduceSet(p) produceSet.addPartitionSet(topic, partition, pSet) bufferMessages, bufferBytes := int64(produceSet.bufferCount), int64(produceSet.bufferBytes) - var reservedMessages, reservedBytes int64 - defer func() { - p.retryBufferQuota.release(reservedMessages, reservedBytes) - }() muted := alreadyMuted failBatch := func(err error) { @@ -1502,7 +1496,7 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio failBatch(ErrProducerRetryBufferOverflow) return } - reservedMessages, reservedBytes = bufferMessages, bufferBytes + defer p.retryBufferQuota.release(bufferMessages, bufferBytes) // Honor Producer.Retry.Backoff between retry attempts (#2469). retryBatch // dispatches the produceSet directly to the broker, bypassing partitionProducer.dispatch. @@ -1532,23 +1526,11 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio } } -type partitionBatchRetry struct { - topic string - partition int32 - pSet *partitionSet -} - -func (p *asyncProducer) failMutedBatch(batch partitionBatchRetry, err error) { - produceSet := newProduceSet(p) - produceSet.addPartitionSet(batch.topic, batch.partition, batch.pSet) - p.muter.unmute(produceSet) - p.returnErrors(batch.pSet.msgs, err) -} - -func (p *asyncProducer) failMutedBatches(batches []partitionBatchRetry, err error) { - for _, batch := range batches { - p.failMutedBatch(batch, err) - } +func (p *asyncProducer) failMutedSet(set *produceSet, err error) { + p.muter.unmute(set) + set.eachPartition(func(_ string, _ int32, pSet *partitionSet) { + p.returnErrors(pSet.msgs, err) + }) } func newRetryBufferQuota(conf *Config) messageQuota { @@ -1577,99 +1559,88 @@ func producerMessageByteSizeVersion(conf *Config) int { return 1 } -// retryBatchesAfterRefresh keeps metadata refresh out of brokerProducer.handleError. -// Connection errors already hold partition mutes; doing the refresh in the -// response loop can block all progress for that broker while the cluster is -// unstable. Refresh once for the failed request so multi-partition batches do -// not amplify controller-fail metadata traffic. Group retry partitions by their -// refreshed leader broker, but hand them off from this retry worker so large -// clusters do not create one temporary goroutine per target broker. -// -// This is intentionally not implemented as a loop over retryBatch. retryBatch -// is the single-partition direct retry primitive; this path owns all muted -// partitions from one failed ProduceRequest and must retry, fail, reserve -// buffer, back off, and refresh metadata at that failed-request boundary. -func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []partitionBatchRetry, retryErr error) { +// retryBatchesAfterRefresh retries muted batches from one failed ProduceRequest +// after a single metadata refresh, preserving that failed-request boundary. +func (p *asyncProducer) retryBatchesAfterRefresh(batches *produceSet, retryErr error) { if p.shuttingDown() { - p.failMutedBatches(batches, ErrShuttingDown) + p.failMutedSet(batches, ErrShuttingDown) return } - var retryable []partitionBatchRetry - maxRetryAttempt := 0 - for _, batch := range batches { - if !batch.pSet.shouldKeepMuted(p.conf.Producer.Retry.Max) { - p.failMutedBatch(batch, retryErr) - continue + var ( + maxRetryAttempt int + bufferMessages, bufferBytes int64 + ) + retryable := newProduceSet(p) + batches.eachPartition(func(topic string, partition int32, pSet *partitionSet) { + if !pSet.shouldKeepMuted(p.conf.Producer.Retry.Max) { + failed := newProduceSet(p) + failed.addPartitionSet(topic, partition, pSet) + p.failMutedSet(failed, retryErr) + return } - for _, msg := range batch.pSet.msgs { + for _, msg := range pSet.msgs { msg.retries++ if msg.retries > maxRetryAttempt { maxRetryAttempt = msg.retries } } - retryable = append(retryable, batch) - } - if len(retryable) == 0 { + retryable.addPartitionSet(topic, partition, pSet) + bufferMessages += int64(len(pSet.msgs)) + bufferBytes += int64(pSet.bufferBytes) + }) + if retryable.empty() { return } - // Reserve before metadata refresh so refresh-blocked direct retries still - // count against the producer-level retry buffer budget. - var bufferMessages, bufferBytes int64 - for _, batch := range retryable { - bufferMessages += int64(len(batch.pSet.msgs)) - bufferBytes += int64(batch.pSet.bufferBytes) - } + // Reserve before waiting so pending direct retries still count against the + // producer-level retry buffer budget. if !p.retryBufferQuota.tryReserve(bufferMessages, bufferBytes) { - p.failMutedBatches(retryable, ErrProducerRetryBufferOverflow) + p.failMutedSet(retryable, ErrProducerRetryBufferOverflow) return } - if p.shuttingDown() { + if !p.backoffOrDone(maxRetryAttempt) { p.retryBufferQuota.release(bufferMessages, bufferBytes) - p.failMutedBatches(retryable, ErrShuttingDown) + p.failMutedSet(retryable, ErrShuttingDown) return } - if len(topics) > 0 { - if err := p.client.RefreshMetadata(topics...); err != nil { - Logger.Printf("Failed refreshing metadata because of %v\n", err) - } + topics := make([]string, 0, len(retryable.msgs)) + for topic := range retryable.msgs { + topics = append(topics, topic) } - if p.shuttingDown() { - p.retryBufferQuota.release(bufferMessages, bufferBytes) - p.failMutedBatches(retryable, ErrShuttingDown) - return - } - - if !p.backoffOrDone(maxRetryAttempt) { - p.retryBufferQuota.release(bufferMessages, bufferBytes) - p.failMutedBatches(retryable, ErrShuttingDown) - return + if err := p.client.RefreshMetadata(topics...); err != nil { + Logger.Printf("Failed refreshing metadata because of %v\n", err) } brokerSets := make(map[*Broker]*produceSet) - for _, batch := range retryable { - if p.shuttingDown() { - batchMessages, batchBytes := int64(len(batch.pSet.msgs)), int64(batch.pSet.bufferBytes) - p.retryBufferQuota.release(batchMessages, batchBytes) - p.failMutedBatch(batch, ErrShuttingDown) - continue - } - leader, err := p.client.Leader(batch.topic, batch.partition) + retryable.eachPartition(func(topic string, partition int32, pSet *partitionSet) { + leader, err := p.client.Leader(topic, partition) if err != nil { - Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", batch.topic, batch.partition, err) - batchMessages, batchBytes := int64(len(batch.pSet.msgs)), int64(batch.pSet.bufferBytes) + Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err) + batchMessages, batchBytes := int64(len(pSet.msgs)), int64(pSet.bufferBytes) p.retryBufferQuota.release(batchMessages, batchBytes) - p.failMutedBatch(batch, retryErr) - continue + failed := newProduceSet(p) + failed.addPartitionSet(topic, partition, pSet) + p.failMutedSet(failed, retryErr) + return } set := brokerSets[leader] if set == nil { set = newProduceSet(p) brokerSets[leader] = set } - set.addPartitionSet(batch.topic, batch.partition, batch.pSet) + set.addPartitionSet(topic, partition, pSet) + }) + + // handoffRetryBatch also checks shutdown, but doing it here avoids creating + // or ref-counting brokerProducers just to reject the retry. + if p.shuttingDown() { + for _, set := range brokerSets { + p.retryBufferQuota.release(int64(set.bufferCount), int64(set.bufferBytes)) + p.failMutedSet(set, ErrShuttingDown) + } + return } for leader, set := range brokerSets { @@ -1678,10 +1649,7 @@ func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []part p.unrefBrokerProducer(leader, bp) p.retryBufferQuota.release(int64(set.bufferCount), int64(set.bufferBytes)) if !accepted { - p.muter.unmute(set) - set.eachPartition(func(_ string, _ int32, pSet *partitionSet) { - p.returnErrors(pSet.msgs, ErrShuttingDown) - }) + p.failMutedSet(set, ErrShuttingDown) } } } @@ -1689,10 +1657,16 @@ func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []part // handoffRetryBatch transfers ownership of a muted retry batch to the broker // bridge. On false, the caller still owns the mute and must fail the batch. func (p *asyncProducer) handoffRetryBatch(bp *brokerProducer, set *produceSet) bool { - if bp.closed.Load() { + // If shutdown is already visible, do not let select randomly choose a ready + // send on bp.output. The second select still handles shutdown racing with + // the handoff. + select { + case <-bp.done: + return false + case <-p.done: return false + default: } - select { case bp.output <- set: return true @@ -1715,10 +1689,9 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { bp.parent.abandonBrokerConnection(bp.broker) _ = bp.broker.Close() bp.closing = err + + var retrySet *produceSet keepMuted := make(map[string]map[int32]struct{}) - var retryTopics []string - retryTopicSeen := make(map[string]struct{}) - var retryBatches []partitionBatchRetry sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { // Connection failures have no ProduceResponse to feed back through // partitionProducer. Keep the sent batch muted and retry it asynchronously @@ -1731,23 +1704,18 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { if !(bp.parent.conf.Producer.Idempotent || pSet.shouldKeepMuted(bp.parent.conf.Producer.Retry.Max)) { bp.parent.returnErrors(pSet.msgs, err) } else { + if retrySet == nil { + retrySet = newProduceSet(bp.parent) + } if keepMuted[topic] == nil { keepMuted[topic] = make(map[int32]struct{}) } keepMuted[topic][partition] = struct{}{} - if _, ok := retryTopicSeen[topic]; !ok { - retryTopicSeen[topic] = struct{}{} - retryTopics = append(retryTopics, topic) - } - retryBatches = append(retryBatches, partitionBatchRetry{ - topic: topic, - partition: partition, - pSet: pSet, - }) + retrySet.addPartitionSet(topic, partition, pSet) } }) - if len(retryBatches) > 0 { - go bp.parent.retryBatchesAfterRefresh(retryTopics, retryBatches, err) + if retrySet != nil { + go bp.parent.retryBatchesAfterRefresh(retrySet, err) } bp.accumulatingBatch.eachPartition(func(topic string, partition int32, pSet *partitionSet) { bp.parent.retryMessages(pSet.msgs, err) @@ -1991,7 +1959,7 @@ func (q *messageQuota) tryReserve(messages, bytes int64) bool { nextMessages := q.messages + messages nextBytes := q.bytes + bytes - if q.wouldOverflow(nextMessages, nextBytes) { + if q.overflows(nextMessages, nextBytes) { return false } q.messages = nextMessages @@ -2009,7 +1977,7 @@ func (q *messageQuota) addAndCheckOverflow(messages, bytes int64) bool { q.messages += messages q.bytes += bytes - return q.wouldOverflow(q.messages, q.bytes) + return q.overflows(q.messages, q.bytes) } func (q *messageQuota) release(messages, bytes int64) { @@ -2028,7 +1996,7 @@ func (q *messageQuota) shouldTrack(messages, bytes int64) bool { return (messages != 0 || bytes != 0) && (q.maxMessages > 0 || q.maxBytes > 0) } -func (q *messageQuota) wouldOverflow(messages, bytes int64) bool { +func (q *messageQuota) overflows(messages, bytes int64) bool { return (q.maxMessages > 0 && messages >= q.maxMessages) || (q.maxBytes > 0 && bytes >= q.maxBytes) } diff --git a/async_producer_test.go b/async_producer_test.go index b16e0bf61..bbc68976d 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1571,7 +1571,7 @@ func TestBrokerProducerShutdown(t *testing.T) { // TestBrokerProducerWaitForSpaceEmptyBufferRollover ensures forced rollovers with an empty buffer // do not deadlock waiting for responses when no partitions are muted. -func TestBrokerProducerWaitForSpaceEmptyBufferRollover(t *testing.T) { +func TestBrokerProducerWaitForSpaceEmptyRollover(t *testing.T) { config := NewTestConfig() parent := &asyncProducer{ conf: config, @@ -1707,9 +1707,9 @@ func holdFirstRetryAfterReserve(parent *asyncProducer, retryBP *brokerProducer, return firstRetryReserved, release } -// TestBrokerProducerWaitForSpaceRespectsExternalUnmute ensures waitForSpace does not +// TestBrokerProducerWaitForSpaceExternalUnmute ensures waitForSpace does not // deadlock when partitions are muted by another producer and are unmuted elsewhere. -func TestBrokerProducerWaitForSpaceRespectsExternalUnmute(t *testing.T) { +func TestBrokerProducerWaitForSpaceExternalUnmute(t *testing.T) { config := NewTestConfig() txnMgr := &transactionManager{ producerID: 0, @@ -1796,7 +1796,7 @@ func TestBrokerProducerFlushSkipsMutedPartitions(t *testing.T) { } } -func TestBrokerProducerRunFlushesAfterExternalUnmute(t *testing.T) { +func TestBrokerProducerRunExternalUnmute(t *testing.T) { config := NewTestConfig() config.Producer.Flush.Messages = 1 parent := &asyncProducer{ @@ -1843,9 +1843,9 @@ func TestBrokerProducerRunFlushesAfterExternalUnmute(t *testing.T) { assertDoneWithin(t, done, 2*time.Second) } -// TestBrokerProducerWaitForSpaceAllPartitionsMuted verifies that waitForSpace unblocks +// TestBrokerProducerWaitForSpaceAllMuted verifies that waitForSpace unblocks // when all partitions in the accumulating batch are externally muted and later unmuted. -func TestBrokerProducerWaitForSpaceAllPartitionsMuted(t *testing.T) { +func TestBrokerProducerWaitForSpaceAllMuted(t *testing.T) { config := NewTestConfig() parent := &asyncProducer{ conf: config, @@ -1880,9 +1880,9 @@ func TestBrokerProducerWaitForSpaceAllPartitionsMuted(t *testing.T) { } } -// TestPartitionMuterCloseWakesWaitUntilMuted verifies that closing the muter wakes +// TestPartitionMuterCloseWakesWait verifies that closing the muter wakes // goroutines blocked in waitUntilMuted. -func TestPartitionMuterCloseWakesWaitUntilMuted(t *testing.T) { +func TestPartitionMuterCloseWakesWait(t *testing.T) { config := NewTestConfig() parent := &asyncProducer{ conf: config, @@ -2031,7 +2031,7 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) { } } -func TestHandleErrorNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { +func TestHandleErrorRetryKeepsMute(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2092,7 +2092,7 @@ func TestHandleErrorNonIdempotentRetryKeepsPartitionMuted(t *testing.T) { } } -func TestHandleErrorNonIdempotentRetryFailsWholeBatch(t *testing.T) { +func TestHandleErrorRetryFailsBatch(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2141,7 +2141,7 @@ func TestHandleErrorNonIdempotentRetryFailsWholeBatch(t *testing.T) { parent.muter.unmute(contender) } -func TestHandleErrorNonIdempotentRetryDoesNotWaitForMetadataRefresh(t *testing.T) { +func TestHandleErrorRetryAsyncRefresh(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2219,7 +2219,7 @@ func TestHandleErrorNonIdempotentRetryDoesNotWaitForMetadataRefresh(t *testing.T require.Equal(t, retryPartitionSet, retrySet.msgs["topic"][0]) } -func TestHandleErrorNonIdempotentRetryRefreshesMetadataOncePerFailedRequest(t *testing.T) { +func TestHandleErrorRetryRefreshOnce(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2288,7 +2288,7 @@ func TestHandleErrorNonIdempotentRetryRefreshesMetadataOncePerFailedRequest(t *t require.Same(t, retryPartitionSet1, retriedPartitions[int32(1)]) } -func TestHandleErrorNonIdempotentRetryGroupsBatchesByLeader(t *testing.T) { +func TestHandleErrorRetryGroupsByLeader(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2385,7 +2385,7 @@ func TestHandleErrorNonIdempotentRetryGroupsBatchesByLeader(t *testing.T) { assertNotDone(t, outputB, 50*time.Millisecond) } -func TestHandleSuccessNonIdempotentRetryUsesPartitionProducer(t *testing.T) { +func TestHandleSuccessRetryUsesPartitionProducer(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2429,7 +2429,7 @@ func TestHandleSuccessNonIdempotentRetryUsesPartitionProducer(t *testing.T) { parent.muter.unmute(contender) } -func TestRetryBatchReleasesMuteWhenHandoffAbortedByShutdown(t *testing.T) { +func TestRetryBatchShutdownReleasesMute(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2542,11 +2542,9 @@ func TestRetryUsesSharedBufferBudget(t *testing.T) { retry := func(partition int32, pSet *partitionSet) { if tt.afterRefresh { - parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ - topic: "topic", - partition: partition, - pSet: pSet, - }}, ErrOutOfBrokers) + retrySet := newProduceSet(parent) + retrySet.addPartitionSet("topic", partition, pSet) + parent.retryBatchesAfterRefresh(retrySet, ErrOutOfBrokers) return } parent.retryBatch("topic", partition, pSet, ErrNotEnoughReplicas, true) @@ -2600,7 +2598,7 @@ func TestRetryUsesSharedBufferBudget(t *testing.T) { } } -func TestRetryBatchesAfterRefreshReleasesMuteWhenBrokerProducerDone(t *testing.T) { +func TestRetryBatchesAfterRefreshBrokerDone(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 @@ -2637,11 +2635,9 @@ func TestRetryBatchesAfterRefreshReleasesMuteWhenBrokerProducerDone(t *testing.T done := make(chan struct{}) go func() { - parent.retryBatchesAfterRefresh([]string{"topic"}, []partitionBatchRetry{{ - topic: "topic", - partition: 0, - pSet: retryPartitionSet, - }}, ErrOutOfBrokers) + retrySet := newProduceSet(parent) + retrySet.addPartitionSet("topic", 0, retryPartitionSet) + parent.retryBatchesAfterRefresh(retrySet, ErrOutOfBrokers) close(done) }() @@ -2658,7 +2654,70 @@ func TestRetryBatchesAfterRefreshReleasesMuteWhenBrokerProducerDone(t *testing.T parent.muter.unmute(contender) } -func TestHandleErrorNonIdempotentRetryReleasesMuteOnLeaderError(t *testing.T) { +func TestRetryBatchesAfterRefreshShutdown(t *testing.T) { + config := NewTestConfig() + config.Producer.Idempotent = false + config.Producer.Retry.Max = 1 + config.Producer.Retry.Backoff = 0 + config.Producer.Return.Errors = true + + parent := &asyncProducer{ + conf: config, + muter: newPartitionMuter(), + brokers: make(map[*Broker]*brokerProducer), + brokerRefs: make(map[*brokerProducer]int), + errors: make(chan *ProducerError, 2), + done: make(chan struct{}), + txnmgr: &transactionManager{}, + } + leader := &Broker{id: 1} + output := make(chan *produceSet, 1) + parent.brokers[leader] = &brokerProducer{ + parent: parent, + broker: leader, + output: output, + input: make(chan *ProducerMessage), + done: make(chan struct{}), + } + shutdownStarted := false + parent.client = &stubLeaderClient{ + cfg: config, + leaderFunc: func(string, int32) (*Broker, error) { + if !shutdownStarted { + close(parent.done) + shutdownStarted = true + } + return leader, nil + }, + } + + sent := newProduceSet(parent) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry-0")}) + safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("retry-1")}) + if !parent.muter.tryMute(sent) { + t.Fatal("expected sent batch to mute partitions") + } + parent.inFlight.Add(2) + + parent.retryBatchesAfterRefresh(sent, ErrOutOfBrokers) + + firstErr := assertDoneWithin(t, parent.errors, 2*time.Second) + secondErr := assertDoneWithin(t, parent.errors, 2*time.Second) + require.Equal(t, ErrShuttingDown, firstErr.Err) + require.Equal(t, ErrShuttingDown, secondErr.Err) + require.ElementsMatch(t, []int32{0, 1}, []int32{firstErr.Msg.Partition, secondErr.Msg.Partition}) + assertNotDone(t, output, 50*time.Millisecond) + + contender := newProduceSet(parent) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next-0")}) + safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 1, Value: StringEncoder("next-1")}) + if !parent.muter.tryMute(contender) { + t.Fatal("expected partition mutes to be released after shutdown") + } + parent.muter.unmute(contender) +} + +func TestHandleErrorRetryLeaderError(t *testing.T) { config := NewTestConfig() config.Producer.Idempotent = false config.Producer.Retry.Max = 1 diff --git a/client.go b/client.go index 71b57b15c..6dbf0d009 100644 --- a/client.go +++ b/client.go @@ -732,7 +732,7 @@ func (client *client) checkSeedBrokersHealth(brokers []*Broker) { for _, broker := range brokers { if err := broker.getSockError(); err != nil { - Logger.Printf("client/seedbrokers close seed broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) + DebugLogger.Printf("client/seedbrokers close seed broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) safeAsyncClose(broker) } } @@ -741,7 +741,7 @@ func (client *client) checkSeedBrokersHealth(brokers []*Broker) { func (client *client) checkBrokersHealth() { for id, broker := range client.brokers { if err := broker.getSockError(); err != nil { - Logger.Printf("client/brokers close broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) + DebugLogger.Printf("client/brokers close broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) safeAsyncClose(broker) delete(client.brokers, id) }