Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 47 additions & 4 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ func (m *partitionMuter) tryMutePartition(topic string, partition int32) bool {
return true
}

func (m *partitionMuter) isMutedPartition(topic string, partition int32) bool {
m.mu.Lock()
defer m.mu.Unlock()

return m.isMuted(topic, partition)
}

// waitUntilMuted blocks until all partitions in the set can be muted, then mutes them.
// Returns false if the muter was closed before all partitions could be muted.
func (m *partitionMuter) waitUntilMuted(set *produceSet) bool {
Expand Down Expand Up @@ -376,6 +383,11 @@ type ProducerMessage struct {
sequenceNumber int32
producerEpoch int16
hasSequence bool
// reusePartitionMute is a one-shot marker for non-idempotent retries. The
// first retried message from a failed, still-muted batch may reuse that
// existing in-flight reservation so later same-partition messages cannot
// slip in between the failed send and its retry.
reusePartitionMute bool
}

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
Expand All @@ -402,6 +414,7 @@ func (m *ProducerMessage) ByteSize(version int) int {
func (m *ProducerMessage) clear() {
m.flags = 0
m.retries = 0
m.reusePartitionMute = false
m.sequenceNumber = 0
m.producerEpoch = 0
m.hasSequence = false
Expand Down Expand Up @@ -1083,8 +1096,14 @@ func (bp *brokerProducer) run() {
var output chan<- *produceSet

for {
var unmuteCh <-chan struct{}
if bp.flushingBatch == nil && (bp.timerFired || bp.accumulatingBatch.readyToFlush()) {
bp.tryBuildFlushingBatch()
if bp.flushingBatch == nil {
if ch, blocked := bp.parent.muter.awaitUnmuteChan(bp.accumulatingBatch); blocked {
unmuteCh = ch
}
}
}

var timerChan <-chan time.Time
Expand Down Expand Up @@ -1169,6 +1188,7 @@ func (bp *brokerProducer) run() {
if ok {
bp.handleResponse(response)
}
case <-unmuteCh:
}
}
}
Expand All @@ -1186,10 +1206,29 @@ func (bp *brokerProducer) tryBuildFlushingBatch() bool {
partial := bp.accumulatingBatch.takePartitions(func(topic string, partition int32) bool {
return bp.parent.muter.tryMutePartition(topic, partition)
})
if partial == nil {
if partial != nil {
bp.flushingBatch = partial
if bp.accumulatingBatch.empty() {
bp.rollOver()
}
return true
}

reused := bp.accumulatingBatch.takePartitions(func(topic string, partition int32) bool {
partitions := bp.accumulatingBatch.msgs[topic]
if partitions == nil {
return false
}
set := partitions[partition]
return set.canReusePartitionMute() && bp.parent.muter.isMutedPartition(topic, partition)
})
if reused == nil {
return false
}
bp.flushingBatch = partial
reused.eachPartition(func(_ string, _ int32, pSet *partitionSet) {
pSet.clearPartitionMuteReuse()
})
bp.flushingBatch = reused
if bp.accumulatingBatch.empty() {
bp.rollOver()
}
Expand Down Expand Up @@ -1354,7 +1393,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
bp.parent.returnErrors(pSet.msgs, block.Err)
} else {
retryTopics = append(retryTopics, topic)
if bp.parent.conf.Producer.Idempotent {
if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) {
if keepMuted[topic] == nil {
keepMuted[topic] = make(map[int32]struct{})
}
Expand Down Expand Up @@ -1397,6 +1436,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
if bp.parent.conf.Producer.Idempotent {
go bp.parent.retryBatch(topic, partition, pSet, block.Err, true)
} else {
pSet.markPartitionMuteReusable()
bp.parent.retryMessages(pSet.msgs, block.Err)
}
// dropping the following messages has the side effect of incrementing their retry count
Expand Down Expand Up @@ -1492,13 +1532,16 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
bp.currentRetries[topic] = make(map[int32]error)
}
bp.currentRetries[topic][partition] = err
if bp.parent.conf.Producer.Idempotent {
if bp.parent.conf.Producer.Idempotent || pSet.canRetry(bp.parent.conf.Producer.Retry.Max) {
if keepMuted[topic] == nil {
keepMuted[topic] = make(map[int32]struct{})
}
keepMuted[topic][partition] = struct{}{}
}
if bp.parent.conf.Producer.Idempotent {
go bp.parent.retryBatch(topic, partition, pSet, err, true)
} else {
pSet.markPartitionMuteReusable()
bp.parent.retryMessages(pSet.msgs, err)
}
})
Expand Down
110 changes: 110 additions & 0 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1724,6 +1724,53 @@ func TestBrokerProducerFlushSkipsMutedPartitions(t *testing.T) {
}
}

func TestBrokerProducerRunFlushesAfterExternalUnmute(t *testing.T) {
config := NewTestConfig()
config.Producer.Flush.Messages = 1
parent := &asyncProducer{
conf: config,
muter: newPartitionMuter(),
txnmgr: &transactionManager{},
}

blockedSet := newProduceSet(parent)
safeAddMessage(t, blockedSet, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("held")})
if !parent.muter.tryMute(blockedSet) {
t.Fatal("expected to mute partition externally")
}

input := make(chan *ProducerMessage)
output := make(chan *produceSet, 1)
responses := make(chan *brokerProducerResponse)
bp := &brokerProducer{
parent: parent,
broker: &Broker{id: 1},
input: input,
output: output,
responses: responses,
accumulatingBatch: newProduceSet(parent),
currentRetries: make(map[string]map[int32]error),
}
retryMsg := &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("waiting")}
safeAddMessage(t, bp.accumulatingBatch, retryMsg)

done := make(chan struct{})
go func() {
bp.run()
close(done)
}()

assertNotDone(t, output, 50*time.Millisecond)
parent.muter.unmute(blockedSet)

flushed := assertDoneWithin(t, output, 2*time.Second)
require.Equal(t, retryMsg, flushed.msgs["topic"][0].msgs[0])

close(input)
close(responses)
assertDoneWithin(t, done, 2*time.Second)
}

// TestBrokerProducerWaitForSpaceAllPartitionsMuted verifies that waitForSpace unblocks
// when all partitions in the accumulating batch are externally muted and later unmuted.
func TestBrokerProducerWaitForSpaceAllPartitionsMuted(t *testing.T) {
Expand Down Expand Up @@ -1910,6 +1957,69 @@ func TestRetryBatchRespectsPartitionMuter(t *testing.T) {
}
}

func TestHandleSuccessNonIdempotentRetryKeepsPartitionMuted(t *testing.T) {
config := NewTestConfig()
config.Producer.Idempotent = false
config.Producer.Retry.Max = 1
config.Producer.Retry.Backoff = 0

txnMgr := &transactionManager{
producerID: 0,
producerEpoch: 0,
sequenceNumbers: make(map[string]int32),
}
parent := &asyncProducer{
conf: config,
muter: newPartitionMuter(),
brokers: make(map[*Broker]*brokerProducer),
brokerRefs: make(map[*brokerProducer]int),
retries: make(chan *ProducerMessage, 1),
txnmgr: txnMgr,
}
leader := &Broker{id: 1}
parent.client = &stubLeaderClient{leader: leader, cfg: config}

bp := &brokerProducer{
parent: parent,
broker: leader,
input: make(chan *ProducerMessage),
accumulatingBatch: newProduceSet(parent),
currentRetries: make(map[string]map[int32]error),
}
parent.brokers[leader] = bp
parent.brokerRefs[bp] = 1

sent := newProduceSet(parent)
safeAddMessage(t, sent, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("retry")})
retryPartitionSet := sent.msgs["topic"][0]
if !parent.muter.tryMute(sent) {
t.Fatal("expected sent batch to mute partitions")
}
defer parent.muter.unmute(sent)

response := new(ProduceResponse)
response.AddTopicPartition("topic", 0, ErrNotLeaderForPartition)
bp.handleSuccess(sent, response)

retried := assertDoneWithin(t, parent.retries, 2*time.Second)
require.Equal(t, retryPartitionSet.msgs[0], retried)
require.True(t, retried.reusePartitionMute)

contender := newProduceSet(parent)
safeAddMessage(t, contender, &ProducerMessage{Topic: "topic", Partition: 0, Value: StringEncoder("next")})
if parent.muter.tryMute(contender) {
parent.muter.unmute(contender)
t.Fatal("expected partition to remain muted by the retrying batch")
}

safeAddMessage(t, bp.accumulatingBatch, retried)
if !bp.tryBuildFlushingBatch() {
t.Fatal("expected retry batch to reuse the existing partition mute")
}
require.Equal(t, retryPartitionSet.msgs[0], bp.flushingBatch.msgs["topic"][0].msgs[0])
require.False(t, retryPartitionSet.msgs[0].reusePartitionMute)
}

