Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions pkg/consensus/mimicry/p2p/reqresp/v1/eth/protocols.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package eth

import (
"github.com/ethpandaops/ethcore/pkg/consensus/mimicry/p2p/eth"
v1 "github.com/ethpandaops/ethcore/pkg/consensus/mimicry/p2p/reqresp/v1"
"github.com/libp2p/go-libp2p/core/protocol"
)

// NewStatus creates a status protocol with compile-time validated protocol ID.
func NewStatus[TReq, TResp any](maxRequestSize, maxResponseSize uint64, networkEncoder v1.NetworkEncoder) v1.Protocol[TReq, TResp] {
return v1.NewProtocol(
protocol.ID(eth.StatusV1ProtocolID),
maxRequestSize,
maxResponseSize,
networkEncoder,
)
}

// NewGoodbye creates a goodbye protocol with compile-time validated protocol ID.
func NewGoodbye[TReq, TResp any](maxRequestSize, maxResponseSize uint64, networkEncoder v1.NetworkEncoder) v1.Protocol[TReq, TResp] {
return v1.NewProtocol(
protocol.ID(eth.GoodbyeV1ProtocolID),
maxRequestSize,
maxResponseSize,
networkEncoder,
)
}

// NewPing creates a ping protocol with compile-time validated protocol ID.
func NewPing[TReq, TResp any](maxRequestSize, maxResponseSize uint64, networkEncoder v1.NetworkEncoder) v1.Protocol[TReq, TResp] {
return v1.NewProtocol(
protocol.ID(eth.PingV1ProtocolID),
maxRequestSize,
maxResponseSize,
networkEncoder,
)
}

// NewMetadataV1 creates a metadata V1 protocol with compile-time validated protocol ID.
func NewMetadataV1[TReq, TResp any](maxRequestSize, maxResponseSize uint64, networkEncoder v1.NetworkEncoder) v1.Protocol[TReq, TResp] {
return v1.NewProtocol(
protocol.ID(eth.MetaDataV1ProtocolID),
maxRequestSize,
maxResponseSize,
networkEncoder,
)
}

// NewMetadataV2 creates a metadata V2 protocol with compile-time validated protocol ID.
func NewMetadataV2[TReq, TResp any](maxRequestSize, maxResponseSize uint64, networkEncoder v1.NetworkEncoder) v1.Protocol[TReq, TResp] {
return v1.NewProtocol(
protocol.ID(eth.MetaDataV2ProtocolID),
maxRequestSize,
maxResponseSize,
networkEncoder,
)
}

// NewBeaconBlocksByRangeV1 creates a beacon blocks by range V1 protocol with compile-time validated protocol ID.
func NewBeaconBlocksByRangeV1[TReq, TResp any](maxRequestSize, maxResponseSize uint64, networkEncoder v1.NetworkEncoder) v1.Protocol[TReq, TResp] {
return v1.NewChunkedProtocol(
protocol.ID(eth.BeaconBlocksByRangeV1ProtocolID),
maxRequestSize,
maxResponseSize,
networkEncoder,
)
}

// NewBeaconBlocksByRangeV2 creates a beacon blocks by range V2 protocol with compile-time validated protocol ID.
func NewBeaconBlocksByRangeV2[TReq, TResp any](maxRequestSize, maxResponseSize uint64, networkEncoder v1.NetworkEncoder) v1.Protocol[TReq, TResp] {
return v1.NewChunkedProtocol(
protocol.ID(eth.BeaconBlocksByRangeV2ProtocolID),
maxRequestSize,
maxResponseSize,
networkEncoder,
)
}

// NewBeaconBlocksByRootV1 creates a beacon blocks by root V1 protocol with compile-time validated protocol ID.
func NewBeaconBlocksByRootV1[TReq, TResp any](maxRequestSize, maxResponseSize uint64, networkEncoder v1.NetworkEncoder) v1.Protocol[TReq, TResp] {
return v1.NewChunkedProtocol(
protocol.ID(eth.BeaconBlocksByRootV1ProtocolID),
maxRequestSize,
maxResponseSize,
networkEncoder,
)
}

// NewBeaconBlocksByRootV2 creates a beacon blocks by root V2 protocol with compile-time validated protocol ID.
func NewBeaconBlocksByRootV2[TReq, TResp any](maxRequestSize, maxResponseSize uint64, networkEncoder v1.NetworkEncoder) v1.Protocol[TReq, TResp] {
return v1.NewChunkedProtocol(
protocol.ID(eth.BeaconBlocksByRootV2ProtocolID),
maxRequestSize,
maxResponseSize,
networkEncoder,
)
}

// NewBlobSidecarsByRangeV1 creates a blob sidecars by range V1 protocol with compile-time validated protocol ID.
func NewBlobSidecarsByRangeV1[TReq, TResp any](maxRequestSize, maxResponseSize uint64, networkEncoder v1.NetworkEncoder) v1.Protocol[TReq, TResp] {
return v1.NewChunkedProtocol(
protocol.ID(eth.BlobSidecarsByRangeV1ProtocolID),
maxRequestSize,
maxResponseSize,
networkEncoder,
)
}

