Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 91 additions & 48 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

"github.com/eapache/go-resiliency/breaker"
Expand Down Expand Up @@ -93,6 +94,9 @@ type asyncProducer struct {
input, successes, retries chan *ProducerMessage
inFlight sync.WaitGroup

done chan struct{}
closed atomic.Bool

brokers map[*Broker]*brokerProducer
brokerRefs map[*brokerProducer]int
brokerLock sync.Mutex
Expand Down Expand Up @@ -303,6 +307,7 @@ func newAsyncProducer(client Client) (AsyncProducer, error) {
input: make(chan *ProducerMessage),
successes: make(chan *ProducerMessage),
retries: make(chan *ProducerMessage),
done: make(chan struct{}),
brokers: make(map[*Broker]*brokerProducer),
brokerRefs: make(map[*brokerProducer]int),
txnmgr: txnmgr,
Expand Down Expand Up @@ -781,12 +786,16 @@ func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan
}

func (pp *partitionProducer) backoff(retries int) {
pp.parent.retryBackoff(retries)
}

func (p *asyncProducer) retryBackoff(retries int) {
var backoff time.Duration
if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
maxRetries := pp.parent.conf.Producer.Retry.Max
backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
if p.conf.Producer.Retry.BackoffFunc != nil {
maxRetries := p.conf.Producer.Retry.Max
backoff = p.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
} else {
backoff = pp.parent.conf.Producer.Retry.Backoff
backoff = p.conf.Producer.Retry.Backoff
}
if backoff > 0 {
time.Sleep(backoff)
Expand Down Expand Up @@ -821,12 +830,19 @@ func (pp *partitionProducer) dispatch() {
}()

for msg := range pp.input {
if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
if pp.brokerProducer != nil {
select {
case <-pp.brokerProducer.abandoned:
// a message on the abandoned channel means that our current broker selection is out of date
pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
abandonedLeader := pp.leader
pp.parent.unrefBrokerProducer(abandonedLeader, pp.brokerProducer)
pp.brokerProducer = nil
if leader, _ := pp.parent.client.Leader(pp.topic, pp.partition); leader != nil && leader.ID() != abandonedLeader.ID() {
pp.leader = leader
pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
}
time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
default:
// producer connection is still open.
Expand Down Expand Up @@ -956,6 +972,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
input: input,
output: bridge,
responses: responses,
abandoned: make(chan struct{}),
accumulatingBatch: newProduceSet(p),
currentRetries: make(map[string]map[int32]error),
}
Expand Down Expand Up @@ -1046,10 +1063,6 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
}
})

if p.conf.Producer.Retry.Max <= 0 {
bp.abandoned = make(chan struct{})
}

return bp
}

Expand Down Expand Up @@ -1092,10 +1105,16 @@ func (bp *brokerProducer) run() {
timerChan = bp.timer.C
}

var unmuteCh <-chan struct{}
if bp.flushingBatch != nil {
output = bp.output
} else {
output = nil
if bp.accumulatingBatch.readyToFlush() {
if ch, blocked := bp.parent.muter.awaitUnmuteChan(bp.accumulatingBatch); blocked {
unmuteCh = ch
}
}
}

select {
Expand Down Expand Up @@ -1138,7 +1157,6 @@ func (bp *brokerProducer) run() {
}

if bp.accumulatingBatch.wouldOverflow(msg) {
DebugLogger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
if err := bp.waitForSpace(msg, false); err != nil {
bp.parent.retryMessage(msg, err)
continue
Expand Down Expand Up @@ -1169,6 +1187,7 @@ func (bp *brokerProducer) run() {
if ok {
bp.handleResponse(response)
}
case <-unmuteCh:
}
}
}
Expand Down Expand Up @@ -1350,32 +1369,28 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError:
if bp.parent.conf.Producer.Retry.Max <= 0 {
bp.parent.abandonBrokerConnection(bp.broker)
bp.parent.abandonBrokerConnection(bp)
bp.parent.returnErrors(pSet.msgs, block.Err)
} else {
retryTopics = append(retryTopics, topic)
if bp.parent.conf.Producer.Idempotent {
if keepMuted[topic] == nil {
keepMuted[topic] = make(map[int32]struct{})
}
keepMuted[topic][partition] = struct{}{}
if keepMuted[topic] == nil {
keepMuted[topic] = make(map[int32]struct{})
}
keepMuted[topic][partition] = struct{}{}
}
// Other non-retriable errors
default:
if bp.parent.conf.Producer.Retry.Max <= 0 {
bp.parent.abandonBrokerConnection(bp.broker)
bp.parent.abandonBrokerConnection(bp)
}
bp.parent.returnErrors(pSet.msgs, block.Err)
}
})