type stubLeaderClient struct {
cfg *Config
leader *Broker
Expand Down
36 changes: 36 additions & 0 deletions produce_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,42 @@ type partitionSet struct {
bufferBytes int
}

// markPartitionMuteReusable lets the first message carry the retry batch's
// right to reuse the partition mute that is still held by its failed send.
// Only the first message is marked because one held mute represents one
// in-flight retry permission; marking every message could let split retry
// fragments bypass each other under the same reservation.
func (ps *partitionSet) markPartitionMuteReusable() {
if len(ps.msgs) > 0 {
ps.msgs[0].reusePartitionMute = true
}
}

// canReusePartitionMute reports whether this retry batch may be flushed while
// its partition is already muted by the previous send attempt.
func (ps *partitionSet) canReusePartitionMute() bool {
return len(ps.msgs) > 0 && ps.msgs[0].reusePartitionMute
}

// clearPartitionMuteReuse consumes the one-shot reuse marker once the retry
// batch is selected for flushing.
func (ps *partitionSet) clearPartitionMuteReuse() {
for _, msg := range ps.msgs {
msg.reusePartitionMute = false
}
}

// canRetry prevents holding a partition mute when every message in the set will
// be returned as an error instead of being retried.
func (ps *partitionSet) canRetry(maxRetries int) bool {
for _, msg := range ps.msgs {
if msg.retries < maxRetries {
return true
}
}
return false
}

type produceSet struct {
parent *asyncProducer
msgs map[string]map[int32]*partitionSet
Expand Down
Loading