// NewBlobSidecarsByRootV1 creates a blob sidecars by root V1 protocol with compile-time validated protocol ID.
func NewBlobSidecarsByRootV1[TReq, TResp any](maxRequestSize, maxResponseSize uint64, networkEncoder v1.NetworkEncoder) v1.Protocol[TReq, TResp] {
return v1.NewChunkedProtocol(
protocol.ID(eth.BlobSidecarsByRootV1ProtocolID),
maxRequestSize,
maxResponseSize,
networkEncoder,
)
}
206 changes: 206 additions & 0 deletions pkg/consensus/mimicry/p2p/reqresp/v1/eth/protocols_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package eth_test

import (
"testing"

"github.com/ethpandaops/ethcore/pkg/consensus/mimicry/p2p/eth"
ethProtocols "github.com/ethpandaops/ethcore/pkg/consensus/mimicry/p2p/reqresp/v1/eth"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require"
)

// Mock types for testing.
type Status struct {
ForkDigest [4]byte
FinalizedRoot [32]byte
FinalizedEpoch uint64
HeadRoot [32]byte
HeadSlot uint64
}

type Goodbye struct {
Reason uint64
}

type Ping struct {
SeqNumber uint64
}

type Metadata struct {
SeqNumber uint64
Attnets [8]byte
}

type BeaconBlocksByRangeRequest struct {
StartSlot uint64
Count uint64
Step uint64
}

type BeaconBlock struct {
Slot uint64
ProposerIndex uint64
}

type BlobSidecar struct {
Index uint8
Blob [131072]byte
}

// Mock NetworkEncoder.
type mockNetworkEncoder struct{}

func (m *mockNetworkEncoder) EncodeNetwork(msg any) ([]byte, error) {
return []byte("encoded"), nil
}

func (m *mockNetworkEncoder) DecodeNetwork(data []byte, msgType any) error {
return nil
}

func TestEthProtocols(t *testing.T) {
encoder := &mockNetworkEncoder{}

t.Run("Status protocol", func(t *testing.T) {
proto := ethProtocols.NewStatus[Status, Status](84, 84, encoder)

require.Equal(t, protocol.ID(eth.StatusV1ProtocolID), proto.ID())
require.Equal(t, uint64(84), proto.MaxRequestSize())
require.Equal(t, uint64(84), proto.MaxResponseSize())
require.Equal(t, encoder, proto.NetworkEncoder())
require.False(t, proto.IsChunked())
})

t.Run("Goodbye protocol", func(t *testing.T) {
proto := ethProtocols.NewGoodbye[Goodbye, Goodbye](8, 8, encoder)

require.Equal(t, protocol.ID(eth.GoodbyeV1ProtocolID), proto.ID())
require.Equal(t, uint64(8), proto.MaxRequestSize())
require.Equal(t, uint64(8), proto.MaxResponseSize())
require.Equal(t, encoder, proto.NetworkEncoder())
require.False(t, proto.IsChunked())
})

t.Run("Ping protocol", func(t *testing.T) {
proto := ethProtocols.NewPing[Ping, Ping](8, 8, encoder)

require.Equal(t, protocol.ID(eth.PingV1ProtocolID), proto.ID())
require.Equal(t, uint64(8), proto.MaxRequestSize())
require.Equal(t, uint64(8), proto.MaxResponseSize())
require.Equal(t, encoder, proto.NetworkEncoder())
require.False(t, proto.IsChunked())
})

t.Run("MetadataV1 protocol", func(t *testing.T) {
proto := ethProtocols.NewMetadataV1[struct{}, Metadata](0, 17, encoder)

require.Equal(t, protocol.ID(eth.MetaDataV1ProtocolID), proto.ID())
require.Equal(t, uint64(0), proto.MaxRequestSize())
require.Equal(t, uint64(17), proto.MaxResponseSize())
require.Equal(t, encoder, proto.NetworkEncoder())
require.False(t, proto.IsChunked())
})

t.Run("MetadataV2 protocol", func(t *testing.T) {
proto := ethProtocols.NewMetadataV2[struct{}, Metadata](0, 17, encoder)

require.Equal(t, protocol.ID(eth.MetaDataV2ProtocolID), proto.ID())
require.Equal(t, uint64(0), proto.MaxRequestSize())
require.Equal(t, uint64(17), proto.MaxResponseSize())
require.Equal(t, encoder, proto.NetworkEncoder())
require.False(t, proto.IsChunked())
})

t.Run("BeaconBlocksByRangeV1 protocol", func(t *testing.T) {
proto := ethProtocols.NewBeaconBlocksByRangeV1[BeaconBlocksByRangeRequest, BeaconBlock](
24,
1<<20, // 1MB
encoder,
)

require.Equal(t, protocol.ID(eth.BeaconBlocksByRangeV1ProtocolID), proto.ID())
require.Equal(t, uint64(24), proto.MaxRequestSize())
require.Equal(t, uint64(1<<20), proto.MaxResponseSize())
require.Equal(t, encoder, proto.NetworkEncoder())
require.True(t, proto.IsChunked())
})

t.Run("BeaconBlocksByRangeV2 protocol", func(t *testing.T) {
proto := ethProtocols.NewBeaconBlocksByRangeV2[BeaconBlocksByRangeRequest, BeaconBlock](
24,
10*(1<<20), // 10MB
encoder,
)

require.Equal(t, protocol.ID(eth.BeaconBlocksByRangeV2ProtocolID), proto.ID())
require.Equal(t, uint64(24), proto.MaxRequestSize())
require.Equal(t, uint64(10*(1<<20)), proto.MaxResponseSize())
require.Equal(t, encoder, proto.NetworkEncoder())
require.True(t, proto.IsChunked())
})

t.Run("BeaconBlocksByRootV1 protocol", func(t *testing.T) {
proto := ethProtocols.NewBeaconBlocksByRootV1[[][32]byte, BeaconBlock](
1024,
1<<20, // 1MB
encoder,
)

require.Equal(t, protocol.ID(eth.BeaconBlocksByRootV1ProtocolID), proto.ID())
require.Equal(t, uint64(1024), proto.MaxRequestSize())
require.Equal(t, uint64(1<<20), proto.MaxResponseSize())
require.Equal(t, encoder, proto.NetworkEncoder())
require.True(t, proto.IsChunked())
})

t.Run("BeaconBlocksByRootV2 protocol", func(t *testing.T) {
proto := ethProtocols.NewBeaconBlocksByRootV2[[][32]byte, BeaconBlock](
1024,
10*(1<<20), // 10MB
encoder,
)

require.Equal(t, protocol.ID(eth.BeaconBlocksByRootV2ProtocolID), proto.ID())
require.Equal(t, uint64(1024), proto.MaxRequestSize())
require.Equal(t, uint64(10*(1<<20)), proto.MaxResponseSize())
require.Equal(t, encoder, proto.NetworkEncoder())
require.True(t, proto.IsChunked())
})

t.Run("BlobSidecarsByRangeV1 protocol", func(t *testing.T) {
proto := ethProtocols.NewBlobSidecarsByRangeV1[BeaconBlocksByRangeRequest, BlobSidecar](
24,
1<<17, // 128KB
encoder,
)

require.Equal(t, protocol.ID(eth.BlobSidecarsByRangeV1ProtocolID), proto.ID())
require.Equal(t, uint64(24), proto.MaxRequestSize())
require.Equal(t, uint64(1<<17), proto.MaxResponseSize())
require.Equal(t, encoder, proto.NetworkEncoder())
require.True(t, proto.IsChunked())
})

t.Run("BlobSidecarsByRootV1 protocol", func(t *testing.T) {
proto := ethProtocols.NewBlobSidecarsByRootV1[[][32]byte, BlobSidecar](
1024,
1<<17, // 128KB
encoder,
)

require.Equal(t, protocol.ID(eth.BlobSidecarsByRootV1ProtocolID), proto.ID())
require.Equal(t, uint64(1024), proto.MaxRequestSize())
require.Equal(t, uint64(1<<17), proto.MaxResponseSize())
require.Equal(t, encoder, proto.NetworkEncoder())
require.True(t, proto.IsChunked())
})
}

