Skip to content
96 changes: 84 additions & 12 deletions src/lean_spec/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
--validator-keys Path to validator keys directory
--node-id Node identifier for validator assignment (default: lean_spec_0)
--is-aggregator Enable aggregator mode for attestation aggregation (default: false)
--aggregate-subnet-ids Comma-separated extra subnet IDs to subscribe/aggregate (e.g. "0,1,2")
--api-port Port for API server and Prometheus /metrics (default: 5052, 0 to disable)
"""

Expand Down Expand Up @@ -164,6 +165,7 @@ def _init_from_genesis(
event_source: LiveNetworkEventSource,
validator_registry: ValidatorRegistry | None = None,
is_aggregator: bool = False,
aggregate_subnet_ids: tuple[SubnetId, ...] = (),
api_port: int | None = None,
) -> Node:
"""
Expand All @@ -174,6 +176,8 @@ def _init_from_genesis(
event_source: Network transport for the node.
validator_registry: Optional registry with validator secret keys.
is_aggregator: Enable aggregator mode for attestation aggregation.
aggregate_subnet_ids: Additional subnets to subscribe and aggregate from.
Only effective when is_aggregator is also True.
api_port: Port for API server and /metrics. None disables the API.

Returns:
Expand All @@ -197,6 +201,7 @@ def _init_from_genesis(
validator_registry=validator_registry,
fork_digest=GOSSIP_FORK_DIGEST,
is_aggregator=is_aggregator,
aggregate_subnet_ids=aggregate_subnet_ids,
api_config=ApiServerConfig(port=api_port) if api_port is not None else None,
)

Expand All @@ -210,6 +215,7 @@ async def _init_from_checkpoint(
event_source: LiveNetworkEventSource,
validator_registry: ValidatorRegistry | None = None,
is_aggregator: bool = False,
aggregate_subnet_ids: tuple[SubnetId, ...] = (),
api_port: int | None = None,
) -> Node | None:
"""
Expand Down Expand Up @@ -238,6 +244,8 @@ async def _init_from_checkpoint(
event_source: Network transport for the node.
validator_registry: Optional registry with validator secret keys.
is_aggregator: Enable aggregator mode for attestation aggregation.
aggregate_subnet_ids: Additional subnets to subscribe and aggregate from.
Only effective when is_aggregator is also True.
api_port: Port for API server and /metrics. None disables the API.

Returns:
Expand Down Expand Up @@ -306,6 +314,7 @@ async def _init_from_checkpoint(
validator_registry=validator_registry,
fork_digest=GOSSIP_FORK_DIGEST,
is_aggregator=is_aggregator,
aggregate_subnet_ids=aggregate_subnet_ids,
api_config=ApiServerConfig(port=api_port) if api_port is not None else None,
)

Expand Down Expand Up @@ -399,6 +408,7 @@ async def run_node(
node_id: str = "lean_spec_0",
genesis_time_now: bool = False,
is_aggregator: bool = False,
aggregate_subnet_ids: tuple[SubnetId, ...] = (),
api_port: int | None = 5052,
) -> None:
"""
Expand All @@ -413,6 +423,8 @@ async def run_node(
node_id: Node identifier for validator assignment.
genesis_time_now: Override genesis time to current time for testing.
is_aggregator: Enable aggregator mode for attestation aggregation.
aggregate_subnet_ids: Additional subnets to subscribe and aggregate from.
Only effective when is_aggregator is also True.
api_port: Port for API server (health, fork_choice, /metrics). None or 0 disables.
"""
metrics.init(name="leanspec-node", version="0.0.1")
Expand Down Expand Up @@ -443,6 +455,8 @@ async def run_node(
# Log aggregator mode if enabled
if is_aggregator:
logger.info("Aggregator mode enabled - node will perform attestation aggregation")
if aggregate_subnet_ids:
logger.info("Aggregate subnet IDs configured: %s", list(aggregate_subnet_ids))

# Load validator keys if path provided.
#
Expand Down Expand Up @@ -494,18 +508,44 @@ async def run_node(
# subscriptions to peers.
block_topic = GossipTopic.block(GOSSIP_FORK_DIGEST).to_topic_id()
event_source.subscribe_gossip_topic(block_topic)
# Subscribe to attestation subnet topics based on local validator id.
validator_id = validator_registry.primary_index() if validator_registry else None
if validator_id is None:
subnet_id = SubnetId(0)
logger.info("No local validator id; subscribing to attestation subnet %d", subnet_id)
else:
subnet_id = validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT)
attestation_subnet_topic = GossipTopic.attestation_subnet(
GOSSIP_FORK_DIGEST, subnet_id
).to_topic_id()
event_source.subscribe_gossip_topic(attestation_subnet_topic)
logger.info("Subscribed to gossip topics: %s, %s", block_topic, attestation_subnet_topic)

# Determine attestation subnets to subscribe to.
#
# Subscribing to a subnet causes the p2p layer to receive all messages on it.
# Not subscribing saves bandwidth — messages arrive only if the node publishes
# to that subnet (via gossipsub fanout), not as a mesh member.
#
# Subscription rules:
# - All nodes with registered validators subscribe to their validator-derived subnets.
# This forms the mesh network for attestation propagation.
# - Aggregator nodes additionally subscribe to any explicit aggregate-subnet-ids.
# - Non-aggregator nodes with no validators skip attestation subscriptions entirely.
subscription_subnets: set[SubnetId] = set()

# Always subscribe to validator-derived subnets regardless of aggregator flag.
# Loop over all registered validators so each one's subnet is covered.
if validator_registry is not None:
for validator_id in validator_registry.indices():
subscription_subnets.add(validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT))

# Explicit aggregate subnets are additive but only meaningful for aggregators.
if is_aggregator:
if not subscription_subnets:
# Aggregator with no registered validators — fall back to subnet 0.
subscription_subnets.add(SubnetId(0))
subscription_subnets.update(aggregate_subnet_ids)

for subnet_id in subscription_subnets:
attestation_subnet_topic = GossipTopic.attestation_subnet(
GOSSIP_FORK_DIGEST, subnet_id
).to_topic_id()
event_source.subscribe_gossip_topic(attestation_subnet_topic)
logger.info("Subscribed to attestation subnet %d", subnet_id)

if not subscription_subnets:
logger.info("Not subscribing to any attestation subnet")

logger.info("Subscribed to block gossip topic: %s", block_topic)

# Two initialization paths: checkpoint sync or genesis sync.
#
Expand All @@ -528,6 +568,7 @@ async def run_node(
event_source=event_source,
validator_registry=validator_registry,
is_aggregator=is_aggregator,
aggregate_subnet_ids=aggregate_subnet_ids,
api_port=api_port_int,
)
if node is None:
Expand All @@ -542,6 +583,7 @@ async def run_node(
event_source=event_source,
validator_registry=validator_registry,
is_aggregator=is_aggregator,
aggregate_subnet_ids=aggregate_subnet_ids,
api_port=api_port_int,
)

Expand Down Expand Up @@ -678,6 +720,17 @@ def main() -> None:
action="store_true",
help="Enable aggregator mode (node performs attestation aggregation)",
)
parser.add_argument(
"--aggregate-subnet-ids",
type=str,
default=None,
metavar="SUBNETS",
help=(
"Comma-separated attestation subnet IDs to additionally subscribe and aggregate "
"(e.g. '0,1,2'). Requires --is-aggregator. "
"Adds to the validator-derived subnet."
),
)
parser.add_argument(
"--api-port",
type=int,
Expand All @@ -690,6 +743,24 @@ def main() -> None:

setup_logging(args.verbose, args.no_color)

# Parse --aggregate-subnet-ids from comma-separated string to tuple of SubnetId.
# Reject the flag upfront if --is-aggregator is not set.
aggregate_subnet_ids: tuple[SubnetId, ...] = ()
if args.aggregate_subnet_ids:
if not args.is_aggregator:
logger.error("--aggregate-subnet-ids requires --is-aggregator to be set")
sys.exit(1)
try:
aggregate_subnet_ids = tuple(
SubnetId(int(s.strip())) for s in args.aggregate_subnet_ids.split(",") if s.strip()
)
except ValueError:
logger.error(
"Invalid --aggregate-subnet-ids value: %r (expected comma-separated integers)",
args.aggregate_subnet_ids,
)
sys.exit(1)

# Use asyncio.run with proper task cancellation on interrupt.
# This ensures all tasks are cancelled and resources are released.
try:
Expand All @@ -703,6 +774,7 @@ def main() -> None:
node_id=args.node_id,
genesis_time_now=args.genesis_time_now,
is_aggregator=args.is_aggregator,
aggregate_subnet_ids=aggregate_subnet_ids,
api_port=args.api_port,
)
)
Expand Down
26 changes: 14 additions & 12 deletions src/lean_spec/subspecs/forkchoice/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

from lean_spec.subspecs.chain.clock import Interval
from lean_spec.subspecs.chain.config import (
ATTESTATION_COMMITTEE_COUNT,
INTERVALS_PER_SLOT,
JUSTIFICATION_LOOKBACK_SLOTS,
)
Expand Down Expand Up @@ -330,17 +329,20 @@ def on_gossip_attestation(

This method:
1. Verifies the XMSS signature
2. If current node is aggregator, stores the signature in the gossip
signature map if it belongs to the current validator's subnet
3. Processes the attestation data via on_attestation
2. Stores the signature when the node is in aggregator mode

Subnet filtering happens at the p2p subscription layer — only
attestations from subscribed subnets reach this method. No
additional subnet check is needed here.

Args:
signed_attestation: The signed attestation from gossip.
scheme: XMSS signature scheme for verification.
is_aggregator: True if current validator holds aggregator role.
Only aggregator nodes store gossip attestation signatures.

Returns:
New Store with attestation processed and signature stored.
New Store with attestation processed and signature stored if aggregating.

Raises:
ValueError: If validator not found in state.
Expand Down Expand Up @@ -373,14 +375,14 @@ def on_gossip_attestation(
# Copy the inner sets so we can add to them without mutating the previous store.
new_committee_sigs = {k: set(v) for k, v in self.attestation_signatures.items()}

# Aggregators store all received gossip signatures.
# The p2p layer only delivers attestations from subscribed subnets,
# so subnet filtering happens at subscription time, not here.
# Non-aggregator nodes validate and drop — they never store gossip signatures.
if is_aggregator:
assert self.validator_id is not None, "Current validator ID must be set for aggregation"
current_subnet = self.validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT)
attester_subnet = validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT)
if current_subnet == attester_subnet:
new_committee_sigs.setdefault(attestation_data, set()).add(
AttestationSignatureEntry(validator_id, signature)
)
new_committee_sigs.setdefault(attestation_data, set()).add(
AttestationSignatureEntry(validator_id, signature)
)

# Return store with updated signature map and attestation data
return self.model_copy(
Expand Down
3 changes: 3 additions & 0 deletions src/lean_spec/subspecs/networking/service/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ class NetworkService:
is_aggregator: bool = field(default=False)
"""Whether this node functions as an aggregator."""

aggregate_subnet_ids: tuple[SubnetId, ...] = field(default_factory=tuple)
"""Explicit attestation subnets to subscribe and aggregate from (requires is_aggregator)."""

_running: bool = field(default=False, repr=False)
"""Whether the event loop is running."""

Expand Down
15 changes: 14 additions & 1 deletion src/lean_spec/subspecs/node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from lean_spec.subspecs.containers.block.types import AggregatedAttestations
from lean_spec.subspecs.containers.slot import Slot
from lean_spec.subspecs.containers.state import Validators
from lean_spec.subspecs.containers.validator import ValidatorIndex
from lean_spec.subspecs.containers.validator import SubnetId, ValidatorIndex
from lean_spec.subspecs.forkchoice import Store
from lean_spec.subspecs.metrics import registry as metrics
from lean_spec.subspecs.networking import NetworkService
Expand Down Expand Up @@ -125,6 +125,17 @@ class NodeConfig:
- The node runs in standard validator or passive mode
"""

aggregate_subnet_ids: tuple[SubnetId, ...] = field(default_factory=tuple)
"""
Additional attestation subnets to subscribe to and aggregate from.

When set, the node subscribes to these subnets at the p2p layer in
addition to validator-derived subnets. Effective only when is_aggregator
is True — only aggregators import gossip attestations into forkchoice.

Additive to the validator-derived subnet.
"""


@dataclass(slots=True)
class Node:
Expand Down Expand Up @@ -249,6 +260,7 @@ def from_genesis(cls, config: NodeConfig) -> Node:
network=config.network,
database=database,
is_aggregator=config.is_aggregator,
aggregate_subnet_ids=config.aggregate_subnet_ids,
genesis_start=True,
)

Expand All @@ -258,6 +270,7 @@ def from_genesis(cls, config: NodeConfig) -> Node:
event_source=config.event_source,
fork_digest=config.fork_digest,
is_aggregator=config.is_aggregator,
aggregate_subnet_ids=config.aggregate_subnet_ids,
)

# Wire up aggregated attestation publishing.
Expand Down
10 changes: 10 additions & 0 deletions src/lean_spec/subspecs/sync/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
SignedBlock,
)
from lean_spec.subspecs.containers.slot import Slot
from lean_spec.subspecs.containers.validator import SubnetId
from lean_spec.subspecs.forkchoice.store import Store
from lean_spec.subspecs.metrics import registry as metrics
from lean_spec.subspecs.networking.reqresp.message import Status
Expand Down Expand Up @@ -159,6 +160,15 @@ class SyncService:
is_aggregator: bool = field(default=False)
"""Whether this node functions as an aggregator."""

aggregate_subnet_ids: tuple[SubnetId, ...] = field(default_factory=tuple)
"""
Explicit subnet IDs to subscribe to and aggregate from.

When set, the node subscribes to these subnets at the p2p layer in
addition to its validator-derived subnet. Only active when is_aggregator
is also True — non-aggregator nodes never import gossip attestations.
"""

process_block: Callable[[Store, SignedBlock], Store] = field(default=default_block_processor)
"""Block processor function. Defaults to the store's block processing."""

Expand Down
Loading
Loading