if len(retryTopics) > 0 {
if bp.parent.conf.Producer.Idempotent {
err := bp.parent.client.RefreshMetadata(retryTopics...)
if err != nil {
Logger.Printf("Failed refreshing metadata because of %v\n", err)
}
err := bp.parent.client.RefreshMetadata(retryTopics...)
if err != nil {
Logger.Printf("Failed refreshing metadata because of %v\n", err)
}

sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
Expand All @@ -1390,17 +1405,25 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError:
Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
bp.broker.ID(), topic, partition, block.Err)
if bp.currentRetries[topic] == nil {
bp.currentRetries[topic] = make(map[int32]error)
leaderChanged := false
if !bp.parent.conf.Producer.Idempotent {
leader, leaderErr := bp.parent.client.Leader(topic, partition)
leaderChanged = leaderErr == nil && leader != nil && leader.ID() != bp.broker.ID()
}
bp.currentRetries[topic][partition] = block.Err
if bp.parent.conf.Producer.Idempotent || leaderChanged {
if bp.currentRetries[topic] == nil {
bp.currentRetries[topic] = make(map[int32]error)
}
bp.currentRetries[topic][partition] = block.Err
}
if leaderChanged {
bp.parent.abandonBrokerConnection(bp)
}
go bp.parent.retryBatch(topic, partition, pSet, block.Err, true)
if bp.parent.conf.Producer.Idempotent {
go bp.parent.retryBatch(topic, partition, pSet, block.Err, true)
} else {
bp.parent.retryMessages(pSet.msgs, block.Err)
// dropping the following messages has the side effect of incrementing their retry count
bp.parent.retryMessages(bp.accumulatingBatch.dropPartition(topic, partition), block.Err)
}
// dropping the following messages has the side effect of incrementing their retry count
bp.parent.retryMessages(bp.accumulatingBatch.dropPartition(topic, partition), block.Err)
}
})
}
Expand All @@ -1422,6 +1445,7 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio
produceSet.msgs[topic][partition] = pSet
produceSet.bufferBytes += pSet.bufferBytes
produceSet.bufferCount += len(pSet.msgs)
retryAttempt := 0
for _, msg := range pSet.msgs {
if msg.retries >= p.conf.Producer.Retry.Max {
p.returnErrors(pSet.msgs, retryErr)
Expand All @@ -1431,6 +1455,10 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio
return
}
msg.retries++
retryAttempt = msg.retries
}
if !p.conf.Producer.Idempotent {
p.retryBackoff(retryAttempt)
}

// it's expected that a metadata refresh has been requested prior to calling retryBatch
Expand All @@ -1454,8 +1482,23 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio
}
}
bp := p.getBrokerProducer(leader)
bp.output <- produceSet
p.unrefBrokerProducer(leader, bp)
defer p.unrefBrokerProducer(leader, bp)

select {
case bp.output <- produceSet:
return
default:
}

select {
case bp.output <- produceSet:
return
case <-p.done:
for _, msg := range pSet.msgs {
p.returnError(msg, ErrShuttingDown)
}
p.muter.unmute(produceSet)
}
}

func (bp *brokerProducer) handleError(sent *produceSet, err error) {
Expand All @@ -1467,7 +1510,7 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
bp.parent.muter.unmute(sent)
} else {
Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
bp.parent.abandonBrokerConnection(bp.broker)
bp.parent.abandonBrokerConnection(bp)
_ = bp.broker.Close()
bp.closing = err
var retryTopics []string
Expand All @@ -1479,7 +1522,7 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
retryTopicSeen[topic] = struct{}{}
retryTopics = append(retryTopics, topic)
})
if bp.parent.conf.Producer.Idempotent && len(retryTopics) > 0 {
if len(retryTopics) > 0 {
refreshErr := bp.parent.client.RefreshMetadata(retryTopics...)
if refreshErr != nil {
Logger.Printf("Failed refreshing metadata because of %v\n", refreshErr)
Expand All @@ -1492,15 +1535,11 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
bp.currentRetries[topic] = make(map[int32]error)
}
bp.currentRetries[topic][partition] = err
if bp.parent.conf.Producer.Idempotent {
if keepMuted[topic] == nil {
keepMuted[topic] = make(map[int32]struct{})
}
keepMuted[topic][partition] = struct{}{}
go bp.parent.retryBatch(topic, partition, pSet, err, true)
} else {
bp.parent.retryMessages(pSet.msgs, err)
if keepMuted[topic] == nil {
keepMuted[topic] = make(map[int32]struct{})
}
keepMuted[topic][partition] = struct{}{}
go bp.parent.retryBatch(topic, partition, pSet, err, true)
})
bp.accumulatingBatch.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
bp.parent.retryMessages(pSet.msgs, err)
Expand Down Expand Up @@ -1583,6 +1622,9 @@ func (p *asyncProducer) retryHandler() {
// utility functions

func (p *asyncProducer) shutdown() {
if p.closed.CompareAndSwap(false, true) {
close(p.done)
}
p.inFlight.Add(1)
p.input <- &ProducerMessage{flags: shutdown}

Expand Down Expand Up @@ -1716,14 +1758,15 @@ func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer)
}
}

func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
func (p *asyncProducer) abandonBrokerConnection(bp *brokerProducer) {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()

bc, ok := p.brokers[broker]
if ok && bc.abandoned != nil {
close(bc.abandoned)
bc, ok := p.brokers[bp.broker]
if !ok || bc != bp {
return
}

delete(p.brokers, broker)
close(bc.abandoned)
delete(p.brokers, bp.broker)
}
Loading
Loading