// Test that protocols are properly typed.
func TestProtocolTypes(t *testing.T) {
encoder := &mockNetworkEncoder{}

// These should compile without issues
var _ = ethProtocols.NewStatus[Status, Status](84, 84, encoder)
var _ = ethProtocols.NewBeaconBlocksByRangeV2[BeaconBlocksByRangeRequest, BeaconBlock](24, 1<<20, encoder)
}
40 changes: 40 additions & 0 deletions pkg/consensus/mimicry/p2p/reqresp/v1/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package v1

import (
"context"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)

// NetworkEncoder defines the interface for network encoding and decoding messages.
// This combines SSZ marshaling + Snappy compression in one step, matching
// how Ethereum consensus protocols actually work (ssz_snappy).
type NetworkEncoder interface {
// EncodeNetwork does SSZ marshaling + Snappy compression in one step
EncodeNetwork(msg any) ([]byte, error)

// DecodeNetwork does Snappy decompression + SSZ unmarshaling in one step
DecodeNetwork(data []byte, msgType any) error
}

// RequestHandler handles incoming requests and returns responses.
type RequestHandler[TReq, TResp any] func(ctx context.Context, req TReq, from peer.ID) (TResp, error)

// ChunkedRequestHandler handles requests that produce multiple response chunks.
type ChunkedRequestHandler[TReq, TResp any] func(ctx context.Context, req TReq, from peer.ID, w ChunkedResponseWriter[TResp]) error

// ChunkedResponseWriter allows writing response chunks for chunked protocols.
type ChunkedResponseWriter[TResp any] interface {
WriteChunk(resp TResp) error
Close() error
}

// Protocol represents a request-response protocol with typed requests and responses.
type Protocol[TReq, TResp any] interface {
ID() protocol.ID
MaxRequestSize() uint64
MaxResponseSize() uint64
NetworkEncoder() NetworkEncoder
IsChunked() bool
}
Loading
Loading