From 8882c9db74ed9a85fc0671316c6298fa6a5675e8 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 14 May 2026 11:50:18 +1000 Subject: [PATCH 1/2] recognize fast_confirmation event topic Adds the topic constant + a no-op handleEvent case so beacon stops returning "unknown event topic fast_confirmation" once a node opts in via subscription. The raw event is already broadcast via publishEvent, so consumers tapping OnEvent receive it without any typed parser. --- pkg/beacon/event.go | 1 + pkg/beacon/subscriptions.go | 3 +++ 2 files changed, 4 insertions(+) diff --git a/pkg/beacon/event.go b/pkg/beacon/event.go index d239ecd..656ba9d 100644 --- a/pkg/beacon/event.go +++ b/pkg/beacon/event.go @@ -52,6 +52,7 @@ const ( topicContributionAndProof = "contribution_and_proof" topicBlobSidecar = "blob_sidecar" topicDataColumnSidecar = "data_column_sidecar" + topicFastConfirmation = "fast_confirmation" topicEvent = "raw_event" ) diff --git a/pkg/beacon/subscriptions.go b/pkg/beacon/subscriptions.go index bf41fd4..885133c 100644 --- a/pkg/beacon/subscriptions.go +++ b/pkg/beacon/subscriptions.go @@ -102,6 +102,9 @@ func (n *node) handleEvent(ctx context.Context, event *v1.Event) error { return n.handleBlobSidecar(ctx, event) case topicDataColumnSidecar: return n.handleDataColumnSidecar(ctx, event) + case topicFastConfirmation: + // No typed parser yet; raw event is already broadcast via publishEvent above. + return nil default: return fmt.Errorf("unknown event topic %s", event.Topic) From 9e4f4820011fb308e0dca8333700256748cb141f Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 14 May 2026 12:14:42 +1000 Subject: [PATCH 2/2] publish typed fast_confirmation events Wires the topic the same way as block_gossip: typed handleEvent case, publishFastConfirmation broker emit, and an OnFastConfirmation subscriber. Bumps go-eth2-client to the fork pseudo-version that ships FastConfirmationEvent so the type-assert resolves. --- go.mod | 2 +- go.sum | 2 ++ pkg/beacon/beacon.go | 2 ++ pkg/beacon/publisher.go | 4 ++++ pkg/beacon/subscriber.go | 6 ++++++ pkg/beacon/subscriptions.go | 14 ++++++++++++-- 6 files changed, 27 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 2ac1cd7..9e75bdd 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9 github.com/ethereum/go-ethereum v1.16.4 github.com/ethpandaops/ethwallclock v0.2.0 - github.com/ethpandaops/go-eth2-client v0.1.2 + github.com/ethpandaops/go-eth2-client v0.1.3-0.20260513062559-5fb497ba414f github.com/go-co-op/gocron v1.16.2 github.com/prometheus/client_golang v1.23.2 github.com/prometheus/client_model v0.6.2 diff --git a/go.sum b/go.sum index 68bab55..0838dd1 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,8 @@ github.com/ethpandaops/ethwallclock v0.2.0 h1:EeFKtZ7v6TAdn/oAh0xaPujD7N4amjBxrW github.com/ethpandaops/ethwallclock v0.2.0/go.mod h1:y0Cu+mhGLlem19vnAV2x0hpFS5KZ7oOi2SWYayv9l24= github.com/ethpandaops/go-eth2-client v0.1.2 h1:nJr0YBmqHtbVeLeWEDyXwjCEO0AFt1Z0lIciMYlowfU= github.com/ethpandaops/go-eth2-client v0.1.2/go.mod h1:U3KdR8QSq8vqs9LWSGAF4ETHJpcB62E1DFf0gVMgWv0= +github.com/ethpandaops/go-eth2-client v0.1.3-0.20260513062559-5fb497ba414f h1:9k0z0tEayikToEqZn71xKIRtCNzH6mwUkIJk70kbSSk= +github.com/ethpandaops/go-eth2-client v0.1.3-0.20260513062559-5fb497ba414f/go.mod h1:U3KdR8QSq8vqs9LWSGAF4ETHJpcB62E1DFf0gVMgWv0= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= diff --git a/pkg/beacon/beacon.go b/pkg/beacon/beacon.go index db5d3bf..0b1eb83 100644 --- a/pkg/beacon/beacon.go +++ b/pkg/beacon/beacon.go @@ -133,6 +133,8 @@ type Node interface { OnBlobSidecar(ctx context.Context, handler func(ctx context.Context, ev *v1.BlobSidecarEvent) error) // OnDataColumnSidecar is called when a data column sidecar is received. OnDataColumnSidecar(ctx context.Context, handler func(ctx context.Context, ev *v1.DataColumnSidecarEvent) error) + // OnFastConfirmation is called when a fast confirmation is received. + OnFastConfirmation(ctx context.Context, handler func(ctx context.Context, ev *v1.FastConfirmationEvent) error) // - Custom events // OnReady is called when the node is ready. diff --git a/pkg/beacon/publisher.go b/pkg/beacon/publisher.go index d7103ad..cca22a9 100644 --- a/pkg/beacon/publisher.go +++ b/pkg/beacon/publisher.go @@ -22,6 +22,10 @@ func (n *node) publishBlockGossip(ctx context.Context, event *v1.BlockGossipEven n.broker.Emit(topicBlockGossip, event) } +func (n *node) publishFastConfirmation(ctx context.Context, event *v1.FastConfirmationEvent) { + n.broker.Emit(topicFastConfirmation, event) +} + func (n *node) publishAttestation(ctx context.Context, event *spec.VersionedAttestation) { n.broker.Emit(topicAttestation, event) } diff --git a/pkg/beacon/subscriber.go b/pkg/beacon/subscriber.go index bdbb974..ff37bd7 100644 --- a/pkg/beacon/subscriber.go +++ b/pkg/beacon/subscriber.go @@ -29,6 +29,12 @@ func (n *node) OnBlockGossip(ctx context.Context, handler func(ctx context.Conte }) } +func (n *node) OnFastConfirmation(ctx context.Context, handler func(ctx context.Context, event *v1.FastConfirmationEvent) error) { + n.broker.On(topicFastConfirmation, func(event *v1.FastConfirmationEvent) { + n.handleSubscriberError(handler(ctx, event), topicFastConfirmation) + }) +} + func (n *node) OnAttestation(ctx context.Context, handler func(ctx context.Context, event *spec.VersionedAttestation) error) { n.broker.On(topicAttestation, func(event *spec.VersionedAttestation) { n.handleSubscriberError(handler(ctx, event), topicAttestation) diff --git a/pkg/beacon/subscriptions.go b/pkg/beacon/subscriptions.go index dc4a9da..538b3c4 100644 --- a/pkg/beacon/subscriptions.go +++ b/pkg/beacon/subscriptions.go @@ -103,8 +103,7 @@ func (n *node) handleEvent(ctx context.Context, event *v1.Event) error { case topicDataColumnSidecar: return n.handleDataColumnSidecar(ctx, event) case topicFastConfirmation: - // No typed parser yet; raw event is already broadcast via publishEvent above. - return nil + return n.handleFastConfirmation(ctx, event) default: return fmt.Errorf("unknown event topic %s", event.Topic) @@ -155,6 +154,17 @@ func (n *node) handleBlockGossip(ctx context.Context, event *v1.Event) error { return nil } +func (n *node) handleFastConfirmation(ctx context.Context, event *v1.Event) error { + fastConfirmation, valid := event.Data.(*v1.FastConfirmationEvent) + if !valid { + return errors.New("invalid fast confirmation event") + } + + n.publishFastConfirmation(ctx, fastConfirmation) + + return nil +} + func (n *node) handleChainReorg(ctx context.Context, event *v1.Event) error { chainReorg, valid := event.Data.(*v1.ChainReorgEvent) if !valid {