From 977c12be1e96671071073b1ba920db7908130edd Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 6 May 2026 18:25:13 +0800 Subject: [PATCH] remove verbose log --- async_producer.go | 17 ----------------- client.go | 3 --- 2 files changed, 20 deletions(-) diff --git a/async_producer.go b/async_producer.go index bc406993e..a9af20b1b 100644 --- a/async_producer.go +++ b/async_producer.go @@ -800,7 +800,6 @@ func (pp *partitionProducer) updateLeaderIfBrokerProducerIsNil(msg *ProducerMess pp.backoff(msg.retries) return err } - Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID()) } return nil } @@ -826,7 +825,6 @@ func (pp *partitionProducer) dispatch() { select { case <-pp.brokerProducer.abandoned: // a message on the abandoned channel means that our current broker selection is out of date - Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID()) pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer) pp.brokerProducer = nil time.Sleep(pp.parent.conf.Producer.Retry.Backoff) @@ -887,7 +885,6 @@ func (pp *partitionProducer) dispatch() { } func (pp *partitionProducer) newHighWatermark(hwm int) { - Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm) pp.highWatermark = hwm // send off a fin so that we know when everything "in between" has made it @@ -897,13 +894,11 @@ func (pp *partitionProducer) newHighWatermark(hwm int) { pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1} // a new HWM means that our current broker selection is out of date - Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID()) pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer) pp.brokerProducer = nil } func (pp *partitionProducer) flushRetryBuffers() { - Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark) for { pp.highWatermark-- @@ -912,7 +907,6 @@ func (pp *partitionProducer) flushRetryBuffers() { pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err) goto flushDone } - Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID()) } for _, msg := range pp.retryState[pp.highWatermark].buf { @@ -922,10 +916,8 @@ func (pp *partitionProducer) flushRetryBuffers() { flushDone: pp.retryState[pp.highWatermark].buf = nil if pp.retryState[pp.highWatermark].expectChaser { - Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark) break } else if pp.highWatermark == 0 { - Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition) break } } @@ -1089,7 +1081,6 @@ type brokerProducer struct { func (bp *brokerProducer) run() { var output chan<- *produceSet - Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID()) for { if bp.flushingBatch == nil && (bp.timerFired || bp.accumulatingBatch.readyToFlush()) { @@ -1110,7 +1101,6 @@ func (bp *brokerProducer) run() { select { case msg, ok := <-bp.input: if !ok { - Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID()) bp.shutdown() return } @@ -1120,8 +1110,6 @@ func (bp *brokerProducer) run() { } if msg.flags&syn == syn { - Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n", - bp.broker.ID(), msg.Topic, msg.Partition) if bp.currentRetries[msg.Topic] == nil { bp.currentRetries[msg.Topic] = make(map[int32]error) } @@ -1136,8 +1124,6 @@ func (bp *brokerProducer) run() { if bp.closing == nil && msg.flags&fin == fin { // we were retrying this partition but we can start processing again delete(bp.currentRetries[msg.Topic], msg.Partition) - Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n", - bp.broker.ID(), msg.Topic, msg.Partition) } continue @@ -1247,8 +1233,6 @@ func (bp *brokerProducer) shutdown() { for response := range bp.responses { bp.handleResponse(response) } - // No more brokerProducer related goroutine should be running - Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID()) } func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error { @@ -1599,7 +1583,6 @@ func (p *asyncProducer) retryHandler() { // utility functions func (p *asyncProducer) shutdown() { - Logger.Println("Producer shutting down.") p.inFlight.Add(1) p.input <- &ProducerMessage{flags: shutdown} diff --git a/client.go b/client.go index 7009613c1..7fddb1a20 100644 --- a/client.go +++ b/client.go @@ -307,9 +307,6 @@ func (client *client) InitProducerID() (*InitProducerIDResponse, error) { func (client *client) Close() error { if client.Closed() { - // Chances are this is being called from a defer() and the error will go unobserved - // so we go ahead and log the event in this case. - Logger.Printf("Close() called on already closed client") return ErrClosedClient }