Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 147 additions & 75 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

"github.com/eapache/go-resiliency/breaker"
Expand Down Expand Up @@ -92,6 +93,10 @@ type asyncProducer struct {
errors chan *ProducerError
input, successes, retries chan *ProducerMessage
inFlight sync.WaitGroup
// shutdownCh is closed before waiting on in-flight messages so retryBatch
// goroutines can stop waiting on broker handoff and release partition mutes.
shutdownCh chan struct{}
shutdownChClosed atomic.Bool

brokers map[*Broker]*brokerProducer
brokerRefs map[*brokerProducer]int
Expand Down Expand Up @@ -184,13 +189,6 @@ func (m *partitionMuter) tryMutePartition(topic string, partition int32) bool {
return true
}

func (m *partitionMuter) isMutedPartition(topic string, partition int32) bool {
m.mu.Lock()
defer m.mu.Unlock()

return m.isMuted(topic, partition)
}

// waitUntilMuted blocks until all partitions in the set can be muted, then mutes them.
// Returns false if the muter was closed before all partitions could be muted.
func (m *partitionMuter) waitUntilMuted(set *produceSet) bool {
Expand Down Expand Up @@ -310,6 +308,7 @@ func newAsyncProducer(client Client) (AsyncProducer, error) {
input: make(chan *ProducerMessage),
successes: make(chan *ProducerMessage),
retries: make(chan *ProducerMessage),
shutdownCh: make(chan struct{}),
brokers: make(map[*Broker]*brokerProducer),
brokerRefs: make(map[*brokerProducer]int),
txnmgr: txnmgr,
Expand Down Expand Up @@ -383,11 +382,6 @@ type ProducerMessage struct {
sequenceNumber int32
producerEpoch int16
hasSequence bool
// reusePartitionMute is a one-shot marker for non-idempotent retries. The
// first retried message from a failed, still-muted batch may reuse that
// existing in-flight reservation so later same-partition messages cannot
// slip in between the failed send and its retry.
reusePartitionMute bool
}

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
Expand All @@ -414,7 +408,6 @@ func (m *ProducerMessage) ByteSize(version int) int {
func (m *ProducerMessage) clear() {
m.flags = 0
m.retries = 0
m.reusePartitionMute = false
m.sequenceNumber = 0
m.producerEpoch = 0
m.hasSequence = false
Expand Down Expand Up @@ -794,12 +787,16 @@ func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan
}

func (pp *partitionProducer) backoff(retries int) {
pp.parent.backoff(retries)
}

func (p *asyncProducer) backoff(retries int) {
var backoff time.Duration
if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
maxRetries := pp.parent.conf.Producer.Retry.Max
backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
if p.conf.Producer.Retry.BackoffFunc != nil {
maxRetries := p.conf.Producer.Retry.Max
backoff = p.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
} else {
backoff = pp.parent.conf.Producer.Retry.Backoff
backoff = p.conf.Producer.Retry.Backoff
}
if backoff > 0 {
time.Sleep(backoff)
Expand Down Expand Up @@ -969,6 +966,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
input: input,
output: bridge,
responses: responses,
done: make(chan struct{}),
accumulatingBatch: newProduceSet(p),
currentRetries: make(map[string]map[int32]error),
}
Expand Down Expand Up @@ -1082,6 +1080,11 @@ type brokerProducer struct {
output chan<- *produceSet
responses <-chan *brokerProducerResponse
abandoned chan struct{}
// done is closed before output is closed. Direct retry handoff waits on
// output while the batch still owns its partition mute; if the brokerProducer
// is shutting down, that send may never be received and the mute would block
// later same-partition messages and producer shutdown.
done chan struct{}

accumulatingBatch *produceSet
flushingBatch *produceSet // batch that has been muted and is ready to send
Expand Down Expand Up @@ -1214,28 +1217,13 @@ func (bp *brokerProducer) tryBuildFlushingBatch() bool {
return true
}

reused := bp.accumulatingBatch.takePartitions(func(topic string, partition int32) bool {
partitions := bp.accumulatingBatch.msgs[topic]
if partitions == nil {
return false
}
set := partitions[partition]
return set.canReusePartitionMute() && bp.parent.muter.isMutedPartition(topic, partition)
})
if reused == nil {
return false
}
reused.eachPartition(func(_ string, _ int32, pSet *partitionSet) {
pSet.clearPartitionMuteReuse()
})
bp.flushingBatch = reused
if bp.accumulatingBatch.empty() {
bp.rollOver()
}
return true
return false
}

func (bp *brokerProducer) shutdown() {
if bp.done != nil {
close(bp.done)
}
// flush any ready buffer
for bp.flushingBatch != nil {
select {
Expand Down Expand Up @@ -1393,7 +1381,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
bp.parent.returnErrors(pSet.msgs, block.Err)
} else {
retryTopics = append(retryTopics, topic)
if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) {
if bp.parent.conf.Producer.Idempotent {
if keepMuted[topic] == nil {
keepMuted[topic] = make(map[int32]struct{})
}
Expand Down Expand Up @@ -1436,7 +1424,9 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
if bp.parent.conf.Producer.Idempotent {
go bp.parent.retryBatch(topic, partition, pSet, block.Err, true)
} else {
pSet.markPartitionMuteReusable()
// Non-idempotent response retries must go back through partitionProducer.
// That path advances the retry high watermark and refreshes the partition
// leader before sending the retry; retryBatch would bypass that state.
bp.parent.retryMessages(pSet.msgs, block.Err)
}
// dropping the following messages has the side effect of incrementing their retry count
Expand All @@ -1462,40 +1452,119 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio
produceSet.msgs[topic][partition] = pSet
produceSet.bufferBytes += pSet.bufferBytes
produceSet.bufferCount += len(pSet.msgs)
muted := alreadyMuted
failBatch := func(err error) {
// Release the partition before reporting errors so a blocked Errors
// consumer cannot keep later same-partition messages muted.
if muted {
p.muter.unmute(produceSet)
muted = false
}
for _, msg := range pSet.msgs {
p.returnError(msg, err)
}
}
for _, msg := range pSet.msgs {
if msg.retries >= p.conf.Producer.Retry.Max {
p.returnErrors(pSet.msgs, retryErr)
if alreadyMuted {
p.muter.unmute(produceSet)
}
failBatch(retryErr)
return
}
msg.retries++
}

// honor Producer.Retry.Backoff between retry attempts (#2469); the
// non-idempotent path gets this from partitionProducer.dispatch, but
// retryBatch dispatches the produceSet directly to the broker
if len(pSet.msgs) > 0 {
p.backoff(pSet.msgs[0].retries)
}

// it's expected that a metadata refresh has been requested prior to calling retryBatch
leader, leaderErr := p.client.Leader(topic, partition)
if leaderErr != nil {
Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, leaderErr)
for _, msg := range pSet.msgs {
p.returnError(msg, retryErr)
}
if alreadyMuted {
p.muter.unmute(produceSet)
}
failBatch(retryErr)
return
}
if !alreadyMuted {
if !p.muter.waitUntilMuted(produceSet) {
for _, msg := range pSet.msgs {
p.returnError(msg, retryErr)
}
failBatch(retryErr)
return
}
muted = true
}
bp := p.getBrokerProducer(leader)
bp.output <- produceSet
p.unrefBrokerProducer(leader, bp)
defer p.unrefBrokerProducer(leader, bp)
if !p.sendRetryBatch(bp, produceSet) {
failBatch(ErrShuttingDown)
return
}
}

type partitionBatchRetry struct {
topic string
partition int32
pSet *partitionSet
}

// retryBatchesAfterRefresh keeps metadata refresh out of brokerProducer.handleError.
// Connection errors already hold partition mutes; doing the refresh in the
// response loop can block all progress for that broker while the cluster is
// unstable. Refresh once for the failed request so multi-partition batches do
// not amplify controller-fail metadata traffic, then retry partitions
// independently so one blocked broker handoff cannot hold up the rest.
func (p *asyncProducer) retryBatchesAfterRefresh(topics []string, batches []partitionBatchRetry, retryErr error) {
if p.shutdownCh != nil {
select {
case <-p.shutdownCh:
for _, batch := range batches {
go p.retryBatch(batch.topic, batch.partition, batch.pSet, retryErr, true)
}
return
default:
}
}
if len(topics) > 0 {
if err := p.client.RefreshMetadata(topics...); err != nil {
Logger.Printf("Failed refreshing metadata because of %v\n", err)
}
}
for _, batch := range batches {
go p.retryBatch(batch.topic, batch.partition, batch.pSet, retryErr, true)
}
}

// sendRetryBatch transfers ownership of a muted retry batch to the broker
// bridge. A false result means no owner accepted the batch, so the caller must
// release the mute and fail the messages instead of leaving a retry goroutine
// blocked while it still owns the partition mute.
func (p *asyncProducer) sendRetryBatch(bp *brokerProducer, set *produceSet) bool {
var done <-chan struct{}
if bp.done != nil {
done = bp.done
}
select {
case <-done:
return false
default:
}
select {
case bp.output <- set:
return true
default:
}
var shutdownCh <-chan struct{}
if p.shutdownCh != nil {
shutdownCh = p.shutdownCh
}
select {
case bp.output <- set:
return true
case <-done:
return false
case <-shutdownCh:
return false
}
}

func (bp *brokerProducer) handleError(sent *produceSet, err error) {
Expand All @@ -1510,41 +1579,40 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
bp.parent.abandonBrokerConnection(bp.broker)
_ = bp.broker.Close()
bp.closing = err
keepMuted := make(map[string]map[int32]struct{})
var retryTopics []string
retryTopicSeen := make(map[string]struct{})
var retryBatches []partitionBatchRetry
sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
if _, ok := retryTopicSeen[topic]; ok {
return
}
retryTopicSeen[topic] = struct{}{}
retryTopics = append(retryTopics, topic)
})
if bp.parent.conf.Producer.Idempotent && len(retryTopics) > 0 {
refreshErr := bp.parent.client.RefreshMetadata(retryTopics...)
if refreshErr != nil {
Logger.Printf("Failed refreshing metadata because of %v\n", refreshErr)
}
}
keepMuted := make(map[string]map[int32]struct{})
sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
// keep partition marked as in-flight during retry (connection error)
// Connection failures have no ProduceResponse to feed back through
// partitionProducer. Keep the sent batch muted and retry it asynchronously
// so later same-partition batches cannot pass it, without blocking the
// brokerProducer response loop (#1203).
if bp.currentRetries[topic] == nil {
bp.currentRetries[topic] = make(map[int32]error)
}
bp.currentRetries[topic][partition] = err
if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) {
if !(bp.parent.conf.Producer.Idempotent || pSet.shouldKeepMuted(bp.parent.conf.Producer.Retry.Max)) {
bp.parent.returnErrors(pSet.msgs, err)
} else {
if keepMuted[topic] == nil {
keepMuted[topic] = make(map[int32]struct{})
}
keepMuted[topic][partition] = struct{}{}
}
if bp.parent.conf.Producer.Idempotent {
go bp.parent.retryBatch(topic, partition, pSet, err, true)
} else {
pSet.markPartitionMuteReusable()
bp.parent.retryMessages(pSet.msgs, err)
if _, ok := retryTopicSeen[topic]; !ok {
retryTopicSeen[topic] = struct{}{}
retryTopics = append(retryTopics, topic)
}
retryBatches = append(retryBatches, partitionBatchRetry{
topic: topic,
partition: partition,
pSet: pSet,
})
}
})
if len(retryBatches) > 0 {
go bp.parent.retryBatchesAfterRefresh(retryTopics, retryBatches, err)
}
bp.accumulatingBatch.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
bp.parent.retryMessages(pSet.msgs, err)
})
Expand Down Expand Up @@ -1626,6 +1694,10 @@ func (p *asyncProducer) retryHandler() {
// utility functions

func (p *asyncProducer) shutdown() {
Logger.Println("Producer shutting down.")
if p.shutdownCh != nil && p.shutdownChClosed.CompareAndSwap(false, true) {
close(p.shutdownCh)
}
p.inFlight.Add(1)
p.input <- &ProducerMessage{flags: shutdown}

Expand Down
Loading
Loading