From 633a3daacf8bc64865d14e89dc4a2546fa8ccab2 Mon Sep 17 00:00:00 2001 From: Dominic Evans <8060970+dnwe@users.noreply.github.com> Date: Fri, 3 Oct 2025 20:33:54 +0100 Subject: [PATCH] fix: correct ApiVersionsResponse handling of ErrUnsupportedVersion (#3337) Improve the api_versions_response_test.go to be more representative of the different good and bad responses it might receive. Subsequently fix the ApiVersionsResponse decoding so that it correctly downgrades the decoder from flexible to non-flexible after reading the ErrorCode of UnsupportedVersion Signed-off-by: Dominic Evans --- api_versions_response.go | 13 ++- api_versions_response_test.go | 182 ++++++++++++++++++++++++++-------- encoder_decoder.go | 7 ++ 3 files changed, 161 insertions(+), 41 deletions(-) diff --git a/api_versions_response.go b/api_versions_response.go index 05ae9cbf2..3f3c11a41 100644 --- a/api_versions_response.go +++ b/api_versions_response.go @@ -1,6 +1,8 @@ package sarama -import "time" +import ( + "time" +) // ApiVersionsResponseKey contains the APIs supported by the broker. type ApiVersionsResponseKey struct { @@ -91,6 +93,15 @@ func (r *ApiVersionsResponse) decode(pd packetDecoder, version int16) (err error return err } + // KIP-511: if broker didn't understand the ApiVersionsRequest version then + // it replies with a V0 non-flexible ApiVersionResponse where its supported + // ApiVersionsRequest version is available in ApiKeys + if r.ErrorCode == int16(ErrUnsupportedVersion) { + // drop version to 0 and to revert packageDecoder to non-flexible for remaining decoding + r.Version = 0 + pd = downgradeFlexibleDecoder(pd) + } + numApiKeys, err := pd.getArrayLength() if err != nil { return err diff --git a/api_versions_response_test.go b/api_versions_response_test.go index d71b72ab7..ad246d2c1 100644 --- a/api_versions_response_test.go +++ b/api_versions_response_test.go @@ -2,60 +2,162 @@ package sarama -import "testing" +import ( + "testing" + + assert "github.com/stretchr/testify/require" +) var ( - apiVersionResponse = []byte{ - 0x00, 0x00, - 0x00, 0x00, 0x00, 0x01, - 0x00, 0x03, - 0x00, 0x02, - 0x00, 0x01, + apiVersionResponseV0 = []byte{ + 0x00, 0x00, // no error + 0x00, 0x00, 0x00, 0x04, // array length 4 (APIs) + 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, // API Version Produce (v0-2) + 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, // API Version Fetch (v0-3) + 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, // API Version Offsets (v0-1) + 0x00, 0x03, 0x00, 0x00, 0x00, 0x02, // API Version Metadata (v0-2) + } + + apiVersionResponseV1V2 = []byte{ + 0x00, 0x00, // no error + 0x00, 0x00, 0x00, 0x05, // array length 5 (APIs) + 0x00, 0x00, 0x00, 0x00, 0x00, 0x07, // API Version Produce (v0-7) + 0x00, 0x01, 0x00, 0x00, 0x00, 0x0b, // API Version Fetch (v0-11) + 0x00, 0x02, 0x00, 0x00, 0x00, 0x05, // API Version Offsets (v0-5) + 0x00, 0x03, 0x00, 0x00, 0x00, 0x08, // API Version Metadata (v0-8) + 0x00, 0x04, 0x00, 0x00, 0x00, 0x02, // API Version LeaderAndIsr (v0-2) + 0x00, 0x00, 0x00, 0x40, // throttle time (64ms) } apiVersionResponseV3 = []byte{ 0x00, 0x00, // no error - 0x02, // compact array length 1 - 0x00, 0x03, - 0x00, 0x02, - 0x00, 0x01, - 0x00, // tagged fields - 0x00, 0x00, 0x00, 0x00, // throttle time - 0x01, 0x01, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // tagged fields (empty SupportedFeatures) + 0x07, // compact array length 6 (APIs) + 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, // API Version Produce (v0-8) + 0x00, // empty tagged fields + 0x00, 0x01, 0x00, 0x00, 0x00, 0x0b, // API Version Fetch (v0-11) + 0x00, // empty tagged fields + 0x00, 0x02, 0x00, 0x00, 0x00, 0x05, // API Version Offsets (v0-5) + 0x00, // empty tagged fields + 0x00, 0x03, 0x00, 0x00, 0x00, 0x09, // API Version Metadata (v0-9) + 0x00, // empty tagged fields + 0x00, 0x04, 0x00, 0x00, 0x00, 0x04, // API Version LeaderAndIsr (v0-4) + 0x00, // empty tagged fields + 0x00, 0x05, 0x00, 0x00, 0x00, 0x02, // API Version StopReplica (v0-2) + 0x00, // empty tagged fields + 0x00, 0x00, 0x00, 0x80, // throttle time (128ms) + 0x00, // empty tagged fields } -) -func TestApiVersionsResponse(t *testing.T) { - response := new(ApiVersionsResponse) - testVersionDecodable(t, "no error", response, apiVersionResponse, 0) - if response.ErrorCode != int16(ErrNoError) { - t.Error("Decoding error failed: no error expected but found", response.ErrorCode) + // unsupported version from kafka 0.10.2.1 + apiVersionsResponseUnsupportedVersionV0 = []byte{ + 0x00, 0x23, // unsupported version error + 0x00, 0x00, 0x00, 0x00, // array length 0 } - if response.ApiKeys[0].ApiKey != 0x03 { - t.Error("Decoding error: expected 0x03 but got", response.ApiKeys[0].ApiKey) + + // unsupported version from kafka 2.3.0 + apiVersionsResponseUnsupportedVersionV1V2 = []byte{ + 0x00, 0x23, // unsupported version error + 0x00, 0x00, 0x00, 0x00, // array length 0 + } + + // unsupported version from kafka 2.4.0 + apiVersionsResponseUnsupportedVersionV3 = []byte{ + 0x00, 0x23, // unsupported version error + 0x00, 0x00, 0x00, 0x01, // array length 1 + 0x00, 0x12, 0x00, 0x00, 0x00, 0x03, // API Version ApiVersions (v0-3) } - if response.ApiKeys[0].MinVersion != 0x02 { - t.Error("Decoding error: expected 0x02 but got", response.ApiKeys[0].MinVersion) + + // unsupported version from kafka 4.1.0 + apiVersionsResponseUnsupportedVersionV4 = []byte{ + 0x00, 0x23, // unsupported version error + 0x00, 0x00, 0x00, 0x01, // array length 1 + 0x00, 0x12, 0x00, 0x00, 0x00, 0x04, // API Version ApiVersions (v0-4) } - if response.ApiKeys[0].MaxVersion != 0x01 { - t.Error("Decoding error: expected 0x01 but got", response.ApiKeys[0].MaxVersion) +) + +func TestApiVersionsResponseV0(t *testing.T) { + const v = 0 + response := new(ApiVersionsResponse) + testVersionDecodable(t, "no error V0", response, apiVersionResponseV0, v) + + assert.Equal(t, int16(ErrNoError), response.ErrorCode) + assert.Equal(t, []ApiVersionsResponseKey{ + {v, 0, 0, 2}, // API Version Produce (v0-2) + {v, 1, 0, 3}, // API Version Fetch (v0-3) + {v, 2, 0, 1}, // API Version Offsets (v0-1) + {v, 3, 0, 2}, // API Version Metadata (v0-2) + }, response.ApiKeys) +} + +func TestApiVersionsResponseV1V2(t *testing.T) { + response := new(ApiVersionsResponse) + + for _, v := range []int16{1, 2} { + testVersionDecodable(t, "no error V1V2", response, apiVersionResponseV1V2, v) + + assert.Equal(t, int16(ErrNoError), response.ErrorCode) + assert.Equal(t, []ApiVersionsResponseKey{ + {v, 0, 0, 7}, // API Version Produce (v0-7) + {v, 1, 0, 11}, // API Version Fetch (v0-11) + {v, 2, 0, 5}, // API Version Offsets (v0-5) + {v, 3, 0, 8}, // API Version Metadata (v0-8) + {v, 4, 0, 2}, // API Version LeaderAndIsr (v0-2) + }, response.ApiKeys) + assert.Equal(t, int32(64), response.ThrottleTimeMs) } } func TestApiVersionsResponseV3(t *testing.T) { + const v = 3 response := new(ApiVersionsResponse) - response.Version = 3 - testVersionDecodable(t, "no error", response, apiVersionResponseV3, 3) - if response.ErrorCode != int16(ErrNoError) { - t.Error("Decoding error failed: no error expected but found", response.ErrorCode) - } - if response.ApiKeys[0].ApiKey != 0x03 { - t.Error("Decoding error: expected 0x03 but got", response.ApiKeys[0].ApiKey) - } - if response.ApiKeys[0].MinVersion != 0x02 { - t.Error("Decoding error: expected 0x02 but got", response.ApiKeys[0].MinVersion) - } - if response.ApiKeys[0].MaxVersion != 0x01 { - t.Error("Decoding error: expected 0x01 but got", response.ApiKeys[0].MaxVersion) - } + response.Version = v + testVersionDecodable(t, "no error V3", response, apiVersionResponseV3, v) + assert.Equal(t, int16(ErrNoError), response.ErrorCode) + assert.Equal(t, []ApiVersionsResponseKey{ + {v, 0, 0, 8}, // API Version Produce (v0-8) + {v, 1, 0, 11}, // API Version Fetch (v0-11) + {v, 2, 0, 5}, // API Version Offsets (v0-5) + {v, 3, 0, 9}, // API Version Metadata (v0-9) + {v, 4, 0, 4}, // API Version LeaderAndIsr (v0-4) + {v, 5, 0, 2}, // API Version StopReplica (v0-2) + }, response.ApiKeys) + assert.Equal(t, int32(128), response.ThrottleTimeMs) +} + +func TestApiVersionsResponseUnsupportedVersion(t *testing.T) { + t.Run("V0", func(t *testing.T) { + response := new(ApiVersionsResponse) + response.Version = 3 + testVersionDecodable(t, "unsupported", response, apiVersionsResponseUnsupportedVersionV0, 3) + assert.Equal(t, int16(ErrUnsupportedVersion), response.ErrorCode) + assert.Empty(t, response.ApiKeys) + }) + + t.Run("V1V2", func(t *testing.T) { + response := new(ApiVersionsResponse) + response.Version = 3 + testVersionDecodable(t, "unsupported", response, apiVersionsResponseUnsupportedVersionV1V2, 3) + assert.Equal(t, int16(ErrUnsupportedVersion), response.ErrorCode) + assert.Empty(t, response.ApiKeys) + }) + + t.Run("V3", func(t *testing.T) { + response := new(ApiVersionsResponse) + response.Version = 3 + testVersionDecodable(t, "unsupported", response, apiVersionsResponseUnsupportedVersionV3, 3) + assert.Equal(t, int16(ErrUnsupportedVersion), response.ErrorCode) + assert.Equal(t, []ApiVersionsResponseKey{ + {0, 18, 0, 3}, // API Version ApiVersions (v0-3) + }, response.ApiKeys) + }) + + t.Run("V4", func(t *testing.T) { + response := new(ApiVersionsResponse) + response.Version = 4 + testVersionDecodable(t, "unsupported", response, apiVersionsResponseUnsupportedVersionV4, 4) + assert.Equal(t, int16(ErrUnsupportedVersion), response.ErrorCode) + assert.Equal(t, []ApiVersionsResponseKey{ + {0, 18, 0, 4}, // API Version ApiVersions (v0-4) + }, response.ApiKeys) + }) } diff --git a/encoder_decoder.go b/encoder_decoder.go index 445bd1db2..ef6022486 100644 --- a/encoder_decoder.go +++ b/encoder_decoder.go @@ -125,3 +125,10 @@ func prepareFlexibleEncoder(pe packetEncoder, req encoder) packetEncoder { } return pe } + +func downgradeFlexibleDecoder(pd packetDecoder) packetDecoder { + if f, ok := pd.(*realFlexibleDecoder); ok { + return f.realDecoder + } + return pd +}