From c83f296e5fda8c90536e6af82d782b23126549cd Mon Sep 17 00:00:00 2001 From: mattac21 Date: Mon, 11 May 2026 13:20:05 -0400 Subject: [PATCH] fix(blocksync): validate blocksync response sender and signature count (#5860) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Adds additional validation to blocksync, ensuring _before response unmarshalling_ that we have made a `BlockRequest` to the peer that is sending us a `BlockResponse` recently, and also that the response contains a valid amount of commit signatures (not > MaxVoteCount). To do this preunmarshal validation, we have added a `MsgBytesFilter` interface that `Reactors` can implement. Currently only `BLOCKSYNC` does. The `FilterMsgBytes` function is called for both comet P2P and libp2p implementations, inside of the `onReceive` function when setting up a peer for comet p2p, and inside of `handleStream` for libp2p, just before unmarshalling the message in both. - [x] Tests written/updated - [x] Changelog entry added in `CHANGELOG.md` - [ ] Updated relevant documentation (`docs/` or `spec/`) and code comments --- Backport adaptations for bera-v1.x: - enabled atomic.Bool field NOT added. - internal/blocksync/reactor.go was hand-edited rather than auto-merged. - bcstub import alias used (proto/tendermint/blocksync) because bcproto already points to api/cometbft/blocksync/v1 in this fork. - lp2p/* hunks dropped — no libp2p reactor in this fork. - CHANGELOG.md not modified - Tests - pool_test.go kept as is - reactor_test.go adapted (different imports) - p2p/switch_test.go dropped adapted (BroadcastAsync/Broadcast, same) (cherry picked from commit 4590e7ace1a387599213670276bc6593aff338a2) --- internal/blocksync/pool.go | 13 + internal/blocksync/pool_test.go | 49 ++ internal/blocksync/reactor.go | 73 ++ internal/blocksync/reactor_test.go | 300 +++++++ p2p/base_reactor.go | 8 + p2p/peer.go | 8 + p2p/switch_test.go | 87 ++ proto/tendermint/blocksync/nosig.go | 14 + proto/tendermint/blocksync/stub.pb.go | 1108 +++++++++++++++++++++++++ proto/tendermint/blocksync/stub.proto | 37 + 10 files changed, 1697 insertions(+) create mode 100644 proto/tendermint/blocksync/nosig.go create mode 100644 proto/tendermint/blocksync/stub.pb.go create mode 100644 proto/tendermint/blocksync/stub.proto diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index c4aed3aada9..518d853ab00 100644 --- a/internal/blocksync/pool.go +++ b/internal/blocksync/pool.go @@ -185,6 +185,19 @@ func (pool *BlockPool) removeTimedoutPeers() { pool.sortPeers() } +// HasPendingRequestFrom reports whether we have at least one outstanding block +// request directed at the given peer. +func (pool *BlockPool) HasPendingRequestFrom(peerID p2p.ID) bool { + pool.mtx.Lock() + defer pool.mtx.Unlock() + for _, r := range pool.requesters { + if r.didRequestFrom(peerID) { + return true + } + } + return false +} + // IsCaughtUp returns true if this node is caught up, false - otherwise. // TODO: relax conditions, prevent abuse. func (pool *BlockPool) IsCaughtUp() (isCaughtUp bool, height, maxPeerHeight int64) { diff --git a/internal/blocksync/pool_test.go b/internal/blocksync/pool_test.go index 0352155d24c..32768d1a6c2 100644 --- a/internal/blocksync/pool_test.go +++ b/internal/blocksync/pool_test.go @@ -534,3 +534,52 @@ func TestBlockPoolIgnoresPeersWithInflatedBase(t *testing.T) { require.EqualValues(t, 300, pool.MaxPeerHeight(), "peer with base > pool.height must not raise maxPeerHeight") } + +func TestBlockPoolHasPendingRequestFrom(t *testing.T) { + requestsCh := make(chan BlockRequest, 10) + errorsCh := make(chan peerError, 10) + + pool := NewBlockPool(1, requestsCh, errorsCh) + pool.SetLogger(log.TestingLogger()) + + const ( + primary = p2p.ID("primary") + secondary = p2p.ID("secondary") + stranger = p2p.ID("stranger") + ) + + // check initial state + require.False(t, pool.HasPendingRequestFrom(primary)) + require.False(t, pool.HasPendingRequestFrom(secondary)) + require.False(t, pool.HasPendingRequestFrom(stranger)) + + // Install a requester for height 1 targeting `primary`. We set the + // fields directly so we don't have to spin up the request goroutine. + pool.mtx.Lock() + req1 := newBPRequester(pool, 1) + req1.peerID = primary + pool.requesters[1] = req1 + pool.mtx.Unlock() + + require.True(t, pool.HasPendingRequestFrom(primary), "requested peer should be reported as pending") + require.False(t, pool.HasPendingRequestFrom(stranger), "non-requested peer must not be reported as pending") + + // A second requester at height 2 also covers the secondPeerID slot. + pool.mtx.Lock() + req2 := newBPRequester(pool, 2) + req2.peerID = primary + req2.secondPeerID = secondary + pool.requesters[2] = req2 + pool.mtx.Unlock() + + require.True(t, pool.HasPendingRequestFrom(secondary), "secondary peer slot should count as pending") + + // Removing both requesters drops the pending state. + pool.mtx.Lock() + delete(pool.requesters, 1) + delete(pool.requesters, 2) + pool.mtx.Unlock() + + require.False(t, pool.HasPendingRequestFrom(primary)) + require.False(t, pool.HasPendingRequestFrom(secondary)) +} diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index fdacb4c71b3..4f0b6255cf0 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -1,6 +1,7 @@ package blocksync import ( + "errors" "fmt" "reflect" "sync" @@ -10,6 +11,7 @@ import ( "github.com/cometbft/cometbft/crypto" "github.com/cometbft/cometbft/libs/log" "github.com/cometbft/cometbft/p2p" + bcstub "github.com/cometbft/cometbft/proto/tendermint/blocksync" sm "github.com/cometbft/cometbft/state" "github.com/cometbft/cometbft/store" "github.com/cometbft/cometbft/types" @@ -51,6 +53,9 @@ func (e peerError) Error() string { return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error()) } +// Reactor implements MsgBytesFilter. +var _ p2p.MsgBytesFilter = (*Reactor)(nil) + // Reactor handles long-term catchup syncing. type Reactor struct { p2p.BaseReactor @@ -270,6 +275,74 @@ func (bcR *Reactor) handlePeerResponse(msg *bcproto.BlockResponse, src p2p.Peer) } } +// FilterMsgBytes implements p2p.MsgBytesFilter and rejects messages from +// unexpected peers before unmarshalling the request. +func (bcR *Reactor) FilterMsgBytes(chID byte, src p2p.Peer, msgBytes []byte) error { + // do not check invalid messages, will fail unmarshalling + if chID != BlocksyncChannel || len(msgBytes) == 0 { + return nil + } + + // unmarshal into custom stub struct that will do no allocations so we can + // quickly and cheaply check the validity of BlockResponse message + var stub bcstub.SigCountMessage + if err := stub.Unmarshal(msgBytes); err != nil { + return fmt.Errorf("malformed blocksync message from peer %s: %w", src.ID(), err) + } + if stub.BlockResponse == nil { + // Not a BlockResponse oneof case, no extra validation to do in this + // cases + return nil + } + + // Berachain adaptation from upstream #5860: upstream additionally guards on a + // Reactor.enabled atomic.Bool added in #5633. This fork does not bring #5633, + // so we use pool.IsRunning() alone — equivalent in practice because the only + // way HasPendingRequestFrom returns true is when the pool has live requesters, + // which requires pool.Start() to have run (would set enabled = true in the upstream). + + // blocksync not running, we should not be getting a BlockResponse + if !bcR.pool.IsRunning() { + return errors.New("unsolicited BlockResponse: blocksync not active") + } + + // ensure we have an outstanding request to this peer + if !bcR.pool.HasPendingRequestFrom(src.ID()) { + return fmt.Errorf("unsolicited BlockResponse from peer %s", src.ID()) + } + + // validate the commit count in the response + if err := validateMaxVotes(stub.BlockResponse); err != nil { + return fmt.Errorf("validating max votes in BlockResponse from peer %s: %w", src.ID(), err) + } + + return nil +} + +// validateMaxVotes validates that the number of commit signatures and extended +// commit signatures are both less than the MaxVotesCount, returns an error if +// not. +func validateMaxVotes(br *bcstub.SigCountBlockResponse) error { + commitSigs, extSigs := 0, 0 + if br != nil { + if br.Block != nil && br.Block.LastCommit != nil { + commitSigs = len(br.Block.LastCommit.Signatures) + } + if br.ExtCommit != nil { + extSigs = len(br.ExtCommit.ExtendedSignatures) + } + } + + if commitSigs > types.MaxVotesCount { + return fmt.Errorf("too many commit signatures: %d (max %d)", commitSigs, types.MaxVotesCount) + } + if extSigs > types.MaxVotesCount { + return fmt.Errorf("too many extended commit signatures: %d (max %d)", extSigs, types.MaxVotesCount) + } + + return nil +} + // Receive implements Reactor by handling 4 types of messages (look below). func (bcR *Reactor) Receive(e p2p.Envelope) { if err := ValidateMsg(e.Message); err != nil { diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index cf728dd8dd9..b618e265540 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/cosmos/gogoproto/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -15,11 +16,14 @@ import ( dbm "github.com/cometbft/cometbft-db" abci "github.com/cometbft/cometbft/abci/types" bcproto "github.com/cometbft/cometbft/api/cometbft/blocksync/v1" + cmtproto "github.com/cometbft/cometbft/api/cometbft/types/v1" cfg "github.com/cometbft/cometbft/config" "github.com/cometbft/cometbft/internal/test" "github.com/cometbft/cometbft/libs/log" mpmocks "github.com/cometbft/cometbft/mempool/mocks" "github.com/cometbft/cometbft/p2p" + p2pmocks "github.com/cometbft/cometbft/p2p/mocks" + bcstub "github.com/cometbft/cometbft/proto/tendermint/blocksync" "github.com/cometbft/cometbft/proxy" sm "github.com/cometbft/cometbft/state" "github.com/cometbft/cometbft/store" @@ -580,3 +584,299 @@ func (bcR *ByzantineReactor) Receive(e p2p.Envelope) { bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) } } + +// newFilterReactor builds a minimal Reactor wired to a started BlockPool, +// suitable for exercising FilterMsgBytes without spinning up the full p2p +// stack. +func newFilterReactor(t *testing.T, started bool) *Reactor { + t.Helper() + + requestsCh := make(chan BlockRequest, 1000) + errorsCh := make(chan peerError, 1000) + pool := NewBlockPool(1, requestsCh, errorsCh) + pool.SetLogger(log.TestingLogger()) + require.NoError(t, pool.Start()) + t.Cleanup(func() { _ = pool.Stop() }) + + if !started { + require.NoError(t, pool.Stop()) + } + + return &Reactor{pool: pool} +} + +// seedRequester inserts a bpRequester targeting peerID at the given height, +// bypassing makeRequestersRoutine so the test can drive pool state directly. +func seedRequester(r *Reactor, height int64, peerID p2p.ID) { + req := newBPRequester(r.pool, height) + req.peerID = peerID + r.pool.mtx.Lock() + r.pool.requesters[height] = req + r.pool.mtx.Unlock() +} + +func mockPeer(id p2p.ID) *p2pmocks.Peer { + p := &p2pmocks.Peer{} + p.On("ID").Return(id).Maybe() + return p +} + +func TestFilterMsgBytes(t *testing.T) { + + // Berachain adaptation from upstream #5860 for TestFilterMsgBytes: + // - Upstream uses an enabled atomic.Bool on the Reactor; this fork uses + // pool.IsRunning(). newFilterReactor takes a `started bool` and stops + // the pool when false to model the "blocksync inactive" state. + // - bcproto.SigCountMessage is at bcstub in this fork (separate package + // proto/tendermint/blocksync; bcproto already aliases + // api/cometbft/blocksync/v1). + // - cmtproto here is api/cometbft/types/v1 (upstream uses the legacy + // proto/tendermint/types path). + + wireBytesFor := func(t *testing.T, m *bcproto.Message) []byte { + t.Helper() + b, err := proto.Marshal(m) + require.NoError(t, err) + require.NotEmpty(t, b) + return b + } + + blockResponseBytes := func(t *testing.T) []byte { + return wireBytesFor(t, &bcproto.Message{ + Sum: &bcproto.Message_BlockResponse{ + BlockResponse: &bcproto.BlockResponse{Block: &cmtproto.Block{}}, + }, + }) + } + + blockRequestBytes := func(t *testing.T) []byte { + return wireBytesFor(t, &bcproto.Message{ + Sum: &bcproto.Message_BlockRequest{ + BlockRequest: &bcproto.BlockRequest{Height: 1}, + }, + }) + } + + const expected p2p.ID = "expected" + const unexpected p2p.ID = "unexpected" + + tests := []struct { + name string + setup func(t *testing.T) *Reactor + chID byte + peer p2p.ID + bytesFn func(t *testing.T) []byte + expectErr string // substring; "" means no error + }{ + { + name: "rejects BlockResponse when blocksync disabled", + setup: func(t *testing.T) *Reactor { return newFilterReactor(t, false) }, + chID: BlocksyncChannel, + peer: unexpected, + bytesFn: blockResponseBytes, + expectErr: "blocksync not active", + }, + { + name: "rejects unsolicited BlockResponse with no requesters", + setup: func(t *testing.T) *Reactor { return newFilterReactor(t, true) }, + chID: BlocksyncChannel, + peer: unexpected, + bytesFn: blockResponseBytes, + expectErr: "unsolicited BlockResponse from peer unexpected", + }, + { + name: "rejects BlockResponse from peer we did not request from", + setup: func(t *testing.T) *Reactor { + r := newFilterReactor(t, true) + seedRequester(r, 1, expected) + return r + }, + chID: BlocksyncChannel, + peer: unexpected, + bytesFn: blockResponseBytes, + expectErr: "unsolicited BlockResponse from peer unexpected", + }, + { + name: "allows BlockResponse from solicited peer", + setup: func(t *testing.T) *Reactor { + r := newFilterReactor(t, true) + seedRequester(r, 1, expected) + return r + }, + chID: BlocksyncChannel, + peer: expected, + bytesFn: blockResponseBytes, + }, + { + name: "allows non-BlockResponse messages even when disabled", + setup: func(t *testing.T) *Reactor { return newFilterReactor(t, false) }, + chID: BlocksyncChannel, + peer: "any", + bytesFn: blockRequestBytes, + }, + { + name: "ignores other channels", + setup: func(t *testing.T) *Reactor { return newFilterReactor(t, false) }, + chID: byte(0x20), + peer: "any", + bytesFn: blockResponseBytes, + }, + { + name: "ignores empty bytes", + setup: func(t *testing.T) *Reactor { return newFilterReactor(t, false) }, + chID: BlocksyncChannel, + peer: "any", + bytesFn: func(*testing.T) []byte { return nil }, + }, + { + name: "rejects BlockResponse after pool stopped", + setup: func(t *testing.T) *Reactor { + r := newFilterReactor(t, true) + seedRequester(r, 1, expected) + require.NoError(t, r.pool.Stop()) + return r + }, + chID: BlocksyncChannel, + peer: expected, + bytesFn: blockResponseBytes, + expectErr: "blocksync not active", + }, + { + name: "allows BlockResponse at MaxVotesCount commit signatures", + setup: func(t *testing.T) *Reactor { + r := newFilterReactor(t, true) + seedRequester(r, 1, expected) + return r + }, + chID: BlocksyncChannel, + peer: expected, + bytesFn: func(t *testing.T) []byte { return blockResponseBytesWithSigs(t, types.MaxVotesCount, 0) }, + }, + { + name: "rejects BlockResponse exceeding commit signature cap", + setup: func(t *testing.T) *Reactor { + r := newFilterReactor(t, true) + seedRequester(r, 1, expected) + return r + }, + chID: BlocksyncChannel, + peer: expected, + bytesFn: func(t *testing.T) []byte { return blockResponseBytesWithSigs(t, types.MaxVotesCount+1, 0) }, + expectErr: "too many commit signatures", + }, + { + name: "rejects BlockResponse exceeding extended commit signature cap", + setup: func(t *testing.T) *Reactor { + r := newFilterReactor(t, true) + seedRequester(r, 1, expected) + return r + }, + chID: BlocksyncChannel, + peer: expected, + bytesFn: func(t *testing.T) []byte { return blockResponseBytesWithSigs(t, 0, types.MaxVotesCount+1) }, + expectErr: "too many extended commit signatures", + }, + { + name: "rejects BlockResponse splitting signatures across duplicate Block fields", + setup: func(t *testing.T) *Reactor { + r := newFilterReactor(t, true) + seedRequester(r, 1, expected) + return r + }, + chID: BlocksyncChannel, + peer: expected, + bytesFn: func(t *testing.T) []byte { + half := types.MaxVotesCount/2 + 1 // 2*half > MaxVotesCount + a := blockResponseBytesWithSigs(t, half, 0) + b := blockResponseBytesWithSigs(t, half, 0) + return append(append([]byte{}, a...), b...) + }, + expectErr: "too many commit signatures", + }, + { + name: "rejects BlockResponse when first byte is not BlockResponse proto tag", + setup: func(t *testing.T) *Reactor { + r := newFilterReactor(t, true) + seedRequester(r, 1, expected) + return r + }, + chID: BlocksyncChannel, + peer: expected, + bytesFn: func(t *testing.T) []byte { + // Prepend an empty BlockRequest field (tag 0x0a, len 0) + // so msgBytes[0] != BlockResponse oneof tag, then append + // a real BlockResponse payload that exceeds the cap. + oversized := blockResponseBytesWithSigs(t, types.MaxVotesCount+1, 0) + return append([]byte{0x0a, 0x00}, oversized...) + }, + expectErr: "too many commit signatures", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + r := tc.setup(t) + err := r.FilterMsgBytes(tc.chID, mockPeer(tc.peer), tc.bytesFn(t)) + if tc.expectErr == "" { + require.NoError(t, err) + return + } + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectErr) + }) + } +} + +func TestStubUnmarshalAllocs(t *testing.T) { + tests := []struct { + name string + numCommits int + numExtCommits int + }{ + {"10k commit sigs", 10_000, 0}, + {"100k commit sigs", 100_000, 0}, + {"1m commit sigs", 1_000_000, 0}, + {"10k ext commit sigs", 0, 10_000}, + {"100k ext commit sigs", 0, 100_000}, + {"1m ext commit sigs", 0, 1_000_000}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + payload := blockResponseBytesWithSigs(t, tt.numCommits, tt.numExtCommits) + allocs := testing.AllocsPerRun(20, func() { + var stub bcstub.SigCountMessage + require.NoError(t, stub.Unmarshal(payload)) + require.Len(t, stub.BlockResponse.Block.LastCommit.Signatures, tt.numCommits) + require.Len(t, stub.BlockResponse.ExtCommit.ExtendedSignatures, tt.numExtCommits) + }) + const maxAllocs = 50 + require.LessOrEqualf(t, int(allocs), maxAllocs, "unmarshal allocated %d times, more than max allowed %d", int(allocs), maxAllocs) + }) + } +} + +func blockResponseBytesWithSigs(t *testing.T, commitSigs, extSigs int) []byte { + t.Helper() + commit := &cmtproto.Commit{Signatures: make([]cmtproto.CommitSig, commitSigs)} + for i := range commit.Signatures { + commit.Signatures[i] = cmtproto.CommitSig{BlockIdFlag: cmtproto.BlockIDFlagAbsent} + } + + ext := &cmtproto.ExtendedCommit{ExtendedSignatures: make([]cmtproto.ExtendedCommitSig, extSigs)} + for i := range ext.ExtendedSignatures { + ext.ExtendedSignatures[i] = cmtproto.ExtendedCommitSig{BlockIdFlag: cmtproto.BlockIDFlagAbsent} + } + + msg := &bcproto.Message{ + Sum: &bcproto.Message_BlockResponse{ + BlockResponse: &bcproto.BlockResponse{ + Block: &cmtproto.Block{LastCommit: commit}, + ExtCommit: ext, + }, + }, + } + + payload, err := proto.Marshal(msg) + require.NoError(t, err) + return payload +} diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index 3acd9de622d..90dffa9c09e 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -43,6 +43,14 @@ type Reactor interface { Receive(e Envelope) } +// MsgBytesFilter is an optional interface a Reactor may implement to inspect +// raw message bytes on a channel before they are protobuf unmarshalled. If +// FilterMsgBytes returns a non-nil error, the message is dropped and the peer +// is disconnected. +type MsgBytesFilter interface { + FilterMsgBytes(chID byte, src Peer, msgBytes []byte) error +} + // -------------------------------------- type BaseReactor struct { diff --git a/p2p/peer.go b/p2p/peer.go index ee3f6ecd1f2..292676be84d 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -415,6 +415,14 @@ func createMConnection( // which does onPeerError. panic(fmt.Sprintf("Unknown channel %X", chID)) } + + // give reactors a chance to reject the raw bytes before unmarshalling + if f, ok := reactor.(MsgBytesFilter); ok { + if err := f.FilterMsgBytes(chID, p, msgBytes); err != nil { + panic(fmt.Errorf("rejected msg on chID %#x from %s: %w", chID, p.ID(), err)) + } + } + mt := msgTypeByChID[chID] msg := proto.Clone(mt) err := proto.Unmarshal(msgBytes, msg) diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 2b86376bd29..afaceb1cc23 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -875,3 +875,90 @@ func TestSwitchRemovalErr(t *testing.T) { assert.Equal(t, sw2.peers.Add(p).Error(), ErrPeerRemoval{}.Error()) } + +// filteringTestReactor is a mock reactor that optionally filters messages via +// the MsgBytesFilter implementation. +type filteringTestReactor struct { + *TestReactor + filterCalls atomic.Int32 + rejectErr error // if non-nil, FilterMsgBytes returns this for matching channel + channelID byte +} + +func newFilteringTestReactor(channels []*conn.ChannelDescriptor, channelID byte, rejectErr error) *filteringTestReactor { + return &filteringTestReactor{ + TestReactor: NewTestReactor(channels, true), + channelID: channelID, + rejectErr: rejectErr, + } +} + +func (r *filteringTestReactor) FilterMsgBytes(chID byte, _ Peer, _ []byte) error { + if chID != r.channelID { + return nil + } + r.filterCalls.Add(1) + return r.rejectErr +} + +func TestSwitchMsgBytesFilter(t *testing.T) { + + // Berachain adaptation from upstream #5860: BroadcastAsync renamed + // to Broadcast — this fork's Broadcast is identical body to upstream's + // BroadcastAsync + + const filterChID = byte(0x00) + + // s1 accepts all bytes and s2 rejects bytes on filterChID + makeReactor := func(i int) *filteringTestReactor { + var rejectErr error + if i == 1 { + rejectErr = errors.New("rejected by filter for test") + } + return newFilteringTestReactor( + []*conn.ChannelDescriptor{ + {ID: filterChID, Priority: 10, MessageType: &p2pproto.Message{}}, + }, + filterChID, + rejectErr, + ) + } + + reactors := make(map[int]*filteringTestReactor) + + sw1, sw2 := MakeSwitchPair(func(i int, sw *Switch) *Switch { + sw.SetAddrBook(&AddrBookMock{ + Addrs: make(map[string]struct{}), + OurAddrs: make(map[string]struct{}), + }) + r := makeReactor(i) + reactors[i] = r + sw.AddReactor("filterTest", r) + return sw + }) + t.Cleanup(func() { + _ = sw1.Stop() + _ = sw2.Stop() + }) + + require.Equal(t, 1, sw1.Peers().Size()) + require.Equal(t, 1, sw2.Peers().Size()) + + // s1 broadcasts on the rejected channel, s2's filter must drop the message + // before proto.Unmarshal and disconnect s1 via mconn._recover + msg := &p2pproto.PexAddrs{Addrs: []p2pproto.NetAddress{{ID: "1"}}} + sw1.Broadcast(Envelope{ChannelID: filterChID, Message: msg}) + + // ensure s2's filter ran + require.Eventually(t, func() bool { + return reactors[1].filterCalls.Load() >= 1 + }, 5*time.Second, 50*time.Millisecond) + + // Receive was never called (rejection happened before unmarshal) + require.Empty(t, reactors[1].getMsgs(filterChID)) + + // s2 dropped s1 + require.Eventually(t, func() bool { + return sw2.Peers().Size() == 0 + }, 5*time.Second, 50*time.Millisecond) +} diff --git a/proto/tendermint/blocksync/nosig.go b/proto/tendermint/blocksync/nosig.go new file mode 100644 index 00000000000..959d574ac06 --- /dev/null +++ b/proto/tendermint/blocksync/nosig.go @@ -0,0 +1,14 @@ +package blocksync + +// NoSig is a zero-size stand-in for a CommitSig / ExtendedCommitSig used by +// the SigCount stub messages. Its Unmarshal accepts (and discards) any wire +// payload, and the slice-of-NoSig that gogoproto generates costs no memory +// per entry — only the slice header grows. +type NoSig struct{} + +func (NoSig) Marshal() ([]byte, error) { return nil, nil } +func (NoSig) MarshalTo([]byte) (int, error) { return 0, nil } +func (NoSig) MarshalToSizedBuffer([]byte) (int, error) { return 0, nil } +func (NoSig) Size() int { return 0 } + +func (*NoSig) Unmarshal([]byte) error { return nil } diff --git a/proto/tendermint/blocksync/stub.pb.go b/proto/tendermint/blocksync/stub.pb.go new file mode 100644 index 00000000000..efc5f6c25db --- /dev/null +++ b/proto/tendermint/blocksync/stub.pb.go @@ -0,0 +1,1108 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: tendermint/blocksync/stub.proto + +package blocksync + +import ( + fmt "fmt" + _ "github.com/cosmos/gogoproto/gogoproto" + proto "github.com/cosmos/gogoproto/proto" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +// SigCountMessage is a stripped down view of Message used by the blocksync +// MsgBytesFilter for validation before unmarshalling the full BlockResponse +// Message. Only the BlockResponse oneof case is modelled, and only the path +// down to {Block,Extended}Commit.signatures. +type SigCountMessage struct { + BlockResponse *SigCountBlockResponse `protobuf:"bytes,3,opt,name=block_response,json=blockResponse,proto3" json:"block_response,omitempty"` +} + +func (m *SigCountMessage) Reset() { *m = SigCountMessage{} } +func (m *SigCountMessage) String() string { return proto.CompactTextString(m) } +func (*SigCountMessage) ProtoMessage() {} +func (*SigCountMessage) Descriptor() ([]byte, []int) { + return fileDescriptor_c33a924f7a4c66c7, []int{0} +} +func (m *SigCountMessage) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SigCountMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SigCountMessage.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SigCountMessage) XXX_Merge(src proto.Message) { + xxx_messageInfo_SigCountMessage.Merge(m, src) +} +func (m *SigCountMessage) XXX_Size() int { + return m.Size() +} +func (m *SigCountMessage) XXX_DiscardUnknown() { + xxx_messageInfo_SigCountMessage.DiscardUnknown(m) +} + +var xxx_messageInfo_SigCountMessage proto.InternalMessageInfo + +func (m *SigCountMessage) GetBlockResponse() *SigCountBlockResponse { + if m != nil { + return m.BlockResponse + } + return nil +} + +type SigCountBlockResponse struct { + Block *SigCountBlock `protobuf:"bytes,1,opt,name=block,proto3" json:"block,omitempty"` + ExtCommit *SigCountExtendedCommit `protobuf:"bytes,2,opt,name=ext_commit,json=extCommit,proto3" json:"ext_commit,omitempty"` +} + +func (m *SigCountBlockResponse) Reset() { *m = SigCountBlockResponse{} } +func (m *SigCountBlockResponse) String() string { return proto.CompactTextString(m) } +func (*SigCountBlockResponse) ProtoMessage() {} +func (*SigCountBlockResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_c33a924f7a4c66c7, []int{1} +} +func (m *SigCountBlockResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SigCountBlockResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SigCountBlockResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SigCountBlockResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_SigCountBlockResponse.Merge(m, src) +} +func (m *SigCountBlockResponse) XXX_Size() int { + return m.Size() +} +func (m *SigCountBlockResponse) XXX_DiscardUnknown() { + xxx_messageInfo_SigCountBlockResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_SigCountBlockResponse proto.InternalMessageInfo + +func (m *SigCountBlockResponse) GetBlock() *SigCountBlock { + if m != nil { + return m.Block + } + return nil +} + +func (m *SigCountBlockResponse) GetExtCommit() *SigCountExtendedCommit { + if m != nil { + return m.ExtCommit + } + return nil +} + +type SigCountBlock struct { + LastCommit *SigCountCommit `protobuf:"bytes,4,opt,name=last_commit,json=lastCommit,proto3" json:"last_commit,omitempty"` +} + +func (m *SigCountBlock) Reset() { *m = SigCountBlock{} } +func (m *SigCountBlock) String() string { return proto.CompactTextString(m) } +func (*SigCountBlock) ProtoMessage() {} +func (*SigCountBlock) Descriptor() ([]byte, []int) { + return fileDescriptor_c33a924f7a4c66c7, []int{2} +} +func (m *SigCountBlock) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SigCountBlock) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SigCountBlock.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SigCountBlock) XXX_Merge(src proto.Message) { + xxx_messageInfo_SigCountBlock.Merge(m, src) +} +func (m *SigCountBlock) XXX_Size() int { + return m.Size() +} +func (m *SigCountBlock) XXX_DiscardUnknown() { + xxx_messageInfo_SigCountBlock.DiscardUnknown(m) +} + +var xxx_messageInfo_SigCountBlock proto.InternalMessageInfo + +func (m *SigCountBlock) GetLastCommit() *SigCountCommit { + if m != nil { + return m.LastCommit + } + return nil +} + +type SigCountCommit struct { + Signatures []NoSig `protobuf:"bytes,4,rep,name=signatures,proto3,customtype=NoSig" json:"signatures"` +} + +func (m *SigCountCommit) Reset() { *m = SigCountCommit{} } +func (m *SigCountCommit) String() string { return proto.CompactTextString(m) } +func (*SigCountCommit) ProtoMessage() {} +func (*SigCountCommit) Descriptor() ([]byte, []int) { + return fileDescriptor_c33a924f7a4c66c7, []int{3} +} +func (m *SigCountCommit) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SigCountCommit) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SigCountCommit.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SigCountCommit) XXX_Merge(src proto.Message) { + xxx_messageInfo_SigCountCommit.Merge(m, src) +} +func (m *SigCountCommit) XXX_Size() int { + return m.Size() +} +func (m *SigCountCommit) XXX_DiscardUnknown() { + xxx_messageInfo_SigCountCommit.DiscardUnknown(m) +} + +var xxx_messageInfo_SigCountCommit proto.InternalMessageInfo + +type SigCountExtendedCommit struct { + ExtendedSignatures []NoSig `protobuf:"bytes,4,rep,name=extended_signatures,json=extendedSignatures,proto3,customtype=NoSig" json:"extended_signatures"` +} + +func (m *SigCountExtendedCommit) Reset() { *m = SigCountExtendedCommit{} } +func (m *SigCountExtendedCommit) String() string { return proto.CompactTextString(m) } +func (*SigCountExtendedCommit) ProtoMessage() {} +func (*SigCountExtendedCommit) Descriptor() ([]byte, []int) { + return fileDescriptor_c33a924f7a4c66c7, []int{4} +} +func (m *SigCountExtendedCommit) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SigCountExtendedCommit) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SigCountExtendedCommit.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SigCountExtendedCommit) XXX_Merge(src proto.Message) { + xxx_messageInfo_SigCountExtendedCommit.Merge(m, src) +} +func (m *SigCountExtendedCommit) XXX_Size() int { + return m.Size() +} +func (m *SigCountExtendedCommit) XXX_DiscardUnknown() { + xxx_messageInfo_SigCountExtendedCommit.DiscardUnknown(m) +} + +var xxx_messageInfo_SigCountExtendedCommit proto.InternalMessageInfo + +func init() { + proto.RegisterType((*SigCountMessage)(nil), "tendermint.blocksync.SigCountMessage") + proto.RegisterType((*SigCountBlockResponse)(nil), "tendermint.blocksync.SigCountBlockResponse") + proto.RegisterType((*SigCountBlock)(nil), "tendermint.blocksync.SigCountBlock") + proto.RegisterType((*SigCountCommit)(nil), "tendermint.blocksync.SigCountCommit") + proto.RegisterType((*SigCountExtendedCommit)(nil), "tendermint.blocksync.SigCountExtendedCommit") +} + +func init() { proto.RegisterFile("tendermint/blocksync/stub.proto", fileDescriptor_c33a924f7a4c66c7) } + +var fileDescriptor_c33a924f7a4c66c7 = []byte{ + // 350 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0xc1, 0x4a, 0xf3, 0x40, + 0x10, 0xc7, 0x93, 0xaf, 0xed, 0x07, 0x4e, 0x6d, 0x85, 0xb5, 0x4a, 0xf0, 0x90, 0x96, 0xe8, 0xa1, + 0xa0, 0x26, 0xa0, 0x07, 0xf1, 0xa2, 0xd0, 0xd2, 0x93, 0x28, 0x98, 0x82, 0x88, 0x97, 0xd2, 0xa4, + 0xeb, 0x1a, 0x6c, 0xb2, 0x25, 0x3b, 0x81, 0xfa, 0x16, 0xbe, 0x81, 0xaf, 0xd3, 0x63, 0x8f, 0xe2, + 0xa1, 0x48, 0xfb, 0x22, 0xd2, 0x4d, 0x9a, 0xb6, 0x10, 0xaa, 0xb7, 0xd9, 0x99, 0xdf, 0xef, 0x3f, + 0x7b, 0x18, 0xa8, 0x22, 0x0d, 0x7a, 0x34, 0xf4, 0xbd, 0x00, 0x2d, 0xa7, 0xcf, 0xdd, 0x57, 0xf1, + 0x16, 0xb8, 0x96, 0xc0, 0xc8, 0x31, 0x07, 0x21, 0x47, 0x4e, 0x2a, 0x4b, 0xc0, 0x4c, 0x81, 0x83, + 0x0a, 0xe3, 0x8c, 0x4b, 0xc0, 0x9a, 0x57, 0x31, 0x6b, 0x50, 0xd8, 0x69, 0x7b, 0xac, 0xc9, 0xa3, + 0x00, 0x6f, 0xa9, 0x10, 0x5d, 0x46, 0x89, 0x0d, 0x65, 0x69, 0x75, 0x42, 0x2a, 0x06, 0x3c, 0x10, + 0x54, 0xcb, 0xd5, 0xd4, 0x7a, 0xf1, 0xec, 0xd8, 0xcc, 0xca, 0x35, 0x17, 0x7a, 0x63, 0xde, 0xb1, + 0x13, 0xc5, 0x2e, 0x39, 0xab, 0x4f, 0xe3, 0x43, 0x85, 0xbd, 0x4c, 0x90, 0x5c, 0x42, 0x41, 0xa2, + 0x9a, 0x2a, 0x97, 0x1c, 0xfe, 0x65, 0x49, 0x6c, 0x90, 0x1b, 0x00, 0x3a, 0xc4, 0x8e, 0xcb, 0x7d, + 0xdf, 0x43, 0xed, 0x9f, 0xf4, 0x4f, 0x36, 0xfb, 0xad, 0xa1, 0x1c, 0xf7, 0x9a, 0xd2, 0xb1, 0xb7, + 0xe8, 0x10, 0xe3, 0xd2, 0x78, 0x80, 0xd2, 0xda, 0x12, 0xd2, 0x82, 0x62, 0xbf, 0x2b, 0xd2, 0xf8, + 0xbc, 0x8c, 0x3f, 0xda, 0x1c, 0x9f, 0xc4, 0xc2, 0x5c, 0x4c, 0x72, 0xaf, 0xa1, 0xbc, 0x3e, 0x25, + 0xa7, 0x00, 0xc2, 0x63, 0x41, 0x17, 0xa3, 0x90, 0x0a, 0x2d, 0x5f, 0xcb, 0xd5, 0xb7, 0x1b, 0xa5, + 0xd1, 0xa4, 0xaa, 0x7c, 0x4d, 0xaa, 0x85, 0x3b, 0xde, 0xf6, 0x98, 0xbd, 0x02, 0x18, 0x8f, 0xb0, + 0x9f, 0xfd, 0x7b, 0x72, 0x05, 0xbb, 0x34, 0xe9, 0x74, 0x7e, 0x4b, 0x24, 0x0b, 0xb2, 0x9d, 0x82, + 0x8d, 0xfb, 0xd1, 0x54, 0x57, 0xc7, 0x53, 0x5d, 0xfd, 0x9e, 0xea, 0xea, 0xfb, 0x4c, 0x57, 0xc6, + 0x33, 0x5d, 0xf9, 0x9c, 0xe9, 0xca, 0xd3, 0x05, 0xf3, 0xf0, 0x25, 0x72, 0x4c, 0x97, 0xfb, 0x96, + 0xcb, 0x7d, 0x8a, 0xce, 0x33, 0x2e, 0x8b, 0xf8, 0x88, 0xb2, 0xae, 0xd0, 0xf9, 0x2f, 0x67, 0xe7, + 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x83, 0x31, 0xc8, 0x0e, 0xa4, 0x02, 0x00, 0x00, +} + +func (m *SigCountMessage) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SigCountMessage) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SigCountMessage) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.BlockResponse != nil { + { + size, err := m.BlockResponse.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintStub(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + return len(dAtA) - i, nil +} + +func (m *SigCountBlockResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SigCountBlockResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SigCountBlockResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.ExtCommit != nil { + { + size, err := m.ExtCommit.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintStub(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.Block != nil { + { + size, err := m.Block.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintStub(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *SigCountBlock) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SigCountBlock) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SigCountBlock) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.LastCommit != nil { + { + size, err := m.LastCommit.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintStub(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + return len(dAtA) - i, nil +} + +func (m *SigCountCommit) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SigCountCommit) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SigCountCommit) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Signatures) > 0 { + for iNdEx := len(m.Signatures) - 1; iNdEx >= 0; iNdEx-- { + { + size := m.Signatures[iNdEx].Size() + i -= size + if _, err := m.Signatures[iNdEx].MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + i = encodeVarintStub(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + } + return len(dAtA) - i, nil +} + +func (m *SigCountExtendedCommit) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SigCountExtendedCommit) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SigCountExtendedCommit) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.ExtendedSignatures) > 0 { + for iNdEx := len(m.ExtendedSignatures) - 1; iNdEx >= 0; iNdEx-- { + { + size := m.ExtendedSignatures[iNdEx].Size() + i -= size + if _, err := m.ExtendedSignatures[iNdEx].MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + i = encodeVarintStub(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + } + return len(dAtA) - i, nil +} + +func encodeVarintStub(dAtA []byte, offset int, v uint64) int { + offset -= sovStub(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *SigCountMessage) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.BlockResponse != nil { + l = m.BlockResponse.Size() + n += 1 + l + sovStub(uint64(l)) + } + return n +} + +func (m *SigCountBlockResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Block != nil { + l = m.Block.Size() + n += 1 + l + sovStub(uint64(l)) + } + if m.ExtCommit != nil { + l = m.ExtCommit.Size() + n += 1 + l + sovStub(uint64(l)) + } + return n +} + +func (m *SigCountBlock) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.LastCommit != nil { + l = m.LastCommit.Size() + n += 1 + l + sovStub(uint64(l)) + } + return n +} + +func (m *SigCountCommit) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Signatures) > 0 { + for _, e := range m.Signatures { + l = e.Size() + n += 1 + l + sovStub(uint64(l)) + } + } + return n +} + +func (m *SigCountExtendedCommit) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.ExtendedSignatures) > 0 { + for _, e := range m.ExtendedSignatures { + l = e.Size() + n += 1 + l + sovStub(uint64(l)) + } + } + return n +} + +func sovStub(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozStub(x uint64) (n int) { + return sovStub(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *SigCountMessage) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStub + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SigCountMessage: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SigCountMessage: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field BlockResponse", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStub + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStub + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStub + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.BlockResponse == nil { + m.BlockResponse = &SigCountBlockResponse{} + } + if err := m.BlockResponse.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipStub(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthStub + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SigCountBlockResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStub + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SigCountBlockResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SigCountBlockResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Block", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStub + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStub + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStub + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Block == nil { + m.Block = &SigCountBlock{} + } + if err := m.Block.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ExtCommit", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStub + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStub + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStub + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.ExtCommit == nil { + m.ExtCommit = &SigCountExtendedCommit{} + } + if err := m.ExtCommit.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipStub(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthStub + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SigCountBlock) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStub + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SigCountBlock: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SigCountBlock: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LastCommit", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStub + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStub + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStub + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.LastCommit == nil { + m.LastCommit = &SigCountCommit{} + } + if err := m.LastCommit.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipStub(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthStub + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SigCountCommit) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStub + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SigCountCommit: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SigCountCommit: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Signatures", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStub + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthStub + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthStub + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var v NoSig + m.Signatures = append(m.Signatures, v) + if err := m.Signatures[len(m.Signatures)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipStub(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthStub + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SigCountExtendedCommit) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStub + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SigCountExtendedCommit: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SigCountExtendedCommit: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ExtendedSignatures", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStub + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthStub + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthStub + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var v NoSig + m.ExtendedSignatures = append(m.ExtendedSignatures, v) + if err := m.ExtendedSignatures[len(m.ExtendedSignatures)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipStub(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthStub + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipStub(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowStub + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowStub + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowStub + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthStub + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupStub + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthStub + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthStub = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowStub = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupStub = fmt.Errorf("proto: unexpected end of group") +) diff --git a/proto/tendermint/blocksync/stub.proto b/proto/tendermint/blocksync/stub.proto new file mode 100644 index 00000000000..3dfa79bf2bc --- /dev/null +++ b/proto/tendermint/blocksync/stub.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; +package tendermint.blocksync; + +import "gogoproto/gogo.proto"; + +option go_package = "github.com/cometbft/cometbft/proto/tendermint/blocksync"; + +// SigCountMessage is a stripped down view of Message used by the blocksync +// MsgBytesFilter for validation before unmarshalling the full BlockResponse +// Message. Only the BlockResponse oneof case is modelled, and only the path +// down to {Block,Extended}Commit.signatures. +message SigCountMessage { + SigCountBlockResponse block_response = 3; +} + +message SigCountBlockResponse { + SigCountBlock block = 1; + SigCountExtendedCommit ext_commit = 2; +} + +message SigCountBlock { + SigCountCommit last_commit = 4; +} + +message SigCountCommit { + repeated bytes signatures = 4 [ + (gogoproto.customtype) = "NoSig", + (gogoproto.nullable) = false + ]; +} + +message SigCountExtendedCommit { + repeated bytes extended_signatures = 4 [ + (gogoproto.customtype) = "NoSig", + (gogoproto.nullable) = false + ]; +}