Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,6 @@ func (pp *partitionProducer) updateLeaderIfBrokerProducerIsNil(msg *ProducerMess
pp.backoff(msg.retries)
return err
}
Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
}
return nil
}
Expand All @@ -826,7 +825,6 @@ func (pp *partitionProducer) dispatch() {
select {
case <-pp.brokerProducer.abandoned:
// a message on the abandoned channel means that our current broker selection is out of date
Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
pp.brokerProducer = nil
time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
Expand Down Expand Up @@ -887,7 +885,6 @@ func (pp *partitionProducer) dispatch() {
}

func (pp *partitionProducer) newHighWatermark(hwm int) {
Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
pp.highWatermark = hwm

// send off a fin so that we know when everything "in between" has made it
Expand All @@ -897,13 +894,11 @@ func (pp *partitionProducer) newHighWatermark(hwm int) {
pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

// a new HWM means that our current broker selection is out of date
Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
pp.brokerProducer = nil
}

func (pp *partitionProducer) flushRetryBuffers() {
Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
for {
pp.highWatermark--

Expand All @@ -912,7 +907,6 @@ func (pp *partitionProducer) flushRetryBuffers() {
pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
goto flushDone
}
Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
}

for _, msg := range pp.retryState[pp.highWatermark].buf {
Expand All @@ -922,10 +916,8 @@ func (pp *partitionProducer) flushRetryBuffers() {
flushDone:
pp.retryState[pp.highWatermark].buf = nil
if pp.retryState[pp.highWatermark].expectChaser {
Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
break
} else if pp.highWatermark == 0 {
Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
break
}
}
Expand Down Expand Up @@ -1089,7 +1081,6 @@ type brokerProducer struct {

func (bp *brokerProducer) run() {
var output chan<- *produceSet
Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

for {
if bp.flushingBatch == nil && (bp.timerFired || bp.accumulatingBatch.readyToFlush()) {
Expand All @@ -1110,7 +1101,6 @@ func (bp *brokerProducer) run() {
select {
case msg, ok := <-bp.input:
if !ok {
Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
bp.shutdown()
return
}
Expand All @@ -1120,8 +1110,6 @@ func (bp *brokerProducer) run() {
}

if msg.flags&syn == syn {
Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
bp.broker.ID(), msg.Topic, msg.Partition)
if bp.currentRetries[msg.Topic] == nil {
bp.currentRetries[msg.Topic] = make(map[int32]error)
}
Expand All @@ -1136,8 +1124,6 @@ func (bp *brokerProducer) run() {
if bp.closing == nil && msg.flags&fin == fin {
// we were retrying this partition but we can start processing again
delete(bp.currentRetries[msg.Topic], msg.Partition)
Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
bp.broker.ID(), msg.Topic, msg.Partition)
}

continue
Expand Down Expand Up @@ -1247,8 +1233,6 @@ func (bp *brokerProducer) shutdown() {
for response := range bp.responses {
bp.handleResponse(response)
}
// No more brokerProducer related goroutine should be running
Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}

func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
Expand Down Expand Up @@ -1599,7 +1583,6 @@ func (p *asyncProducer) retryHandler() {
// utility functions

func (p *asyncProducer) shutdown() {
Logger.Println("Producer shutting down.")
p.inFlight.Add(1)
p.input <- &ProducerMessage{flags: shutdown}

Expand Down
3 changes: 0 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,6 @@ func (client *client) InitProducerID() (*InitProducerIDResponse, error) {

func (client *client) Close() error {
if client.Closed() {
// Chances are this is being called from a defer() and the error will go unobserved
// so we go ahead and log the event in this case.
Logger.Printf("Close() called on already closed client")
return ErrClosedClient
}

Expand Down
Loading