From 8173b251810c3fe2db2ec1f739a6f197cccfad31 Mon Sep 17 00:00:00 2001 From: Leon Hwang Date: Thu, 20 Nov 2025 15:38:10 +0800 Subject: [PATCH 1/8] fix: close broken tcp connections Prevent TCP connection leaks that error occurred to kernel space's socket. Previously, when errors occurred on TCP sockets, connections would remain in TCP_CLOSE state indefinitely without being properly freed. The leaked connections caused a cascade of kernel-level issues: - TCP sockets remain in TCP_CLOSE state indefinitely - Their sk_receive_queues retain FINACK skb packets - These skbs hold pages allocated from page_pool - page_pool_release_retry() stalls because pages cannot be freed while still referenced by the TCP stack The fix ensures that TCP connections are properly closed when socket errors occur, preventing resource leaks at both the application and kernel levels. Signed-off-by: Leon Hwang --- broker.go | 21 +++++++++++++++++++++ client.go | 40 ++++++++++++++++++++++++++++++++++++++++ go.mod | 11 +++++++++++ go.sum | 17 +++++++++++++++++ sockopt_other.go | 9 +++++++++ sockopt_posix.go | 36 ++++++++++++++++++++++++++++++++++++ 6 files changed, 134 insertions(+) create mode 100644 sockopt_other.go create mode 100644 sockopt_posix.go diff --git a/broker.go b/broker.go index 6b23ec72f..d943ad491 100644 --- a/broker.go +++ b/broker.go @@ -157,6 +157,27 @@ func NewBroker(addr string) *Broker { return &Broker{id: -1, addr: addr} } +func (b *Broker) getSockError() error { + b.lock.Lock() + defer b.lock.Unlock() + + if b.conn == nil { + return nil + } + + conn := b.conn + if c, ok := conn.(*bufConn); ok { + conn = c.Conn + } + if c, ok := conn.(*tls.Conn); ok { + conn = c.NetConn() + } + if c, ok := conn.(*net.TCPConn); ok { + return getTCPConnSockError(c) + } + return nil +} + // Open tries to connect to the Broker if it is not already connected or connecting, but does not block // waiting for the connection to complete. This means that any subsequent operations on the broker will // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call, diff --git a/client.go b/client.go index 7fddb1a20..fbe58ba01 100644 --- a/client.go +++ b/client.go @@ -725,6 +725,38 @@ func (client *client) randomizeSeedBrokers(addrs []string) { } } +func (client *client) checkSeedBrokersHealth(brokers []*Broker) []*Broker { + if len(brokers) == 0 { + return nil + } + + healthyBrokers := make([]*Broker, 0, len(brokers)) + for _, broker := range brokers { + if err := broker.getSockError(); err != nil { + Logger.Printf("client/seedbrokers close seed broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) + safeAsyncClose(broker) + continue + } + + healthyBrokers = append(healthyBrokers, broker) + } + + return healthyBrokers +} + +func (client *client) checkBrokersHealth() { + for id, broker := range client.brokers { + if err := broker.getSockError(); err != nil { + Logger.Printf("client/brokers close broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) + safeAsyncClose(broker) + delete(client.brokers, id) + } + } + + client.seedBrokers = client.checkSeedBrokersHealth(client.seedBrokers) + client.deadSeeds = client.checkSeedBrokersHealth(client.deadSeeds) +} + func (client *client) updateBroker(brokers []*Broker) { if client.brokers == nil { return @@ -1109,6 +1141,14 @@ func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bo client.lock.Lock() defer client.lock.Unlock() + // Check health of existing brokers, including seed brokers, dead + // seed brokers, and registered brokers. + // - if error occurred on broker's tcp socket, close the tcp + // connection. + // - if it's seed broker or dead seed broker, remove it from + // the list. + client.checkBrokersHealth() + // For all the brokers we received: // - if it is a new ID, save it // - if it is an existing ID, but the address we have is stale, discard the old one and save it diff --git a/go.mod b/go.mod index 913c5ad75..5d2847327 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/hashicorp/go-multierror v1.1.1 github.com/jcmturner/gofork v1.7.6 github.com/jcmturner/gokrb5/v8 v8.4.4 +<<<<<<< HEAD github.com/klauspost/compress v1.16.7 github.com/pierrec/lz4/v4 v4.1.18 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 @@ -18,6 +19,16 @@ require ( github.com/xdg-go/scram v1.1.2 golang.org/x/net v0.15.0 golang.org/x/sync v0.3.0 +======= + github.com/klauspost/compress v1.18.5 + github.com/pierrec/lz4/v4 v4.1.26 + github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 + github.com/stretchr/testify v1.11.1 + go.uber.org/goleak v1.3.0 + golang.org/x/net v0.53.0 + golang.org/x/sync v0.20.0 + golang.org/x/sys v0.43.0 +>>>>>>> 8eafc465 (fix: close broken tcp connections) ) require ( diff --git a/go.sum b/go.sum index b04f44732..316b4189e 100644 --- a/go.sum +++ b/go.sum @@ -33,8 +33,13 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +<<<<<<< HEAD github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +======= +github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE= +github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= +>>>>>>> 8eafc465 (fix: close broken tcp connections) github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -70,8 +75,13 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +<<<<<<< HEAD golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= +======= +golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= +golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= +>>>>>>> 8eafc465 (fix: close broken tcp connections) golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -79,8 +89,13 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +<<<<<<< HEAD golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= +======= +golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= +golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= +>>>>>>> 8eafc465 (fix: close broken tcp connections) golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= @@ -91,6 +106,8 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= diff --git a/sockopt_other.go b/sockopt_other.go new file mode 100644 index 000000000..43d940571 --- /dev/null +++ b/sockopt_other.go @@ -0,0 +1,9 @@ +//go:build !unix || android || illumos || ios || hurd + +package sarama + +import "net" + +func getTCPConnSockError(_ *net.TCPConn) error { + return nil +} diff --git a/sockopt_posix.go b/sockopt_posix.go new file mode 100644 index 000000000..9aaf839af --- /dev/null +++ b/sockopt_posix.go @@ -0,0 +1,36 @@ +//go:build unix && !android && !illumos && !ios && !hurd + +// build on aix || darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris + +package sarama + +import ( + "fmt" + "net" + + "golang.org/x/sys/unix" +) + +func getTCPConnSockError(conn *net.TCPConn) error { + rawConn, err := conn.SyscallConn() + if err != nil { + return fmt.Errorf("failed to get raw connection: %w", err) + } + + var sockErr int + var opErr error + + err = rawConn.Control(func(fd uintptr) { + sockErr, opErr = unix.GetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_ERROR) + }) + if err != nil { + return fmt.Errorf("failed to control raw connection: %w", err) + } + if opErr != nil { + return fmt.Errorf("failed to get socket error: %w", opErr) + } + if sockErr != 0 { + return unix.Errno(sockErr) + } + return nil +} From 24512d52fddc4ee50fff801a7e59f7d30c8870b4 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Mon, 23 Mar 2026 10:04:02 +0000 Subject: [PATCH 2/8] fix: keep bootstrap brokers and unblock async shutdown Skip socket health checks while a broker is still opening so metadata refresh does not block behind the broker mutex. Keep seed and dead-seed brokers in their bootstrap pools after socket errors so they can be reopened on a later refresh. Close broker sockets before waiting for the response reader so health-check cleanup and SASL v1 auth failures cannot hang while async responses are still in flight. Signed-off-by: Dominic Evans --- broker.go | 17 +++----- broker_test.go | 46 ++++++++++++++++++++ client.go | 6 +-- client_test.go | 116 +++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 170 insertions(+), 15 deletions(-) diff --git a/broker.go b/broker.go index d943ad491..9358b2a79 100644 --- a/broker.go +++ b/broker.go @@ -158,7 +158,10 @@ func NewBroker(addr string) *Broker { } func (b *Broker) getSockError() error { - b.lock.Lock() + // skip socket health checks while another operation owns broker state + if !b.lock.TryLock() { + return nil + } defer b.lock.Unlock() if b.conn == nil { @@ -308,18 +311,12 @@ func (b *Broker) Open(conf *Config) error { if conf.Net.SASL.Enable && !useSaslV0 { b.connErr = b.authenticateViaSASLv1() if b.connErr != nil { - close(b.responses) - <-b.done - b.responses = nil - b.done = nil - err = b.conn.Close() + err = b.closeLocked() if err == nil { DebugLogger.Printf("Closed connection to broker %s\n", b.addr) } else { Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) } - b.conn = nil - atomic.StoreInt32(&b.opened, 0) return } } @@ -398,12 +395,12 @@ func (b *Broker) closeLocked() error { if b.responses != nil { close(b.responses) } + // close the socket before waiting so in-flight reads can exit + err := b.conn.Close() if b.done != nil { <-b.done } - err := b.conn.Close() - b.conn = nil b.responses = nil b.done = nil diff --git a/broker_test.go b/broker_test.go index b7c6c4b7a..9f2962028 100644 --- a/broker_test.go +++ b/broker_test.go @@ -198,6 +198,52 @@ func TestBrokerFailedRequest(t *testing.T) { } } +func TestBrokerClose(t *testing.T) { + t.Run("interrupts active async response reads", func(t *testing.T) { + mockBroker := NewMockBroker(t, 0) + defer mockBroker.Close() + + broker := NewBroker(mockBroker.Addr()) + conf := NewTestConfig() + conf.ApiVersionsRequest = false + conf.Net.ReadTimeout = 30 * time.Second + + require.NoError(t, broker.Open(conf)) + + request := ProduceRequest{} + request.RequiredAcks = WaitForLocal + + responseErrs := make(chan error, 1) + err := broker.AsyncProduce(&request, func(_ *ProduceResponse, err error) { + responseErrs <- err + }) + require.NoError(t, err) + + closeErrs := make(chan error, 1) + go func() { + closeErrs <- broker.Close() + }() + + select { + case err := <-closeErrs: + require.NoError(t, err) + case <-time.After(250 * time.Millisecond): + require.FailNow(t, "Close blocked with an active async response read") + } + + select { + case err := <-responseErrs: + require.Error(t, err) + case <-time.After(time.Second): + require.FailNow(t, "timed out waiting for the async produce callback") + } + + connected, err := broker.Connected() + require.NoError(t, err) + require.False(t, connected) + }) +} + // closeImmediatelyDialer is a test dialer that returns a net.Conn whose peer is // already closed. This reliably triggers a transport-level failure (e.g. EOF) // during ApiVersions negotiation in Broker.Open. diff --git a/client.go b/client.go index fbe58ba01..0e355710d 100644 --- a/client.go +++ b/client.go @@ -730,18 +730,14 @@ func (client *client) checkSeedBrokersHealth(brokers []*Broker) []*Broker { return nil } - healthyBrokers := make([]*Broker, 0, len(brokers)) for _, broker := range brokers { if err := broker.getSockError(); err != nil { Logger.Printf("client/seedbrokers close seed broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) safeAsyncClose(broker) - continue } - - healthyBrokers = append(healthyBrokers, broker) } - return healthyBrokers + return brokers } func (client *client) checkBrokersHealth() { diff --git a/client_test.go b/client_test.go index 2fd819625..5522d23a6 100644 --- a/client_test.go +++ b/client_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "net" "sync" "sync/atomic" "syscall" @@ -746,6 +747,121 @@ func TestClientResurrectDeadSeeds(t *testing.T) { safeClose(t, c) } +func TestClientCheckBrokersHealth(t *testing.T) { + newConnectedBroker := func(t *testing.T) (*Broker, *net.TCPConn, func()) { + t.Helper() + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + accepted := make(chan *net.TCPConn, 1) + acceptErr := make(chan error, 1) + go func() { + conn, err := listener.Accept() + if err != nil { + acceptErr <- err + return + } + accepted <- conn.(*net.TCPConn) + }() + + conn, err := net.Dial("tcp", listener.Addr().String()) + require.NoError(t, err) + + var serverConn *net.TCPConn + select { + case serverConn = <-accepted: + case err := <-acceptErr: + require.NoError(t, err) + case <-time.After(time.Second): + require.FailNow(t, "timed out waiting for test broker connection") + } + + broker := NewBroker(listener.Addr().String()) + broker.conn = conn.(*net.TCPConn) + broker.metricRegistry = metrics.NewRegistry() + broker.opened.Store(true) + + cleanup := func() { + _ = listener.Close() + _ = serverConn.Close() + _ = broker.Close() + } + + return broker, serverConn, cleanup + } + + t.Run("does not wait for brokers that are opening", func(t *testing.T) { + broker := &Broker{id: 1, addr: "127.0.0.1:9092"} + broker.lock.Lock() + defer broker.lock.Unlock() + + client := &client{ + brokers: map[int32]*Broker{broker.ID(): broker}, + } + + done := make(chan struct{}) + go func() { + client.checkBrokersHealth() + close(done) + }() + + select { + case <-done: + case <-time.After(100 * time.Millisecond): + require.FailNow(t, "checkBrokersHealth blocked on a broker that was still opening") + } + }) + + t.Run("keeps unhealthy live seed brokers available for reopening", func(t *testing.T) { + broker, serverConn, cleanup := newConnectedBroker(t) + defer cleanup() + + client := &client{ + brokers: map[int32]*Broker{}, + seedBrokers: []*Broker{broker}, + } + + require.NoError(t, serverConn.SetLinger(0)) + require.NoError(t, serverConn.Close()) + + require.EventuallyWithT(t, func(c *assert.CollectT) { + client.checkBrokersHealth() + + assert.Len(c, client.seedBrokers, 1) + assert.Same(c, broker, client.seedBrokers[0]) + + connected, err := broker.Connected() + assert.NoError(c, err) + assert.False(c, connected) + }, time.Second, 10*time.Millisecond) + }) + + t.Run("keeps unhealthy dead seed brokers available for reopening", func(t *testing.T) { + broker, serverConn, cleanup := newConnectedBroker(t) + defer cleanup() + + client := &client{ + brokers: map[int32]*Broker{}, + deadSeeds: []*Broker{broker}, + } + + require.NoError(t, serverConn.SetLinger(0)) + require.NoError(t, serverConn.Close()) + + require.EventuallyWithT(t, func(c *assert.CollectT) { + client.checkBrokersHealth() + + assert.Len(c, client.deadSeeds, 1) + assert.Same(c, broker, client.deadSeeds[0]) + + connected, err := broker.Connected() + assert.NoError(c, err) + assert.False(c, connected) + }, time.Second, 10*time.Millisecond) + }) +} + //nolint:paralleltest func TestClientController(t *testing.T) { seedBroker := NewMockBroker(t, 1) From 1b4f32e734dfccec972199e4a63ebd5659a25186 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Thu, 26 Mar 2026 17:42:42 +0000 Subject: [PATCH 3/8] fix: skip broker health tests without socket probing Signed-off-by: Dominic Evans --- client_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/client_test.go b/client_test.go index 5522d23a6..28e9b03cd 100644 --- a/client_test.go +++ b/client_test.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net" + "runtime" "sync" "sync/atomic" "syscall" @@ -18,6 +19,15 @@ import ( "github.com/stretchr/testify/require" ) +func socketErrorProbeAvailable() bool { + switch runtime.GOOS { + case "aix", "darwin", "dragonfly", "freebsd", "linux", "netbsd", "openbsd", "solaris": + return true + default: + return false + } +} + func TestSimpleClient(t *testing.T) { seedBroker := NewMockBroker(t, 1) @@ -814,6 +824,10 @@ func TestClientCheckBrokersHealth(t *testing.T) { }) t.Run("keeps unhealthy live seed brokers available for reopening", func(t *testing.T) { + if !socketErrorProbeAvailable() { + t.Skip("socket error probing is unavailable on this platform") + } + broker, serverConn, cleanup := newConnectedBroker(t) defer cleanup() @@ -838,6 +852,10 @@ func TestClientCheckBrokersHealth(t *testing.T) { }) t.Run("keeps unhealthy dead seed brokers available for reopening", func(t *testing.T) { + if !socketErrorProbeAvailable() { + t.Skip("socket error probing is unavailable on this platform") + } + broker, serverConn, cleanup := newConnectedBroker(t) defer cleanup() From 2a7878d0932ccefa15713031112d0bd3989668d4 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Thu, 23 Apr 2026 12:49:37 +0100 Subject: [PATCH 4/8] fix: remove return from checkSeedBrokersHealth Signed-off-by: Dominic Evans --- client.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/client.go b/client.go index 0e355710d..5af73dbc8 100644 --- a/client.go +++ b/client.go @@ -725,9 +725,9 @@ func (client *client) randomizeSeedBrokers(addrs []string) { } } -func (client *client) checkSeedBrokersHealth(brokers []*Broker) []*Broker { +func (client *client) checkSeedBrokersHealth(brokers []*Broker) { if len(brokers) == 0 { - return nil + return } for _, broker := range brokers { @@ -736,8 +736,6 @@ func (client *client) checkSeedBrokersHealth(brokers []*Broker) []*Broker { safeAsyncClose(broker) } } - - return brokers } func (client *client) checkBrokersHealth() { @@ -749,8 +747,8 @@ func (client *client) checkBrokersHealth() { } } - client.seedBrokers = client.checkSeedBrokersHealth(client.seedBrokers) - client.deadSeeds = client.checkSeedBrokersHealth(client.deadSeeds) + client.checkSeedBrokersHealth(client.seedBrokers) + client.checkSeedBrokersHealth(client.deadSeeds) } func (client *client) updateBroker(brokers []*Broker) { From 9cf476fe9ee985a622bdd7199219ccf6056ff042 Mon Sep 17 00:00:00 2001 From: Lingnan Liu Date: Thu, 23 Apr 2026 16:44:48 -0700 Subject: [PATCH 5/8] fix: add Unwrap() to DescribeConfigError and AlterConfigError (#3487) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary `TopicError` and `TopicPartitionError` implement `Unwrap()` returning their `KError` field, enabling `errors.Is`/`errors.As` to traverse the error chain. However, `DescribeConfigError` and `AlterConfigError` have the same structure (`Err KError` + `ErrMsg`) but were missing `Unwrap()`, making `errors.Is`/`errors.As` unable to match the underlying `KError`. This adds the missing `Unwrap()` methods for consistency, allowing callers to use `errors.Is(err, sarama.ErrInvalidConfig)` instead of type-asserting to extract the error code. ## Changes - Add `Unwrap() error` to `*DescribeConfigError` (`describe_configs_response.go`) - Add `Unwrap() error` to `*AlterConfigError` (`alter_configs_response.go`) - Add tests for both, following the existing `TestTopicError` pattern ## Test plan - [x] `go test -run 'TestDescribeConfigError|TestAlterConfigError|TestTopicError'` — all pass - [x] `go vet ./...` — clean Signed-off-by: Lingnan Liu --- alter_configs_response.go | 4 ++++ alter_configs_response_test.go | 32 +++++++++++++++++++++++++++++++ describe_configs_response.go | 4 ++++ describe_configs_response_test.go | 32 +++++++++++++++++++++++++++++++ 4 files changed, 72 insertions(+) diff --git a/alter_configs_response.go b/alter_configs_response.go index 4c458ddad..e6e3ec712 100644 --- a/alter_configs_response.go +++ b/alter_configs_response.go @@ -29,6 +29,10 @@ func (c *AlterConfigError) Error() string { return text } +func (c *AlterConfigError) Unwrap() error { + return c.Err +} + // AlterConfigsResourceResponse is a response type for alter config resource type AlterConfigsResourceResponse struct { ErrorCode int16 diff --git a/alter_configs_response_test.go b/alter_configs_response_test.go index 4707d075d..b4c6df542 100644 --- a/alter_configs_response_test.go +++ b/alter_configs_response_test.go @@ -3,6 +3,7 @@ package sarama import ( + "errors" "testing" ) @@ -45,3 +46,34 @@ func TestAlterConfigsResponse(t *testing.T) { } testResponse(t, "response with error", response, alterResponsePopulated) } + +func TestAlterConfigError(t *testing.T) { + // Assert that AlterConfigError satisfies error interface + var err error = &AlterConfigError{ + Err: ErrInvalidConfig, + } + + if !errors.Is(err, ErrInvalidConfig) { + t.Errorf("expected errors.Is to match ErrInvalidConfig") + } + + got := err.Error() + want := ErrInvalidConfig.Error() + if got != want { + t.Errorf("AlterConfigError.Error() = %v; want %v", got, want) + } + + err = &AlterConfigError{ + Err: ErrInvalidConfig, + ErrMsg: "invalid config value", + } + got = err.Error() + want = ErrInvalidConfig.Error() + " - invalid config value" + if got != want { + t.Errorf("AlterConfigError.Error() = %v; want %v", got, want) + } + + if !errors.Is(err, ErrInvalidConfig) { + t.Errorf("expected errors.Is to match ErrInvalidConfig with ErrMsg set") + } +} diff --git a/describe_configs_response.go b/describe_configs_response.go index 29ca988b7..59c4a3d30 100644 --- a/describe_configs_response.go +++ b/describe_configs_response.go @@ -47,6 +47,10 @@ func (c *DescribeConfigError) Error() string { return text } +func (c *DescribeConfigError) Unwrap() error { + return c.Err +} + type DescribeConfigsResponse struct { Version int16 ThrottleTime time.Duration diff --git a/describe_configs_response_test.go b/describe_configs_response_test.go index 60c4a0810..0ee883687 100644 --- a/describe_configs_response_test.go +++ b/describe_configs_response_test.go @@ -3,6 +3,7 @@ package sarama import ( + "errors" "testing" ) @@ -275,3 +276,34 @@ func TestDescribeConfigsResponseWithDefaultv1(t *testing.T) { } testResponse(t, "response with error", response, describeConfigsResponseWithDefaultv1) } + +func TestDescribeConfigError(t *testing.T) { + // Assert that DescribeConfigError satisfies error interface + var err error = &DescribeConfigError{ + Err: ErrInvalidConfig, + } + + if !errors.Is(err, ErrInvalidConfig) { + t.Errorf("expected errors.Is to match ErrInvalidConfig") + } + + got := err.Error() + want := ErrInvalidConfig.Error() + if got != want { + t.Errorf("DescribeConfigError.Error() = %v; want %v", got, want) + } + + err = &DescribeConfigError{ + Err: ErrInvalidConfig, + ErrMsg: "invalid config value", + } + got = err.Error() + want = ErrInvalidConfig.Error() + " - invalid config value" + if got != want { + t.Errorf("DescribeConfigError.Error() = %v; want %v", got, want) + } + + if !errors.Is(err, ErrInvalidConfig) { + t.Errorf("expected errors.Is to match ErrInvalidConfig with ErrMsg set") + } +} From 08b83089cc0681767b1efd2fcde654ee33f6d8c8 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 7 May 2026 11:08:08 +0800 Subject: [PATCH 6/8] fix conflicts --- go.mod | 10 ---------- go.sum | 15 --------------- 2 files changed, 25 deletions(-) diff --git a/go.mod b/go.mod index 5d2847327..a8a917ed1 100644 --- a/go.mod +++ b/go.mod @@ -11,15 +11,6 @@ require ( github.com/hashicorp/go-multierror v1.1.1 github.com/jcmturner/gofork v1.7.6 github.com/jcmturner/gokrb5/v8 v8.4.4 -<<<<<<< HEAD - github.com/klauspost/compress v1.16.7 - github.com/pierrec/lz4/v4 v4.1.18 - github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 - github.com/stretchr/testify v1.8.4 - github.com/xdg-go/scram v1.1.2 - golang.org/x/net v0.15.0 - golang.org/x/sync v0.3.0 -======= github.com/klauspost/compress v1.18.5 github.com/pierrec/lz4/v4 v4.1.26 github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 @@ -28,7 +19,6 @@ require ( golang.org/x/net v0.53.0 golang.org/x/sync v0.20.0 golang.org/x/sys v0.43.0 ->>>>>>> 8eafc465 (fix: close broken tcp connections) ) require ( diff --git a/go.sum b/go.sum index 316b4189e..979eddf47 100644 --- a/go.sum +++ b/go.sum @@ -33,13 +33,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -<<<<<<< HEAD -github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= -github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -======= github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE= github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= ->>>>>>> 8eafc465 (fix: close broken tcp connections) github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -75,13 +70,8 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -<<<<<<< HEAD -golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= -golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= -======= golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= ->>>>>>> 8eafc465 (fix: close broken tcp connections) golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -89,13 +79,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -<<<<<<< HEAD -golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= -golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= -======= golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= ->>>>>>> 8eafc465 (fix: close broken tcp connections) golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= From 729c2be01920d4d8c20e399be39f464ef2f60be3 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 7 May 2026 11:11:06 +0800 Subject: [PATCH 7/8] fix conflicts --- go.mod | 6 +----- go.sum | 25 ++++++++----------------- 2 files changed, 9 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index a8a917ed1..103c427fc 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,6 @@ require ( github.com/pierrec/lz4/v4 v4.1.26 github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 github.com/stretchr/testify v1.11.1 - go.uber.org/goleak v1.3.0 golang.org/x/net v0.53.0 golang.org/x/sync v0.20.0 golang.org/x/sys v0.43.0 @@ -30,10 +29,7 @@ require ( github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/xdg-go/pbkdf2 v1.0.0 // indirect - github.com/xdg-go/stringprep v1.0.4 // indirect - golang.org/x/crypto v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect + golang.org/x/crypto v0.50.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 979eddf47..78e450c04 100644 --- a/go.sum +++ b/go.sum @@ -43,12 +43,12 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= -github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.26 h1:GrpZw1gZttORinvzBdXPUXATeqlJjqUG/D87TKMnhjY= +github.com/pierrec/lz4/v4 v4.1.26/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= -github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -58,14 +58,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= -github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= -github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= -github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= -github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= -github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= @@ -83,8 +77,8 @@ golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -99,10 +93,7 @@ golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= From a1d25d76e7c32f9d6c0efcc1821dfbc79942d63a Mon Sep 17 00:00:00 2001 From: Sahil Sojitra <88416181+Sahil-4555@users.noreply.github.com> Date: Tue, 2 Sep 2025 21:32:27 +0530 Subject: [PATCH 8/8] chore: refactor to use modern atomic types (#3277) --- broker.go | 11 +++++------ client.go | 6 +++--- consumer.go | 22 +++++++++++----------- mocks/consumer.go | 31 ++++++++++++++++--------------- 4 files changed, 35 insertions(+), 35 deletions(-) diff --git a/broker.go b/broker.go index 9358b2a79..ad74ae7da 100644 --- a/broker.go +++ b/broker.go @@ -30,7 +30,7 @@ type Broker struct { conn net.Conn connErr error lock sync.Mutex - opened int32 + opened atomic.Bool responses chan *responsePromise done chan bool @@ -187,7 +187,7 @@ func (b *Broker) getSockError() error { // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or // AlreadyConnected. If conf is nil, the result of NewConfig() is used. func (b *Broker) Open(conf *Config) error { - if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) { + if !b.opened.CompareAndSwap(false, true) { return ErrAlreadyConnected } @@ -214,7 +214,7 @@ func (b *Broker) Open(conf *Config) error { if b.connErr != nil { Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr) b.conn = nil - atomic.StoreInt32(&b.opened, 0) + b.opened.Store(false) return } if conf.Net.TLS.Enable { @@ -299,7 +299,7 @@ func (b *Broker) Open(conf *Config) error { Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) } b.conn = nil - atomic.StoreInt32(&b.opened, 0) + b.opened.Store(false) return } } @@ -412,8 +412,7 @@ func (b *Broker) closeLocked() error { } else { Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) } - - atomic.StoreInt32(&b.opened, 0) + b.opened.Store(false) return err } diff --git a/client.go b/client.go index 5af73dbc8..6641743ac 100644 --- a/client.go +++ b/client.go @@ -140,7 +140,7 @@ type client struct { // updateMetadataMs stores the time at which metadata was lasted updated. // Note: this accessed atomically so must be the first word in the struct // as per golang/go#41970 - updateMetadataMs int64 + updateMetadataMs atomic.Int64 conf *Config closer, closed chan none // for shutting down background metadata updater @@ -1042,7 +1042,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, time.Sleep(backoff) } - t := atomic.LoadInt64(&client.updateMetadataMs) + t := client.updateMetadataMs.Load() if time.Since(time.UnixMilli(t)) < backoff { return err } @@ -1067,7 +1067,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, req := NewMetadataRequest(client.conf.Version, topics) req.AllowAutoTopicCreation = allowAutoTopicCreation - atomic.StoreInt64(&client.updateMetadataMs, time.Now().UnixMilli()) + client.updateMetadataMs.Store(time.Now().UnixMilli()) response, err := broker.GetMetadata(req) var kerror KError diff --git a/consumer.go b/consumer.go index 60556a566..a56adf213 100644 --- a/consumer.go +++ b/consumer.go @@ -392,7 +392,7 @@ type PartitionConsumer interface { } type partitionConsumer struct { - highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG + highWaterMarkOffset atomic.Int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG consumer *consumer conf *Config @@ -411,9 +411,9 @@ type partitionConsumer struct { responseResult error fetchSize int32 offset int64 - retries int32 + retries atomic.Int32 - paused int32 + paused atomic.Bool // accessed atomically, 0 = not paused, 1 = paused } var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing @@ -434,7 +434,7 @@ func (child *partitionConsumer) sendError(err error) { func (child *partitionConsumer) computeBackoff() time.Duration { if child.conf.Consumer.Retry.BackoffFunc != nil { - retries := atomic.AddInt32(&child.retries, 1) + retries := child.retries.Add(1) return child.conf.Consumer.Retry.BackoffFunc(int(retries)) } return child.conf.Consumer.Retry.Backoff @@ -508,7 +508,7 @@ func (child *partitionConsumer) chooseStartingOffset(offset int64) error { return err } - child.highWaterMarkOffset = newestOffset + child.highWaterMarkOffset.Store(newestOffset) oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest) if err != nil { @@ -562,7 +562,7 @@ func (child *partitionConsumer) Close() error { } func (child *partitionConsumer) HighWaterMarkOffset() int64 { - return atomic.LoadInt64(&child.highWaterMarkOffset) + return child.highWaterMarkOffset.Load() } func (child *partitionConsumer) responseFeeder() { @@ -575,7 +575,7 @@ feederLoop: msgs, child.responseResult = child.parseResponse(response) if child.responseResult == nil { - atomic.StoreInt32(&child.retries, 0) + child.retries.Store(0) } for i, msg := range msgs { @@ -751,7 +751,7 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu // we got messages, reset our fetch size in case it was increased for a previous request child.fetchSize = child.conf.Consumer.Fetch.Default - atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset) + child.highWaterMarkOffset.Store(block.HighWaterMarkOffset) // abortedProducerIDs contains producerID which message should be ignored as uncommitted // - producerID are added when the partitionConsumer iterate over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset) @@ -837,17 +837,17 @@ func (child *partitionConsumer) interceptors(msg *ConsumerMessage) { // Pause implements PartitionConsumer. func (child *partitionConsumer) Pause() { - atomic.StoreInt32(&child.paused, 1) + child.paused.Store(true) } // Resume implements PartitionConsumer. func (child *partitionConsumer) Resume() { - atomic.StoreInt32(&child.paused, 0) + child.paused.Store(false) } // IsPaused implements PartitionConsumer. func (child *partitionConsumer) IsPaused() bool { - return atomic.LoadInt32(&child.paused) == 1 + return child.paused.Load() } type brokerConsumer struct { diff --git a/mocks/consumer.go b/mocks/consumer.go index ecd0bbcb6..0f6f1ab2a 100644 --- a/mocks/consumer.go +++ b/mocks/consumer.go @@ -222,16 +222,17 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset highWatermarkOffset = 0 } - c.partitionConsumers[topic][partition] = &PartitionConsumer{ - highWaterMarkOffset: highWatermarkOffset, - t: c.t, - topic: topic, - partition: partition, - offset: offset, - messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize), - suppressedMessages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize), - errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize), + consumer := &PartitionConsumer{ + t: c.t, + topic: topic, + partition: partition, + offset: offset, + messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize), + suppressedMessages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize), + errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize), } + consumer.highWaterMarkOffset.Store(highWatermarkOffset) + c.partitionConsumers[topic][partition] = consumer } return c.partitionConsumers[topic][partition] @@ -247,7 +248,8 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset // Errors and Messages channel, you should specify what values will be provided on these // channels using YieldMessage and YieldError. type PartitionConsumer struct { - highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG + highWaterMarkOffset atomic.Int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG + suppressedHighWaterMarkOffset int64 l sync.Mutex t ErrorReporter topic string @@ -255,7 +257,6 @@ type PartitionConsumer struct { offset int64 messages chan *sarama.ConsumerMessage suppressedMessages chan *sarama.ConsumerMessage - suppressedHighWaterMarkOffset int64 errors chan *sarama.ConsumerError singleClose sync.Once consumed bool @@ -345,7 +346,7 @@ func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage { } func (pc *PartitionConsumer) HighWaterMarkOffset() int64 { - return atomic.LoadInt64(&pc.highWaterMarkOffset) + return pc.highWaterMarkOffset.Load() } // Pause implements the Pause method from the sarama.PartitionConsumer interface. @@ -353,7 +354,7 @@ func (pc *PartitionConsumer) Pause() { pc.l.Lock() defer pc.l.Unlock() - pc.suppressedHighWaterMarkOffset = atomic.LoadInt64(&pc.highWaterMarkOffset) + pc.suppressedHighWaterMarkOffset = pc.highWaterMarkOffset.Load() pc.paused = true } @@ -363,7 +364,7 @@ func (pc *PartitionConsumer) Resume() { pc.l.Lock() defer pc.l.Unlock() - pc.highWaterMarkOffset = atomic.LoadInt64(&pc.suppressedHighWaterMarkOffset) + pc.highWaterMarkOffset.Store(pc.suppressedHighWaterMarkOffset) for len(pc.suppressedMessages) > 0 { msg := <-pc.suppressedMessages pc.messages <- msg @@ -400,7 +401,7 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) *Partitio msg.Offset = atomic.AddInt64(&pc.suppressedHighWaterMarkOffset, 1) - 1 pc.suppressedMessages <- msg } else { - msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1) - 1 + msg.Offset = pc.highWaterMarkOffset.Add(1) - 1 pc.messages <- msg }