Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions internal/blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,19 @@ func (pool *BlockPool) removeTimedoutPeers() {
pool.sortPeers()
}

// HasPendingRequestFrom reports whether we have at least one outstanding block
// request directed at the given peer.
func (pool *BlockPool) HasPendingRequestFrom(peerID p2p.ID) bool {
pool.mtx.Lock()
defer pool.mtx.Unlock()
for _, r := range pool.requesters {
if r.didRequestFrom(peerID) {
return true
}
}
return false
}

// IsCaughtUp returns true if this node is caught up, false - otherwise.
// TODO: relax conditions, prevent abuse.
func (pool *BlockPool) IsCaughtUp() (isCaughtUp bool, height, maxPeerHeight int64) {
Expand Down
49 changes: 49 additions & 0 deletions internal/blocksync/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,3 +534,52 @@ func TestBlockPoolIgnoresPeersWithInflatedBase(t *testing.T) {
require.EqualValues(t, 300, pool.MaxPeerHeight(),
"peer with base > pool.height must not raise maxPeerHeight")
}

func TestBlockPoolHasPendingRequestFrom(t *testing.T) {
requestsCh := make(chan BlockRequest, 10)
errorsCh := make(chan peerError, 10)

pool := NewBlockPool(1, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())

const (
primary = p2p.ID("primary")
secondary = p2p.ID("secondary")
stranger = p2p.ID("stranger")
)

// check initial state
require.False(t, pool.HasPendingRequestFrom(primary))
require.False(t, pool.HasPendingRequestFrom(secondary))
require.False(t, pool.HasPendingRequestFrom(stranger))

// Install a requester for height 1 targeting `primary`. We set the
// fields directly so we don't have to spin up the request goroutine.
pool.mtx.Lock()
req1 := newBPRequester(pool, 1)
req1.peerID = primary
pool.requesters[1] = req1
pool.mtx.Unlock()

require.True(t, pool.HasPendingRequestFrom(primary), "requested peer should be reported as pending")
require.False(t, pool.HasPendingRequestFrom(stranger), "non-requested peer must not be reported as pending")

// A second requester at height 2 also covers the secondPeerID slot.
pool.mtx.Lock()
req2 := newBPRequester(pool, 2)
req2.peerID = primary
req2.secondPeerID = secondary
pool.requesters[2] = req2
pool.mtx.Unlock()

require.True(t, pool.HasPendingRequestFrom(secondary), "secondary peer slot should count as pending")

// Removing both requesters drops the pending state.
pool.mtx.Lock()
delete(pool.requesters, 1)
delete(pool.requesters, 2)
pool.mtx.Unlock()

require.False(t, pool.HasPendingRequestFrom(primary))
require.False(t, pool.HasPendingRequestFrom(secondary))
}
73 changes: 73 additions & 0 deletions internal/blocksync/reactor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blocksync

import (
"errors"
"fmt"
"reflect"
"sync"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/cometbft/cometbft/crypto"
"github.com/cometbft/cometbft/libs/log"
"github.com/cometbft/cometbft/p2p"
bcstub "github.com/cometbft/cometbft/proto/tendermint/blocksync"
sm "github.com/cometbft/cometbft/state"
"github.com/cometbft/cometbft/store"
"github.com/cometbft/cometbft/types"
Expand Down Expand Up @@ -51,6 +53,9 @@ func (e peerError) Error() string {
return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error())
}

// Reactor implements MsgBytesFilter.
var _ p2p.MsgBytesFilter = (*Reactor)(nil)

// Reactor handles long-term catchup syncing.
type Reactor struct {
p2p.BaseReactor
Expand Down Expand Up @@ -270,6 +275,74 @@ func (bcR *Reactor) handlePeerResponse(msg *bcproto.BlockResponse, src p2p.Peer)
}
}

// FilterMsgBytes implements p2p.MsgBytesFilter and rejects messages from
// unexpected peers before unmarshalling the request.
func (bcR *Reactor) FilterMsgBytes(chID byte, src p2p.Peer, msgBytes []byte) error {
// do not check invalid messages, will fail unmarshalling
if chID != BlocksyncChannel || len(msgBytes) == 0 {
return nil
}

// unmarshal into custom stub struct that will do no allocations so we can
// quickly and cheaply check the validity of BlockResponse message
var stub bcstub.SigCountMessage
if err := stub.Unmarshal(msgBytes); err != nil {
return fmt.Errorf("malformed blocksync message from peer %s: %w", src.ID(), err)
}
if stub.BlockResponse == nil {
// Not a BlockResponse oneof case, no extra validation to do in this
// cases
return nil
}

// Berachain adaptation from upstream #5860: upstream additionally guards on a
// Reactor.enabled atomic.Bool added in #5633. This fork does not bring #5633,
// so we use pool.IsRunning() alone — equivalent in practice because the only
// way HasPendingRequestFrom returns true is when the pool has live requesters,
// which requires pool.Start() to have run (would set enabled = true in the upstream).

// blocksync not running, we should not be getting a BlockResponse
if !bcR.pool.IsRunning() {
return errors.New("unsolicited BlockResponse: blocksync not active")
}

// ensure we have an outstanding request to this peer
if !bcR.pool.HasPendingRequestFrom(src.ID()) {
return fmt.Errorf("unsolicited BlockResponse from peer %s", src.ID())
}

// validate the commit count in the response
if err := validateMaxVotes(stub.BlockResponse); err != nil {
return fmt.Errorf("validating max votes in BlockResponse from peer %s: %w", src.ID(), err)
}

return nil
}

// validateMaxVotes validates that the number of commit signatures and extended
// commit signatures are both less than the MaxVotesCount, returns an error if
// not.
func validateMaxVotes(br *bcstub.SigCountBlockResponse) error {
commitSigs, extSigs := 0, 0
if br != nil {
if br.Block != nil && br.Block.LastCommit != nil {
commitSigs = len(br.Block.LastCommit.Signatures)
}
if br.ExtCommit != nil {
extSigs = len(br.ExtCommit.ExtendedSignatures)
}
}

if commitSigs > types.MaxVotesCount {
return fmt.Errorf("too many commit signatures: %d (max %d)", commitSigs, types.MaxVotesCount)
}
if extSigs > types.MaxVotesCount {
return fmt.Errorf("too many extended commit signatures: %d (max %d)", extSigs, types.MaxVotesCount)
}

return nil
}

// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *Reactor) Receive(e p2p.Envelope) {
if err := ValidateMsg(e.Message); err != nil {
Expand Down
Loading
Loading