diff --git a/src/lean_spec/__main__.py b/src/lean_spec/__main__.py index f5944709..1d0dd8fb 100644 --- a/src/lean_spec/__main__.py +++ b/src/lean_spec/__main__.py @@ -19,6 +19,7 @@ --validator-keys Path to validator keys directory --node-id Node identifier for validator assignment (default: lean_spec_0) --is-aggregator Enable aggregator mode for attestation aggregation (default: false) + --aggregate-subnet-ids Comma-separated extra subnet IDs to subscribe/aggregate (e.g. "0,1,2") --api-port Port for API server and Prometheus /metrics (default: 5052, 0 to disable) """ @@ -164,6 +165,7 @@ def _init_from_genesis( event_source: LiveNetworkEventSource, validator_registry: ValidatorRegistry | None = None, is_aggregator: bool = False, + aggregate_subnet_ids: tuple[SubnetId, ...] = (), api_port: int | None = None, ) -> Node: """ @@ -174,6 +176,8 @@ def _init_from_genesis( event_source: Network transport for the node. validator_registry: Optional registry with validator secret keys. is_aggregator: Enable aggregator mode for attestation aggregation. + aggregate_subnet_ids: Additional subnets to subscribe and aggregate from. + Only effective when is_aggregator is also True. api_port: Port for API server and /metrics. None disables the API. Returns: @@ -197,6 +201,7 @@ def _init_from_genesis( validator_registry=validator_registry, fork_digest=GOSSIP_FORK_DIGEST, is_aggregator=is_aggregator, + aggregate_subnet_ids=aggregate_subnet_ids, api_config=ApiServerConfig(port=api_port) if api_port is not None else None, ) @@ -210,6 +215,7 @@ async def _init_from_checkpoint( event_source: LiveNetworkEventSource, validator_registry: ValidatorRegistry | None = None, is_aggregator: bool = False, + aggregate_subnet_ids: tuple[SubnetId, ...] = (), api_port: int | None = None, ) -> Node | None: """ @@ -238,6 +244,8 @@ async def _init_from_checkpoint( event_source: Network transport for the node. validator_registry: Optional registry with validator secret keys. is_aggregator: Enable aggregator mode for attestation aggregation. + aggregate_subnet_ids: Additional subnets to subscribe and aggregate from. + Only effective when is_aggregator is also True. api_port: Port for API server and /metrics. None disables the API. Returns: @@ -306,6 +314,7 @@ async def _init_from_checkpoint( validator_registry=validator_registry, fork_digest=GOSSIP_FORK_DIGEST, is_aggregator=is_aggregator, + aggregate_subnet_ids=aggregate_subnet_ids, api_config=ApiServerConfig(port=api_port) if api_port is not None else None, ) @@ -399,6 +408,7 @@ async def run_node( node_id: str = "lean_spec_0", genesis_time_now: bool = False, is_aggregator: bool = False, + aggregate_subnet_ids: tuple[SubnetId, ...] = (), api_port: int | None = 5052, ) -> None: """ @@ -413,6 +423,8 @@ async def run_node( node_id: Node identifier for validator assignment. genesis_time_now: Override genesis time to current time for testing. is_aggregator: Enable aggregator mode for attestation aggregation. + aggregate_subnet_ids: Additional subnets to subscribe and aggregate from. + Only effective when is_aggregator is also True. api_port: Port for API server (health, fork_choice, /metrics). None or 0 disables. """ metrics.init(name="leanspec-node", version="0.0.1") @@ -443,6 +455,8 @@ async def run_node( # Log aggregator mode if enabled if is_aggregator: logger.info("Aggregator mode enabled - node will perform attestation aggregation") + if aggregate_subnet_ids: + logger.info("Aggregate subnet IDs configured: %s", list(aggregate_subnet_ids)) # Load validator keys if path provided. # @@ -494,18 +508,44 @@ async def run_node( # subscriptions to peers. block_topic = GossipTopic.block(GOSSIP_FORK_DIGEST).to_topic_id() event_source.subscribe_gossip_topic(block_topic) - # Subscribe to attestation subnet topics based on local validator id. - validator_id = validator_registry.primary_index() if validator_registry else None - if validator_id is None: - subnet_id = SubnetId(0) - logger.info("No local validator id; subscribing to attestation subnet %d", subnet_id) - else: - subnet_id = validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) - attestation_subnet_topic = GossipTopic.attestation_subnet( - GOSSIP_FORK_DIGEST, subnet_id - ).to_topic_id() - event_source.subscribe_gossip_topic(attestation_subnet_topic) - logger.info("Subscribed to gossip topics: %s, %s", block_topic, attestation_subnet_topic) + + # Determine attestation subnets to subscribe to. + # + # Subscribing to a subnet causes the p2p layer to receive all messages on it. + # Not subscribing saves bandwidth — messages arrive only if the node publishes + # to that subnet (via gossipsub fanout), not as a mesh member. + # + # Subscription rules: + # - All nodes with registered validators subscribe to their validator-derived subnets. + # This forms the mesh network for attestation propagation. + # - Aggregator nodes additionally subscribe to any explicit aggregate-subnet-ids. + # - Non-aggregator nodes with no validators skip attestation subscriptions entirely. + subscription_subnets: set[SubnetId] = set() + + # Always subscribe to validator-derived subnets regardless of aggregator flag. + # Loop over all registered validators so each one's subnet is covered. + if validator_registry is not None: + for validator_id in validator_registry.indices(): + subscription_subnets.add(validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT)) + + # Explicit aggregate subnets are additive but only meaningful for aggregators. + if is_aggregator: + if not subscription_subnets: + # Aggregator with no registered validators — fall back to subnet 0. + subscription_subnets.add(SubnetId(0)) + subscription_subnets.update(aggregate_subnet_ids) + + for subnet_id in subscription_subnets: + attestation_subnet_topic = GossipTopic.attestation_subnet( + GOSSIP_FORK_DIGEST, subnet_id + ).to_topic_id() + event_source.subscribe_gossip_topic(attestation_subnet_topic) + logger.info("Subscribed to attestation subnet %d", subnet_id) + + if not subscription_subnets: + logger.info("Not subscribing to any attestation subnet") + + logger.info("Subscribed to block gossip topic: %s", block_topic) # Two initialization paths: checkpoint sync or genesis sync. # @@ -528,6 +568,7 @@ async def run_node( event_source=event_source, validator_registry=validator_registry, is_aggregator=is_aggregator, + aggregate_subnet_ids=aggregate_subnet_ids, api_port=api_port_int, ) if node is None: @@ -542,6 +583,7 @@ async def run_node( event_source=event_source, validator_registry=validator_registry, is_aggregator=is_aggregator, + aggregate_subnet_ids=aggregate_subnet_ids, api_port=api_port_int, ) @@ -678,6 +720,17 @@ def main() -> None: action="store_true", help="Enable aggregator mode (node performs attestation aggregation)", ) + parser.add_argument( + "--aggregate-subnet-ids", + type=str, + default=None, + metavar="SUBNETS", + help=( + "Comma-separated attestation subnet IDs to additionally subscribe and aggregate " + "(e.g. '0,1,2'). Requires --is-aggregator. " + "Adds to the validator-derived subnet." + ), + ) parser.add_argument( "--api-port", type=int, @@ -690,6 +743,24 @@ def main() -> None: setup_logging(args.verbose, args.no_color) + # Parse --aggregate-subnet-ids from comma-separated string to tuple of SubnetId. + # Reject the flag upfront if --is-aggregator is not set. + aggregate_subnet_ids: tuple[SubnetId, ...] = () + if args.aggregate_subnet_ids: + if not args.is_aggregator: + logger.error("--aggregate-subnet-ids requires --is-aggregator to be set") + sys.exit(1) + try: + aggregate_subnet_ids = tuple( + SubnetId(int(s.strip())) for s in args.aggregate_subnet_ids.split(",") if s.strip() + ) + except ValueError: + logger.error( + "Invalid --aggregate-subnet-ids value: %r (expected comma-separated integers)", + args.aggregate_subnet_ids, + ) + sys.exit(1) + # Use asyncio.run with proper task cancellation on interrupt. # This ensures all tasks are cancelled and resources are released. try: @@ -703,6 +774,7 @@ def main() -> None: node_id=args.node_id, genesis_time_now=args.genesis_time_now, is_aggregator=args.is_aggregator, + aggregate_subnet_ids=aggregate_subnet_ids, api_port=args.api_port, ) ) diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py index 3d21f348..f470b534 100644 --- a/src/lean_spec/subspecs/forkchoice/store.py +++ b/src/lean_spec/subspecs/forkchoice/store.py @@ -12,7 +12,6 @@ from lean_spec.subspecs.chain.clock import Interval from lean_spec.subspecs.chain.config import ( - ATTESTATION_COMMITTEE_COUNT, INTERVALS_PER_SLOT, JUSTIFICATION_LOOKBACK_SLOTS, ) @@ -330,17 +329,20 @@ def on_gossip_attestation( This method: 1. Verifies the XMSS signature - 2. If current node is aggregator, stores the signature in the gossip - signature map if it belongs to the current validator's subnet - 3. Processes the attestation data via on_attestation + 2. Stores the signature when the node is in aggregator mode + + Subnet filtering happens at the p2p subscription layer — only + attestations from subscribed subnets reach this method. No + additional subnet check is needed here. Args: signed_attestation: The signed attestation from gossip. scheme: XMSS signature scheme for verification. is_aggregator: True if current validator holds aggregator role. + Only aggregator nodes store gossip attestation signatures. Returns: - New Store with attestation processed and signature stored. + New Store with attestation processed and signature stored if aggregating. Raises: ValueError: If validator not found in state. @@ -373,14 +375,14 @@ def on_gossip_attestation( # Copy the inner sets so we can add to them without mutating the previous store. new_committee_sigs = {k: set(v) for k, v in self.attestation_signatures.items()} + # Aggregators store all received gossip signatures. + # The p2p layer only delivers attestations from subscribed subnets, + # so subnet filtering happens at subscription time, not here. + # Non-aggregator nodes validate and drop — they never store gossip signatures. if is_aggregator: - assert self.validator_id is not None, "Current validator ID must be set for aggregation" - current_subnet = self.validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) - attester_subnet = validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) - if current_subnet == attester_subnet: - new_committee_sigs.setdefault(attestation_data, set()).add( - AttestationSignatureEntry(validator_id, signature) - ) + new_committee_sigs.setdefault(attestation_data, set()).add( + AttestationSignatureEntry(validator_id, signature) + ) # Return store with updated signature map and attestation data return self.model_copy( diff --git a/src/lean_spec/subspecs/networking/service/service.py b/src/lean_spec/subspecs/networking/service/service.py index 764b80c5..73947f59 100644 --- a/src/lean_spec/subspecs/networking/service/service.py +++ b/src/lean_spec/subspecs/networking/service/service.py @@ -80,6 +80,9 @@ class NetworkService: is_aggregator: bool = field(default=False) """Whether this node functions as an aggregator.""" + aggregate_subnet_ids: tuple[SubnetId, ...] = field(default_factory=tuple) + """Explicit attestation subnets to subscribe and aggregate from (requires is_aggregator).""" + _running: bool = field(default=False, repr=False) """Whether the event loop is running.""" diff --git a/src/lean_spec/subspecs/node/node.py b/src/lean_spec/subspecs/node/node.py index 3342f788..af9bc0bd 100644 --- a/src/lean_spec/subspecs/node/node.py +++ b/src/lean_spec/subspecs/node/node.py @@ -34,7 +34,7 @@ from lean_spec.subspecs.containers.block.types import AggregatedAttestations from lean_spec.subspecs.containers.slot import Slot from lean_spec.subspecs.containers.state import Validators -from lean_spec.subspecs.containers.validator import ValidatorIndex +from lean_spec.subspecs.containers.validator import SubnetId, ValidatorIndex from lean_spec.subspecs.forkchoice import Store from lean_spec.subspecs.metrics import registry as metrics from lean_spec.subspecs.networking import NetworkService @@ -125,6 +125,17 @@ class NodeConfig: - The node runs in standard validator or passive mode """ + aggregate_subnet_ids: tuple[SubnetId, ...] = field(default_factory=tuple) + """ + Additional attestation subnets to subscribe to and aggregate from. + + When set, the node subscribes to these subnets at the p2p layer in + addition to validator-derived subnets. Effective only when is_aggregator + is True — only aggregators import gossip attestations into forkchoice. + + Additive to the validator-derived subnet. + """ + @dataclass(slots=True) class Node: @@ -249,6 +260,7 @@ def from_genesis(cls, config: NodeConfig) -> Node: network=config.network, database=database, is_aggregator=config.is_aggregator, + aggregate_subnet_ids=config.aggregate_subnet_ids, genesis_start=True, ) @@ -258,6 +270,7 @@ def from_genesis(cls, config: NodeConfig) -> Node: event_source=config.event_source, fork_digest=config.fork_digest, is_aggregator=config.is_aggregator, + aggregate_subnet_ids=config.aggregate_subnet_ids, ) # Wire up aggregated attestation publishing. diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index 328b63fa..51373ade 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -51,6 +51,7 @@ SignedBlock, ) from lean_spec.subspecs.containers.slot import Slot +from lean_spec.subspecs.containers.validator import SubnetId from lean_spec.subspecs.forkchoice.store import Store from lean_spec.subspecs.metrics import registry as metrics from lean_spec.subspecs.networking.reqresp.message import Status @@ -159,6 +160,15 @@ class SyncService: is_aggregator: bool = field(default=False) """Whether this node functions as an aggregator.""" + aggregate_subnet_ids: tuple[SubnetId, ...] = field(default_factory=tuple) + """ + Explicit subnet IDs to subscribe to and aggregate from. + + When set, the node subscribes to these subnets at the p2p layer in + addition to its validator-derived subnet. Only active when is_aggregator + is also True — non-aggregator nodes never import gossip attestations. + """ + process_block: Callable[[Store, SignedBlock], Store] = field(default=default_block_processor) """Block processor function. Defaults to the store's block processing.""" diff --git a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py index cd451fe1..152adb34 100644 --- a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py +++ b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py @@ -2,8 +2,6 @@ from __future__ import annotations -from unittest import mock - import pytest from consensus_testing.keys import XmssKeyManager @@ -145,26 +143,20 @@ def test_on_block_preserves_immutability_of_aggregated_payloads( ) -class TestOnGossipAttestationSubnetFiltering: +class TestOnGossipAttestationImportGating: """ - Unit tests for on_gossip_attestation with is_aggregator=True. + Unit tests for on_gossip_attestation import gating. - Tests subnet ID computation and cross-subnet filtering logic. - When is_aggregator=True, signatures should only be stored if the - attesting validator is in the same subnet as the current validator. + Subnet filtering happens at the p2p subscription layer — only attestations + from subscribed subnets are delivered to the store. The store's sole gate + is is_aggregator: aggregators store everything they receive, non-aggregators + drop everything. """ - def test_same_subnet_stores_signature(self, key_manager: XmssKeyManager) -> None: - """ - Aggregator stores signature when attester is in same subnet. - - With ATTESTATION_COMMITTEE_COUNT=4: - - Validator 0 is in subnet 0 (0 % 4 = 0) - - Validator 4 is in subnet 0 (4 % 4 = 0) - - Current validator (0) should store signature from validator 4. - """ + def test_aggregator_stores_received_attestation(self, key_manager: XmssKeyManager) -> None: + """Aggregator stores any attestation that reaches the store.""" current_validator = ValidatorIndex(0) - attester_validator = ValidatorIndex(4) + attester_validator = ValidatorIndex(1) store, attestation_data = make_store_with_attestation_data( key_manager, num_validators=8, validator_id=current_validator @@ -176,71 +168,49 @@ def test_same_subnet_stores_signature(self, key_manager: XmssKeyManager) -> None signature=key_manager.sign_attestation_data(attester_validator, attestation_data), ) - # Verify signature does NOT exist before calling the method assert attestation_data not in store.attestation_signatures, ( - "Precondition: signature should not exist before calling method" + "Precondition: no signatures before processing" ) - # Patch ATTESTATION_COMMITTEE_COUNT to 4 so we can test subnet filtering - with mock.patch( - "lean_spec.subspecs.forkchoice.store.ATTESTATION_COMMITTEE_COUNT", Uint64(4) - ): - updated_store = store.on_gossip_attestation( - signed_attestation, - is_aggregator=True, - ) + updated_store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) - # Verify signature NOW exists after calling the method sigs = updated_store.attestation_signatures.get(attestation_data, set()) assert attester_validator in {entry.validator_id for entry in sigs}, ( - "Signature from same-subnet validator should be stored" + "Aggregator should store any attestation it receives" ) - def test_cross_subnet_ignores_signature(self, key_manager: XmssKeyManager) -> None: - """ - Aggregator ignores signature when attester is in different subnet. - - With ATTESTATION_COMMITTEE_COUNT=4: - - Validator 0 is in subnet 0 (0 % 4 = 0) - - Validator 1 is in subnet 1 (1 % 4 = 1) - - Current validator (0) should NOT store signature from validator 1. - """ + def test_aggregator_stores_multiple_attestations(self, key_manager: XmssKeyManager) -> None: + """Aggregator stores all attestations regardless of which validator sent them.""" current_validator = ValidatorIndex(0) - attester_validator = ValidatorIndex(1) + attesters = [ValidatorIndex(1), ValidatorIndex(2), ValidatorIndex(3)] store, attestation_data = make_store_with_attestation_data( key_manager, num_validators=8, validator_id=current_validator ) - signed_attestation = SignedAttestation( - validator_id=attester_validator, - data=attestation_data, - signature=key_manager.sign_attestation_data(attester_validator, attestation_data), - ) - - with mock.patch( - "lean_spec.subspecs.forkchoice.store.ATTESTATION_COMMITTEE_COUNT", Uint64(4) - ): - updated_store = store.on_gossip_attestation( - signed_attestation, - is_aggregator=True, + def make_signed(v: ValidatorIndex) -> SignedAttestation: + return SignedAttestation( + validator_id=v, + data=attestation_data, + signature=key_manager.sign_attestation_data(v, attestation_data), ) - # Verify signature was NOT stored - sigs = updated_store.attestation_signatures.get(attestation_data, set()) - assert attester_validator not in {entry.validator_id for entry in sigs}, ( - "Signature from different-subnet validator should NOT be stored" + updated = store + for v in attesters: + updated = updated.on_gossip_attestation(make_signed(v), is_aggregator=True) + + stored_ids = { + entry.validator_id + for entry in updated.attestation_signatures.get(attestation_data, set()) + } + assert stored_ids == set(attesters), ( + "Aggregator should store attestations from all received validators" ) def test_non_aggregator_never_stores_signature(self, key_manager: XmssKeyManager) -> None: - """ - Non-aggregator nodes never store gossip signatures. - - When is_aggregator=False, the signature storage path is skipped - regardless of subnet membership. - """ + """Non-aggregator nodes drop all gossip attestations regardless of sender.""" current_validator = ValidatorIndex(0) - attester_validator = ValidatorIndex(4) # Same subnet + attester_validator = ValidatorIndex(1) store, attestation_data = make_store_with_attestation_data( key_manager, num_validators=8, validator_id=current_validator @@ -252,29 +222,19 @@ def test_non_aggregator_never_stores_signature(self, key_manager: XmssKeyManager signature=key_manager.sign_attestation_data(attester_validator, attestation_data), ) - with mock.patch( - "lean_spec.subspecs.forkchoice.store.ATTESTATION_COMMITTEE_COUNT", Uint64(4) - ): - updated_store = store.on_gossip_attestation( - signed_attestation, - is_aggregator=False, # Not an aggregator - ) + updated_store = store.on_gossip_attestation(signed_attestation, is_aggregator=False) - # Verify signature was NOT stored even though same subnet sigs = updated_store.attestation_signatures.get(attestation_data, set()) assert attester_validator not in {entry.validator_id for entry in sigs}, ( "Non-aggregator should never store gossip signatures" ) - def test_cross_subnet_does_not_create_gossip_entry(self, key_manager: XmssKeyManager) -> None: - """ - Cross-subnet attestation does not create a attestation_signatures entry. - - When the attester is in a different subnet, no entry is created - for that attestation data in attestation_signatures. - """ + def test_non_aggregator_does_not_create_signatures_entry( + self, key_manager: XmssKeyManager + ) -> None: + """Non-aggregator leaves attestation_signatures unchanged.""" current_validator = ValidatorIndex(0) - attester_validator = ValidatorIndex(1) # Different subnet + attester_validator = ValidatorIndex(1) store, attestation_data = make_store_with_attestation_data( key_manager, num_validators=8, validator_id=current_validator @@ -286,17 +246,10 @@ def test_cross_subnet_does_not_create_gossip_entry(self, key_manager: XmssKeyMan signature=key_manager.sign_attestation_data(attester_validator, attestation_data), ) - with mock.patch( - "lean_spec.subspecs.forkchoice.store.ATTESTATION_COMMITTEE_COUNT", Uint64(4) - ): - updated_store = store.on_gossip_attestation( - signed_attestation, - is_aggregator=True, - ) + updated_store = store.on_gossip_attestation(signed_attestation, is_aggregator=False) - # Verify no gossip entry was created for this attestation data assert attestation_data not in updated_store.attestation_signatures, ( - "Cross-subnet attestation should not create a attestation_signatures entry" + "Non-aggregator should not create any attestation_signatures entry" )