Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions alter_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (c *AlterConfigError) Error() string {
return text
}

// Unwrap returns the error stored in the Err field, which lets errors.Is and
// errors.As match an *AlterConfigError against the error it carries.
func (c *AlterConfigError) Unwrap() error {
	return c.Err
}

// AlterConfigsResourceResponse is a response type for alter config resource
type AlterConfigsResourceResponse struct {
ErrorCode int16
Expand Down
32 changes: 32 additions & 0 deletions alter_configs_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package sarama

import (
"errors"
"testing"
)

Expand Down Expand Up @@ -45,3 +46,34 @@ func TestAlterConfigsResponse(t *testing.T) {
}
testResponse(t, "response with error", response, alterResponsePopulated)
}

// TestAlterConfigError checks the message produced by AlterConfigError.Error
// and that errors.Is matches the wrapped ErrInvalidConfig, both without and
// with an ErrMsg set.
func TestAlterConfigError(t *testing.T) {
	// Storing the pointer in an error-typed variable doubles as a
	// compile-time check that *AlterConfigError implements error.
	var bare error = &AlterConfigError{Err: ErrInvalidConfig}

	if matched := errors.Is(bare, ErrInvalidConfig); !matched {
		t.Errorf("expected errors.Is to match ErrInvalidConfig")
	}

	// Without ErrMsg the text is expected to equal the wrapped error's text.
	if got, want := bare.Error(), ErrInvalidConfig.Error(); got != want {
		t.Errorf("AlterConfigError.Error() = %v; want %v", got, want)
	}

	var detailed error = &AlterConfigError{
		Err:    ErrInvalidConfig,
		ErrMsg: "invalid config value",
	}

	// With ErrMsg the text is expected to be "<wrapped error> - <ErrMsg>".
	if got, want := detailed.Error(), ErrInvalidConfig.Error()+" - invalid config value"; got != want {
		t.Errorf("AlterConfigError.Error() = %v; want %v", got, want)
	}

	if matched := errors.Is(detailed, ErrInvalidConfig); !matched {
		t.Errorf("expected errors.Is to match ErrInvalidConfig with ErrMsg set")
	}
}
47 changes: 32 additions & 15 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Broker struct {
conn net.Conn
connErr error
lock sync.Mutex
opened int32
opened atomic.Bool
responses chan *responsePromise
done chan bool

Expand Down Expand Up @@ -157,13 +157,37 @@ func NewBroker(addr string) *Broker {
return &Broker{id: -1, addr: addr}
}

// getSockError reports the socket-level error, if any, of the broker's
// current connection.
//
// It returns nil without checking when the broker lock cannot be acquired
// immediately, when there is no connection, or when the innermost connection
// is not a *net.TCPConn. Otherwise it returns whatever getTCPConnSockError
// reports for the TCP connection.
func (b *Broker) getSockError() error {
	// Never wait for the lock: if another operation currently owns the
	// broker state, the health check is simply skipped this time.
	if acquired := b.lock.TryLock(); !acquired {
		return nil
	}
	defer b.lock.Unlock()

	inner := b.conn
	if inner == nil {
		return nil
	}

	// Peel off the wrappers in order: the buffered wrapper first, then TLS.
	if buffered, ok := inner.(*bufConn); ok {
		inner = buffered.Conn
	}
	if secured, ok := inner.(*tls.Conn); ok {
		inner = secured.NetConn()
	}

	tcpConn, isTCP := inner.(*net.TCPConn)
	if !isTCP {
		return nil
	}
	return getTCPConnSockError(tcpConn)
}

// Open tries to connect to the Broker if it is not already connected or connecting, but does not block
// waiting for the connection to complete. This means that any subsequent operations on the broker will
// block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
func (b *Broker) Open(conf *Config) error {
if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
if !b.opened.CompareAndSwap(false, true) {
return ErrAlreadyConnected
}

Expand All @@ -190,7 +214,7 @@ func (b *Broker) Open(conf *Config) error {
if b.connErr != nil {
Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
b.opened.Store(false)
return
}
if conf.Net.TLS.Enable {
Expand Down Expand Up @@ -275,7 +299,7 @@ func (b *Broker) Open(conf *Config) error {
Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
}
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
b.opened.Store(false)
return
}
}
Expand All @@ -287,18 +311,12 @@ func (b *Broker) Open(conf *Config) error {
if conf.Net.SASL.Enable && !useSaslV0 {
b.connErr = b.authenticateViaSASLv1()
if b.connErr != nil {
close(b.responses)
<-b.done
b.responses = nil
b.done = nil
err = b.conn.Close()
err = b.closeLocked()
if err == nil {
DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
} else {
Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
}
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
return
}
}
Expand Down Expand Up @@ -377,12 +395,12 @@ func (b *Broker) closeLocked() error {
if b.responses != nil {
close(b.responses)
}
// close the socket before waiting so in-flight reads can exit
err := b.conn.Close()
if b.done != nil {
<-b.done
}

err := b.conn.Close()

b.conn = nil
b.responses = nil
b.done = nil
Expand All @@ -394,8 +412,7 @@ func (b *Broker) closeLocked() error {
} else {
Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
}

atomic.StoreInt32(&b.opened, 0)
b.opened.Store(false)

return err
}
Expand Down
46 changes: 46 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,52 @@ func TestBrokerFailedRequest(t *testing.T) {
}
}

// TestBrokerClose covers Broker.Close behaviour while a request is in flight.
func TestBrokerClose(t *testing.T) {
	t.Run("interrupts active async response reads", func(t *testing.T) {
		// NOTE(review): no response is queued on the mock broker in this
		// test, so the produce response read presumably stays pending until
		// Close is called — confirm against MockBroker.
		mock := NewMockBroker(t, 0)
		defer mock.Close()

		b := NewBroker(mock.Addr())
		cfg := NewTestConfig()
		cfg.ApiVersionsRequest = false
		// Far longer than the 250ms window below, so a read timeout cannot
		// be what lets Close return in time.
		cfg.Net.ReadTimeout = 30 * time.Second

		require.NoError(t, b.Open(cfg))

		produce := ProduceRequest{RequiredAcks: WaitForLocal}

		// Buffered so the callback never blocks on the send.
		callbackErrs := make(chan error, 1)
		require.NoError(t, b.AsyncProduce(&produce, func(_ *ProduceResponse, err error) {
			callbackErrs <- err
		}))

		// Run Close in the background so a hang can be detected with a timeout.
		closeResult := make(chan error, 1)
		go func() {
			closeResult <- b.Close()
		}()

		select {
		case closeErr := <-closeResult:
			require.NoError(t, closeErr)
		case <-time.After(250 * time.Millisecond):
			require.FailNow(t, "Close blocked with an active async response read")
		}

		// The pending produce callback must be invoked with an error.
		select {
		case callbackErr := <-callbackErrs:
			require.Error(t, callbackErr)
		case <-time.After(time.Second):
			require.FailNow(t, "timed out waiting for the async produce callback")
		}

		connected, err := b.Connected()
		require.NoError(t, err)
		require.False(t, connected)
	})
}

// closeImmediatelyDialer is a test dialer that returns a net.Conn whose peer is
// already closed. This reliably triggers a transport-level failure (e.g. EOF)
// during ApiVersions negotiation in Broker.Open.
Expand Down
40 changes: 37 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ type client struct {
// updateMetadataMs stores the time at which metadata was last updated.
// Note: this is accessed atomically, so a plain int64 must be the first word in
// the struct as per golang/go#41970 (atomic.Int64 is always 64-bit aligned).
updateMetadataMs int64
updateMetadataMs atomic.Int64

conf *Config
closer, closed chan none // for shutting down background metadata updater
Expand Down Expand Up @@ -725,6 +725,32 @@ func (client *client) randomizeSeedBrokers(addrs []string) {
}
}

// checkSeedBrokersHealth runs getSockError on every broker in brokers and
// passes each broker that reports an error to safeAsyncClose, logging the
// error first. The slice itself is not modified; a nil or empty slice is a
// no-op.
func (client *client) checkSeedBrokersHealth(brokers []*Broker) {
	for i := range brokers {
		seed := brokers[i]

		sockErr := seed.getSockError()
		if sockErr == nil {
			continue
		}

		Logger.Printf("client/seedbrokers close seed broker #%d at %s due to socket error: %v", seed.ID(), seed.Addr(), sockErr)
		safeAsyncClose(seed)
	}
}

// checkBrokersHealth runs a socket health check over every broker the client
// knows about. A registered broker whose getSockError call returns an error
// is logged, passed to safeAsyncClose and removed from client.brokers. The
// seed and dead seed brokers are then checked via checkSeedBrokersHealth.
//
// It reads and mutates client.brokers; updateMetadata calls it while holding
// client.lock.
func (client *client) checkBrokersHealth() {
	for brokerID, candidate := range client.brokers {
		sockErr := candidate.getSockError()
		if sockErr == nil {
			continue
		}

		Logger.Printf("client/brokers close broker #%d at %s due to socket error: %v", candidate.ID(), candidate.Addr(), sockErr)
		safeAsyncClose(candidate)
		// Deleting the current key while ranging over a map is permitted in Go.
		delete(client.brokers, brokerID)
	}

	client.checkSeedBrokersHealth(client.seedBrokers)
	client.checkSeedBrokersHealth(client.deadSeeds)
}

func (client *client) updateBroker(brokers []*Broker) {
if client.brokers == nil {
return
Expand Down Expand Up @@ -1016,7 +1042,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
time.Sleep(backoff)
}

t := atomic.LoadInt64(&client.updateMetadataMs)
t := client.updateMetadataMs.Load()
if time.Since(time.UnixMilli(t)) < backoff {
return err
}
Expand All @@ -1041,7 +1067,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,

req := NewMetadataRequest(client.conf.Version, topics)
req.AllowAutoTopicCreation = allowAutoTopicCreation
atomic.StoreInt64(&client.updateMetadataMs, time.Now().UnixMilli())
client.updateMetadataMs.Store(time.Now().UnixMilli())

response, err := broker.GetMetadata(req)
var kerror KError
Expand Down Expand Up @@ -1109,6 +1135,14 @@ func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bo
client.lock.Lock()
defer client.lock.Unlock()

// Check health of existing brokers, including seed brokers, dead
// seed brokers, and registered brokers.
// - if an error occurred on a broker's tcp socket, close the tcp
// connection.
// - if it's a registered broker, also remove it from client.brokers;
// seed brokers and dead seed brokers are closed but stay in their lists.
client.checkBrokersHealth()

// For all the brokers we received:
// - if it is a new ID, save it
// - if it is an existing ID, but the address we have is stale, discard the old one and save it
Expand Down
Loading
Loading