diff --git a/alter_configs_response.go b/alter_configs_response.go index 4c458ddad..e6e3ec712 100644 --- a/alter_configs_response.go +++ b/alter_configs_response.go @@ -29,6 +29,10 @@ func (c *AlterConfigError) Error() string { return text } +func (c *AlterConfigError) Unwrap() error { + return c.Err +} + // AlterConfigsResourceResponse is a response type for alter config resource type AlterConfigsResourceResponse struct { ErrorCode int16 diff --git a/alter_configs_response_test.go b/alter_configs_response_test.go index 4707d075d..b4c6df542 100644 --- a/alter_configs_response_test.go +++ b/alter_configs_response_test.go @@ -3,6 +3,7 @@ package sarama import ( + "errors" "testing" ) @@ -45,3 +46,34 @@ func TestAlterConfigsResponse(t *testing.T) { } testResponse(t, "response with error", response, alterResponsePopulated) } + +func TestAlterConfigError(t *testing.T) { + // Assert that AlterConfigError satisfies error interface + var err error = &AlterConfigError{ + Err: ErrInvalidConfig, + } + + if !errors.Is(err, ErrInvalidConfig) { + t.Errorf("expected errors.Is to match ErrInvalidConfig") + } + + got := err.Error() + want := ErrInvalidConfig.Error() + if got != want { + t.Errorf("AlterConfigError.Error() = %v; want %v", got, want) + } + + err = &AlterConfigError{ + Err: ErrInvalidConfig, + ErrMsg: "invalid config value", + } + got = err.Error() + want = ErrInvalidConfig.Error() + " - invalid config value" + if got != want { + t.Errorf("AlterConfigError.Error() = %v; want %v", got, want) + } + + if !errors.Is(err, ErrInvalidConfig) { + t.Errorf("expected errors.Is to match ErrInvalidConfig with ErrMsg set") + } +} diff --git a/broker.go b/broker.go index 6b23ec72f..ad74ae7da 100644 --- a/broker.go +++ b/broker.go @@ -30,7 +30,7 @@ type Broker struct { conn net.Conn connErr error lock sync.Mutex - opened int32 + opened atomic.Bool responses chan *responsePromise done chan bool @@ -157,13 +157,37 @@ func NewBroker(addr string) *Broker { return &Broker{id: -1, addr: addr} } +func (b *Broker) getSockError() error { + // skip socket health checks while another operation owns broker state + if !b.lock.TryLock() { + return nil + } + defer b.lock.Unlock() + + if b.conn == nil { + return nil + } + + conn := b.conn + if c, ok := conn.(*bufConn); ok { + conn = c.Conn + } + if c, ok := conn.(*tls.Conn); ok { + conn = c.NetConn() + } + if c, ok := conn.(*net.TCPConn); ok { + return getTCPConnSockError(c) + } + return nil +} + // Open tries to connect to the Broker if it is not already connected or connecting, but does not block // waiting for the connection to complete. This means that any subsequent operations on the broker will // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call, // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or // AlreadyConnected. If conf is nil, the result of NewConfig() is used. func (b *Broker) Open(conf *Config) error { - if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) { + if !b.opened.CompareAndSwap(false, true) { return ErrAlreadyConnected } @@ -190,7 +214,7 @@ func (b *Broker) Open(conf *Config) error { if b.connErr != nil { Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr) b.conn = nil - atomic.StoreInt32(&b.opened, 0) + b.opened.Store(false) return } if conf.Net.TLS.Enable { @@ -275,7 +299,7 @@ func (b *Broker) Open(conf *Config) error { Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) } b.conn = nil - atomic.StoreInt32(&b.opened, 0) + b.opened.Store(false) return } } @@ -287,18 +311,12 @@ func (b *Broker) Open(conf *Config) error { if conf.Net.SASL.Enable && !useSaslV0 { b.connErr = b.authenticateViaSASLv1() if b.connErr != nil { - close(b.responses) - <-b.done - b.responses = nil - b.done = nil - err = b.conn.Close() + err = b.closeLocked() if err == nil { DebugLogger.Printf("Closed connection to broker %s\n", b.addr) } else { Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) } - b.conn = nil - atomic.StoreInt32(&b.opened, 0) return } } @@ -377,12 +395,12 @@ func (b *Broker) closeLocked() error { if b.responses != nil { close(b.responses) } + // close the socket before waiting so in-flight reads can exit + err := b.conn.Close() if b.done != nil { <-b.done } - err := b.conn.Close() - b.conn = nil b.responses = nil b.done = nil @@ -394,8 +412,7 @@ func (b *Broker) closeLocked() error { } else { Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) } - - atomic.StoreInt32(&b.opened, 0) + b.opened.Store(false) return err } diff --git a/broker_test.go b/broker_test.go index b7c6c4b7a..9f2962028 100644 --- a/broker_test.go +++ b/broker_test.go @@ -198,6 +198,52 @@ func TestBrokerFailedRequest(t *testing.T) { } } +func TestBrokerClose(t *testing.T) { + t.Run("interrupts active async response reads", func(t *testing.T) { + mockBroker := NewMockBroker(t, 0) + defer mockBroker.Close() + + broker := NewBroker(mockBroker.Addr()) + conf := NewTestConfig() + conf.ApiVersionsRequest = false + conf.Net.ReadTimeout = 30 * time.Second + + require.NoError(t, broker.Open(conf)) + + request := ProduceRequest{} + request.RequiredAcks = WaitForLocal + + responseErrs := make(chan error, 1) + err := broker.AsyncProduce(&request, func(_ *ProduceResponse, err error) { + responseErrs <- err + }) + require.NoError(t, err) + + closeErrs := make(chan error, 1) + go func() { + closeErrs <- broker.Close() + }() + + select { + case err := <-closeErrs: + require.NoError(t, err) + case <-time.After(250 * time.Millisecond): + require.FailNow(t, "Close blocked with an active async response read") + } + + select { + case err := <-responseErrs: + require.Error(t, err) + case <-time.After(time.Second): + require.FailNow(t, "timed out waiting for the async produce callback") + } + + connected, err := broker.Connected() + require.NoError(t, err) + require.False(t, connected) + }) +} + // closeImmediatelyDialer is a test dialer that returns a net.Conn whose peer is // already closed. This reliably triggers a transport-level failure (e.g. EOF) // during ApiVersions negotiation in Broker.Open. diff --git a/client.go b/client.go index 7fddb1a20..6641743ac 100644 --- a/client.go +++ b/client.go @@ -140,7 +140,7 @@ type client struct { // updateMetadataMs stores the time at which metadata was lasted updated. // Note: this accessed atomically so must be the first word in the struct // as per golang/go#41970 - updateMetadataMs int64 + updateMetadataMs atomic.Int64 conf *Config closer, closed chan none // for shutting down background metadata updater @@ -725,6 +725,32 @@ func (client *client) randomizeSeedBrokers(addrs []string) { } } +func (client *client) checkSeedBrokersHealth(brokers []*Broker) { + if len(brokers) == 0 { + return + } + + for _, broker := range brokers { + if err := broker.getSockError(); err != nil { + Logger.Printf("client/seedbrokers close seed broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) + safeAsyncClose(broker) + } + } +} + +func (client *client) checkBrokersHealth() { + for id, broker := range client.brokers { + if err := broker.getSockError(); err != nil { + Logger.Printf("client/brokers close broker #%d at %s due to socket error: %v", broker.ID(), broker.Addr(), err) + safeAsyncClose(broker) + delete(client.brokers, id) + } + } + + client.checkSeedBrokersHealth(client.seedBrokers) + client.checkSeedBrokersHealth(client.deadSeeds) +} + func (client *client) updateBroker(brokers []*Broker) { if client.brokers == nil { return @@ -1016,7 +1042,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, time.Sleep(backoff) } - t := atomic.LoadInt64(&client.updateMetadataMs) + t := client.updateMetadataMs.Load() if time.Since(time.UnixMilli(t)) < backoff { return err } @@ -1041,7 +1067,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, req := NewMetadataRequest(client.conf.Version, topics) req.AllowAutoTopicCreation = allowAutoTopicCreation - atomic.StoreInt64(&client.updateMetadataMs, time.Now().UnixMilli()) + client.updateMetadataMs.Store(time.Now().UnixMilli()) response, err := broker.GetMetadata(req) var kerror KError @@ -1109,6 +1135,14 @@ func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bo client.lock.Lock() defer client.lock.Unlock() + // Check health of existing brokers, including seed brokers, dead + // seed brokers, and registered brokers. + // - if error occurred on broker's tcp socket, close the tcp + // connection. + // - if it's seed broker or dead seed broker, remove it from + // the list. + client.checkBrokersHealth() + // For all the brokers we received: // - if it is a new ID, save it // - if it is an existing ID, but the address we have is stale, discard the old one and save it diff --git a/client_test.go b/client_test.go index 2fd819625..28e9b03cd 100644 --- a/client_test.go +++ b/client_test.go @@ -6,6 +6,8 @@ import ( "errors" "fmt" "io" + "net" + "runtime" "sync" "sync/atomic" "syscall" @@ -17,6 +19,15 @@ import ( "github.com/stretchr/testify/require" ) +func socketErrorProbeAvailable() bool { + switch runtime.GOOS { + case "aix", "darwin", "dragonfly", "freebsd", "linux", "netbsd", "openbsd", "solaris": + return true + default: + return false + } +} + func TestSimpleClient(t *testing.T) { seedBroker := NewMockBroker(t, 1) @@ -746,6 +757,129 @@ func TestClientResurrectDeadSeeds(t *testing.T) { safeClose(t, c) } +func TestClientCheckBrokersHealth(t *testing.T) { + newConnectedBroker := func(t *testing.T) (*Broker, *net.TCPConn, func()) { + t.Helper() + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + accepted := make(chan *net.TCPConn, 1) + acceptErr := make(chan error, 1) + go func() { + conn, err := listener.Accept() + if err != nil { + acceptErr <- err + return + } + accepted <- conn.(*net.TCPConn) + }() + + conn, err := net.Dial("tcp", listener.Addr().String()) + require.NoError(t, err) + + var serverConn *net.TCPConn + select { + case serverConn = <-accepted: + case err := <-acceptErr: + require.NoError(t, err) + case <-time.After(time.Second): + require.FailNow(t, "timed out waiting for test broker connection") + } + + broker := NewBroker(listener.Addr().String()) + broker.conn = conn.(*net.TCPConn) + broker.metricRegistry = metrics.NewRegistry() + broker.opened.Store(true) + + cleanup := func() { + _ = listener.Close() + _ = serverConn.Close() + _ = broker.Close() + } + + return broker, serverConn, cleanup + } + + t.Run("does not wait for brokers that are opening", func(t *testing.T) { + broker := &Broker{id: 1, addr: "127.0.0.1:9092"} + broker.lock.Lock() + defer broker.lock.Unlock() + + client := &client{ + brokers: map[int32]*Broker{broker.ID(): broker}, + } + + done := make(chan struct{}) + go func() { + client.checkBrokersHealth() + close(done) + }() + + select { + case <-done: + case <-time.After(100 * time.Millisecond): + require.FailNow(t, "checkBrokersHealth blocked on a broker that was still opening") + } + }) + + t.Run("keeps unhealthy live seed brokers available for reopening", func(t *testing.T) { + if !socketErrorProbeAvailable() { + t.Skip("socket error probing is unavailable on this platform") + } + + broker, serverConn, cleanup := newConnectedBroker(t) + defer cleanup() + + client := &client{ + brokers: map[int32]*Broker{}, + seedBrokers: []*Broker{broker}, + } + + require.NoError(t, serverConn.SetLinger(0)) + require.NoError(t, serverConn.Close()) + + require.EventuallyWithT(t, func(c *assert.CollectT) { + client.checkBrokersHealth() + + assert.Len(c, client.seedBrokers, 1) + assert.Same(c, broker, client.seedBrokers[0]) + + connected, err := broker.Connected() + assert.NoError(c, err) + assert.False(c, connected) + }, time.Second, 10*time.Millisecond) + }) + + t.Run("keeps unhealthy dead seed brokers available for reopening", func(t *testing.T) { + if !socketErrorProbeAvailable() { + t.Skip("socket error probing is unavailable on this platform") + } + + broker, serverConn, cleanup := newConnectedBroker(t) + defer cleanup() + + client := &client{ + brokers: map[int32]*Broker{}, + deadSeeds: []*Broker{broker}, + } + + require.NoError(t, serverConn.SetLinger(0)) + require.NoError(t, serverConn.Close()) + + require.EventuallyWithT(t, func(c *assert.CollectT) { + client.checkBrokersHealth() + + assert.Len(c, client.deadSeeds, 1) + assert.Same(c, broker, client.deadSeeds[0]) + + connected, err := broker.Connected() + assert.NoError(c, err) + assert.False(c, connected) + }, time.Second, 10*time.Millisecond) + }) +} + //nolint:paralleltest func TestClientController(t *testing.T) { seedBroker := NewMockBroker(t, 1) diff --git a/consumer.go b/consumer.go index 60556a566..a56adf213 100644 --- a/consumer.go +++ b/consumer.go @@ -392,7 +392,7 @@ type PartitionConsumer interface { } type partitionConsumer struct { - highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG + highWaterMarkOffset atomic.Int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG consumer *consumer conf *Config @@ -411,9 +411,9 @@ type partitionConsumer struct { responseResult error fetchSize int32 offset int64 - retries int32 + retries atomic.Int32 - paused int32 + paused atomic.Bool // accessed atomically, 0 = not paused, 1 = paused } var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing @@ -434,7 +434,7 @@ func (child *partitionConsumer) sendError(err error) { func (child *partitionConsumer) computeBackoff() time.Duration { if child.conf.Consumer.Retry.BackoffFunc != nil { - retries := atomic.AddInt32(&child.retries, 1) + retries := child.retries.Add(1) return child.conf.Consumer.Retry.BackoffFunc(int(retries)) } return child.conf.Consumer.Retry.Backoff @@ -508,7 +508,7 @@ func (child *partitionConsumer) chooseStartingOffset(offset int64) error { return err } - child.highWaterMarkOffset = newestOffset + child.highWaterMarkOffset.Store(newestOffset) oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest) if err != nil { @@ -562,7 +562,7 @@ func (child *partitionConsumer) Close() error { } func (child *partitionConsumer) HighWaterMarkOffset() int64 { - return atomic.LoadInt64(&child.highWaterMarkOffset) + return child.highWaterMarkOffset.Load() } func (child *partitionConsumer) responseFeeder() { @@ -575,7 +575,7 @@ feederLoop: msgs, child.responseResult = child.parseResponse(response) if child.responseResult == nil { - atomic.StoreInt32(&child.retries, 0) + child.retries.Store(0) } for i, msg := range msgs { @@ -751,7 +751,7 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu // we got messages, reset our fetch size in case it was increased for a previous request child.fetchSize = child.conf.Consumer.Fetch.Default - atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset) + child.highWaterMarkOffset.Store(block.HighWaterMarkOffset) // abortedProducerIDs contains producerID which message should be ignored as uncommitted // - producerID are added when the partitionConsumer iterate over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset) @@ -837,17 +837,17 @@ func (child *partitionConsumer) interceptors(msg *ConsumerMessage) { // Pause implements PartitionConsumer. func (child *partitionConsumer) Pause() { - atomic.StoreInt32(&child.paused, 1) + child.paused.Store(true) } // Resume implements PartitionConsumer. func (child *partitionConsumer) Resume() { - atomic.StoreInt32(&child.paused, 0) + child.paused.Store(false) } // IsPaused implements PartitionConsumer. func (child *partitionConsumer) IsPaused() bool { - return atomic.LoadInt32(&child.paused) == 1 + return child.paused.Load() } type brokerConsumer struct { diff --git a/describe_configs_response.go b/describe_configs_response.go index 29ca988b7..59c4a3d30 100644 --- a/describe_configs_response.go +++ b/describe_configs_response.go @@ -47,6 +47,10 @@ func (c *DescribeConfigError) Error() string { return text } +func (c *DescribeConfigError) Unwrap() error { + return c.Err +} + type DescribeConfigsResponse struct { Version int16 ThrottleTime time.Duration diff --git a/describe_configs_response_test.go b/describe_configs_response_test.go index 60c4a0810..0ee883687 100644 --- a/describe_configs_response_test.go +++ b/describe_configs_response_test.go @@ -3,6 +3,7 @@ package sarama import ( + "errors" "testing" ) @@ -275,3 +276,34 @@ func TestDescribeConfigsResponseWithDefaultv1(t *testing.T) { } testResponse(t, "response with error", response, describeConfigsResponseWithDefaultv1) } + +func TestDescribeConfigError(t *testing.T) { + // Assert that DescribeConfigError satisfies error interface + var err error = &DescribeConfigError{ + Err: ErrInvalidConfig, + } + + if !errors.Is(err, ErrInvalidConfig) { + t.Errorf("expected errors.Is to match ErrInvalidConfig") + } + + got := err.Error() + want := ErrInvalidConfig.Error() + if got != want { + t.Errorf("DescribeConfigError.Error() = %v; want %v", got, want) + } + + err = &DescribeConfigError{ + Err: ErrInvalidConfig, + ErrMsg: "invalid config value", + } + got = err.Error() + want = ErrInvalidConfig.Error() + " - invalid config value" + if got != want { + t.Errorf("DescribeConfigError.Error() = %v; want %v", got, want) + } + + if !errors.Is(err, ErrInvalidConfig) { + t.Errorf("expected errors.Is to match ErrInvalidConfig with ErrMsg set") + } +} diff --git a/go.mod b/go.mod index 913c5ad75..103c427fc 100644 --- a/go.mod +++ b/go.mod @@ -11,13 +11,13 @@ require ( github.com/hashicorp/go-multierror v1.1.1 github.com/jcmturner/gofork v1.7.6 github.com/jcmturner/gokrb5/v8 v8.4.4 - github.com/klauspost/compress v1.16.7 - github.com/pierrec/lz4/v4 v4.1.18 - github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 - github.com/stretchr/testify v1.8.4 - github.com/xdg-go/scram v1.1.2 - golang.org/x/net v0.15.0 - golang.org/x/sync v0.3.0 + github.com/klauspost/compress v1.18.5 + github.com/pierrec/lz4/v4 v4.1.26 + github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 + github.com/stretchr/testify v1.11.1 + golang.org/x/net v0.53.0 + golang.org/x/sync v0.20.0 + golang.org/x/sys v0.43.0 ) require ( @@ -29,10 +29,7 @@ require ( github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/xdg-go/pbkdf2 v1.0.0 // indirect - github.com/xdg-go/stringprep v1.0.4 // indirect - golang.org/x/crypto v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect + golang.org/x/crypto v0.50.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index b04f44732..78e450c04 100644 --- a/go.sum +++ b/go.sum @@ -33,8 +33,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= -github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE= +github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -43,12 +43,12 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= -github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.26 h1:GrpZw1gZttORinvzBdXPUXATeqlJjqUG/D87TKMnhjY= +github.com/pierrec/lz4/v4 v4.1.26/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= -github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -58,20 +58,14 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= -github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= -github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= -github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= -github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= -github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= -golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= +golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= +golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -79,28 +73,27 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= -golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= +golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= +golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= diff --git a/mocks/consumer.go b/mocks/consumer.go index ecd0bbcb6..0f6f1ab2a 100644 --- a/mocks/consumer.go +++ b/mocks/consumer.go @@ -222,16 +222,17 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset highWatermarkOffset = 0 } - c.partitionConsumers[topic][partition] = &PartitionConsumer{ - highWaterMarkOffset: highWatermarkOffset, - t: c.t, - topic: topic, - partition: partition, - offset: offset, - messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize), - suppressedMessages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize), - errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize), + consumer := &PartitionConsumer{ + t: c.t, + topic: topic, + partition: partition, + offset: offset, + messages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize), + suppressedMessages: make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize), + errors: make(chan *sarama.ConsumerError, c.config.ChannelBufferSize), } + consumer.highWaterMarkOffset.Store(highWatermarkOffset) + c.partitionConsumers[topic][partition] = consumer } return c.partitionConsumers[topic][partition] @@ -247,7 +248,8 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset // Errors and Messages channel, you should specify what values will be provided on these // channels using YieldMessage and YieldError. type PartitionConsumer struct { - highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG + highWaterMarkOffset atomic.Int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG + suppressedHighWaterMarkOffset int64 l sync.Mutex t ErrorReporter topic string @@ -255,7 +257,6 @@ type PartitionConsumer struct { offset int64 messages chan *sarama.ConsumerMessage suppressedMessages chan *sarama.ConsumerMessage - suppressedHighWaterMarkOffset int64 errors chan *sarama.ConsumerError singleClose sync.Once consumed bool @@ -345,7 +346,7 @@ func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage { } func (pc *PartitionConsumer) HighWaterMarkOffset() int64 { - return atomic.LoadInt64(&pc.highWaterMarkOffset) + return pc.highWaterMarkOffset.Load() } // Pause implements the Pause method from the sarama.PartitionConsumer interface. @@ -353,7 +354,7 @@ func (pc *PartitionConsumer) Pause() { pc.l.Lock() defer pc.l.Unlock() - pc.suppressedHighWaterMarkOffset = atomic.LoadInt64(&pc.highWaterMarkOffset) + pc.suppressedHighWaterMarkOffset = pc.highWaterMarkOffset.Load() pc.paused = true } @@ -363,7 +364,7 @@ func (pc *PartitionConsumer) Resume() { pc.l.Lock() defer pc.l.Unlock() - pc.highWaterMarkOffset = atomic.LoadInt64(&pc.suppressedHighWaterMarkOffset) + pc.highWaterMarkOffset.Store(pc.suppressedHighWaterMarkOffset) for len(pc.suppressedMessages) > 0 { msg := <-pc.suppressedMessages pc.messages <- msg @@ -400,7 +401,7 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) *Partitio msg.Offset = atomic.AddInt64(&pc.suppressedHighWaterMarkOffset, 1) - 1 pc.suppressedMessages <- msg } else { - msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1) - 1 + msg.Offset = pc.highWaterMarkOffset.Add(1) - 1 pc.messages <- msg } diff --git a/sockopt_other.go b/sockopt_other.go new file mode 100644 index 000000000..43d940571 --- /dev/null +++ b/sockopt_other.go @@ -0,0 +1,9 @@ +//go:build !unix || android || illumos || ios || hurd + +package sarama + +import "net" + +func getTCPConnSockError(_ *net.TCPConn) error { + return nil +} diff --git a/sockopt_posix.go b/sockopt_posix.go new file mode 100644 index 000000000..9aaf839af --- /dev/null +++ b/sockopt_posix.go @@ -0,0 +1,36 @@ +//go:build unix && !android && !illumos && !ios && !hurd + +// build on aix || darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris + +package sarama + +import ( + "fmt" + "net" + + "golang.org/x/sys/unix" +) + +func getTCPConnSockError(conn *net.TCPConn) error { + rawConn, err := conn.SyscallConn() + if err != nil { + return fmt.Errorf("failed to get raw connection: %w", err) + } + + var sockErr int + var opErr error + + err = rawConn.Control(func(fd uintptr) { + sockErr, opErr = unix.GetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_ERROR) + }) + if err != nil { + return fmt.Errorf("failed to control raw connection: %w", err) + } + if opErr != nil { + return fmt.Errorf("failed to get socket error: %w", opErr) + } + if sockErr != 0 { + return unix.Errno(sockErr) + } + return nil +}