Skip to content
Merged
188 changes: 78 additions & 110 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,10 +817,8 @@ func (p *asyncProducer) backoffOrDone(retries int) bool {
if backoff <= 0 {
return !p.shuttingDown()
}
timer := time.NewTimer(backoff)
defer timer.Stop()
select {
case <-timer.C:
case <-time.After(backoff):
return true
case <-p.done:
return false
Expand Down Expand Up @@ -1244,7 +1242,7 @@ func (bp *brokerProducer) tryBuildFlushingBatch() bool {
}

func (bp *brokerProducer) shutdown() {
if bp.closed.Swap(true) {
if !bp.closed.CompareAndSwap(false, true) {
return
}
if bp.done != nil {
Expand Down Expand Up @@ -1476,10 +1474,6 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio
produceSet := newProduceSet(p)
produceSet.addPartitionSet(topic, partition, pSet)
bufferMessages, bufferBytes := int64(produceSet.bufferCount), int64(produceSet.bufferBytes)
var reservedMessages, reservedBytes int64
defer func() {
p.retryBufferQuota.release(reservedMessages, reservedBytes)
}()

muted := alreadyMuted
failBatch := func(err error) {
Expand All @@ -1502,7 +1496,7 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio
failBatch(ErrProducerRetryBufferOverflow)
return
}
reservedMessages, reservedBytes = bufferMessages, bufferBytes
defer p.retryBufferQuota.release(bufferMessages, bufferBytes)

// Honor Producer.Retry.Backoff between retry attempts (#2469). retryBatch
// dispatches the produceSet directly to the broker, bypassing partitionProducer.dispatch.
Expand Down Expand Up @@ -1532,23 +1526,11 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio
}
}

type partitionBatchRetry struct {
topic string
partition int32
pSet *partitionSet
}

func (p *asyncProducer) failMutedBatch(batch partitionBatchRetry, err error) {
produceSet := newProduceSet(p)
produceSet.addPartitionSet(batch.topic, batch.partition, batch.pSet)
p.muter.unmute(produceSet)
p.returnErrors(batch.pSet.msgs, err)
}

func (p *asyncProducer) failMutedBatches(batches []partitionBatchRetry, err error) {
for _, batch := range batches {
p.failMutedBatch(batch, err)
}
// failMutedSet gives up on every partition batch held in set: it unmutes the
// set through p.muter and then passes each partition's messages to
// p.returnErrors together with err.
func (p *asyncProducer) failMutedSet(set *produceSet, err error) {
	p.muter.unmute(set)

	// Topic and partition are not needed here; only the batch's messages are reported.
	reportFailure := func(_ string, _ int32, batch *partitionSet) {
		p.returnErrors(batch.msgs, err)
	}
	set.eachPartition(reportFailure)
}

func newRetryBufferQuota(conf *Config) messageQuota {
Expand Down Expand Up @@ -1577,99 +1559,88 @@ func producerMessageByteSizeVersion(conf *Config) int {
return 1
}

// retryBatchesAfterRefresh keeps metadata refresh out of brokerProducer.handleError.
// Connection errors already hold partition mutes; doing the refresh in the
// response loop can block all progress for that broker while the cluster is
// unstable. Refresh once for the failed request so multi-partition batches do
// not amplify controller-fail metadata traffic. Group retry partitions by their
// refreshed leader broker, but hand them off from this retry worker so large
// clusters do not create one temporary goroutine per target broker.
//
// This is intentionally not implemented as a loop over retryBatch. retryBatch
// is the single-partition direct retry primitive; this path owns all muted
// partitions from one failed ProduceRequest and must retry, fail, reserve
// buffer, back off, and refresh metadata at that failed-request boundary.
func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []partitionBatchRetry, retryErr error) {
// retryBatchesAfterRefresh retries muted batches from one failed ProduceRequest
// after a single metadata refresh, preserving that failed-request boundary.
func (p *asyncProducer) retryBatchesAfterRefresh(batches *produceSet, retryErr error) {
if p.shuttingDown() {
p.failMutedBatches(batches, ErrShuttingDown)
p.failMutedSet(batches, ErrShuttingDown)
return
}

var retryable []partitionBatchRetry
maxRetryAttempt := 0
for _, batch := range batches {
if !batch.pSet.shouldKeepMuted(p.conf.Producer.Retry.Max) {
p.failMutedBatch(batch, retryErr)
continue
var (
maxRetryAttempt int
bufferMessages, bufferBytes int64
)
retryable := newProduceSet(p)
batches.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
if !pSet.shouldKeepMuted(p.conf.Producer.Retry.Max) {
failed := newProduceSet(p)
failed.addPartitionSet(topic, partition, pSet)
p.failMutedSet(failed, retryErr)
return
}
for _, msg := range batch.pSet.msgs {
for _, msg := range pSet.msgs {
msg.retries++
if msg.retries > maxRetryAttempt {
maxRetryAttempt = msg.retries
}
}
retryable = append(retryable, batch)
}
if len(retryable) == 0 {
retryable.addPartitionSet(topic, partition, pSet)
bufferMessages += int64(len(pSet.msgs))
bufferBytes += int64(pSet.bufferBytes)
})
if retryable.empty() {
return
}

// Reserve before metadata refresh so refresh-blocked direct retries still
// count against the producer-level retry buffer budget.
var bufferMessages, bufferBytes int64
for _, batch := range retryable {
bufferMessages += int64(len(batch.pSet.msgs))
bufferBytes += int64(batch.pSet.bufferBytes)
}
// Reserve before waiting so pending direct retries still count against the
// producer-level retry buffer budget.
if !p.retryBufferQuota.tryReserve(bufferMessages, bufferBytes) {
p.failMutedBatches(retryable, ErrProducerRetryBufferOverflow)
p.failMutedSet(retryable, ErrProducerRetryBufferOverflow)
return
}

if p.shuttingDown() {
if !p.backoffOrDone(maxRetryAttempt) {
p.retryBufferQuota.release(bufferMessages, bufferBytes)
p.failMutedBatches(retryable, ErrShuttingDown)
p.failMutedSet(retryable, ErrShuttingDown)
return
}
if len(topics) > 0 {
if err := p.client.RefreshMetadata(topics...); err != nil {
Logger.Printf("Failed refreshing metadata because of %v\n", err)
}
topics := make([]string, 0, len(retryable.msgs))
for topic := range retryable.msgs {
topics = append(topics, topic)
}
if p.shuttingDown() {
p.retryBufferQuota.release(bufferMessages, bufferBytes)
p.failMutedBatches(retryable, ErrShuttingDown)
return
}

if !p.backoffOrDone(maxRetryAttempt) {
p.retryBufferQuota.release(bufferMessages, bufferBytes)
p.failMutedBatches(retryable, ErrShuttingDown)
return
if err := p.client.RefreshMetadata(topics...); err != nil {
Logger.Printf("Failed refreshing metadata because of %v\n", err)
}

brokerSets := make(map[*Broker]*produceSet)
for _, batch := range retryable {
if p.shuttingDown() {
batchMessages, batchBytes := int64(len(batch.pSet.msgs)), int64(batch.pSet.bufferBytes)
p.retryBufferQuota.release(batchMessages, batchBytes)
p.failMutedBatch(batch, ErrShuttingDown)
continue
}
leader, err := p.client.Leader(batch.topic, batch.partition)
retryable.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
leader, err := p.client.Leader(topic, partition)
if err != nil {
Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", batch.topic, batch.partition, err)
batchMessages, batchBytes := int64(len(batch.pSet.msgs)), int64(batch.pSet.bufferBytes)
Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
batchMessages, batchBytes := int64(len(pSet.msgs)), int64(pSet.bufferBytes)
p.retryBufferQuota.release(batchMessages, batchBytes)
p.failMutedBatch(batch, retryErr)
continue
failed := newProduceSet(p)
failed.addPartitionSet(topic, partition, pSet)
p.failMutedSet(failed, retryErr)
return
}
set := brokerSets[leader]
if set == nil {
set = newProduceSet(p)
brokerSets[leader] = set
}
set.addPartitionSet(batch.topic, batch.partition, batch.pSet)
set.addPartitionSet(topic, partition, pSet)
})

// handoffRetryBatch also checks shutdown, but doing it here avoids creating
// or ref-counting brokerProducers just to reject the retry.
if p.shuttingDown() {
for _, set := range brokerSets {
p.retryBufferQuota.release(int64(set.bufferCount), int64(set.bufferBytes))
p.failMutedSet(set, ErrShuttingDown)
}
return
}

for leader, set := range brokerSets {
Expand All @@ -1678,21 +1649,24 @@ func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []part
p.unrefBrokerProducer(leader, bp)
p.retryBufferQuota.release(int64(set.bufferCount), int64(set.bufferBytes))
if !accepted {
p.muter.unmute(set)
set.eachPartition(func(_ string, _ int32, pSet *partitionSet) {
p.returnErrors(pSet.msgs, ErrShuttingDown)
})
p.failMutedSet(set, ErrShuttingDown)
}
}
}

// handoffRetryBatch transfers ownership of a muted retry batch to the broker
// bridge. On false, the caller still owns the mute and must fail the batch.
func (p *asyncProducer) handoffRetryBatch(bp *brokerProducer, set *produceSet) bool {
if bp.closed.Load() {
// If shutdown is already visible, do not let select randomly choose a ready
// send on bp.output. The second select still handles shutdown racing with
// the handoff.
select {
case <-bp.done:
return false
case <-p.done:
return false
default:
}

select {
case bp.output <- set:
return true
Expand All @@ -1715,10 +1689,9 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
bp.parent.abandonBrokerConnection(bp.broker)
_ = bp.broker.Close()
bp.closing = err

var retrySet *produceSet
keepMuted := make(map[string]map[int32]struct{})
var retryTopics []string
retryTopicSeen := make(map[string]struct{})
var retryBatches []partitionBatchRetry
sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
// Connection failures have no ProduceResponse to feed back through
// partitionProducer. Keep the sent batch muted and retry it asynchronously
Expand All @@ -1731,23 +1704,18 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
if !(bp.parent.conf.Producer.Idempotent || pSet.shouldKeepMuted(bp.parent.conf.Producer.Retry.Max)) {
bp.parent.returnErrors(pSet.msgs, err)
} else {
if retrySet == nil {
retrySet = newProduceSet(bp.parent)
}
if keepMuted[topic] == nil {
keepMuted[topic] = make(map[int32]struct{})
}
keepMuted[topic][partition] = struct{}{}
if _, ok := retryTopicSeen[topic]; !ok {
retryTopicSeen[topic] = struct{}{}
retryTopics = append(retryTopics, topic)
}
retryBatches = append(retryBatches, partitionBatchRetry{
topic: topic,
partition: partition,
pSet: pSet,
})
retrySet.addPartitionSet(topic, partition, pSet)
}
})
if len(retryBatches) > 0 {
go bp.parent.retryBatchesAfterRefresh(retryTopics, retryBatches, err)
if retrySet != nil {
go bp.parent.retryBatchesAfterRefresh(retrySet, err)
}
bp.accumulatingBatch.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
bp.parent.retryMessages(pSet.msgs, err)
Expand Down Expand Up @@ -1991,7 +1959,7 @@ func (q *messageQuota) tryReserve(messages, bytes int64) bool {

nextMessages := q.messages + messages
nextBytes := q.bytes + bytes
if q.wouldOverflow(nextMessages, nextBytes) {
if q.overflows(nextMessages, nextBytes) {
return false
}
q.messages = nextMessages
Expand All @@ -2009,7 +1977,7 @@ func (q *messageQuota) addAndCheckOverflow(messages, bytes int64) bool {

q.messages += messages
q.bytes += bytes
return q.wouldOverflow(q.messages, q.bytes)
return q.overflows(q.messages, q.bytes)
}

func (q *messageQuota) release(messages, bytes int64) {
Expand All @@ -2028,7 +1996,7 @@ func (q *messageQuota) shouldTrack(messages, bytes int64) bool {
return (messages != 0 || bytes != 0) && (q.maxMessages > 0 || q.maxBytes > 0)
}

func (q *messageQuota) wouldOverflow(messages, bytes int64) bool {
// overflows reports whether the given message count or byte total has reached
// one of the quota's limits. A limit that is zero or negative is not checked,
// and a value equal to a checked limit already counts as overflowing.
func (q *messageQuota) overflows(messages, bytes int64) bool {
	if q.maxMessages > 0 && messages >= q.maxMessages {
		return true
	}
	return q.maxBytes > 0 && bytes >= q.maxBytes
}
Loading
Loading