From 0494a5b1d5eafda983273da30c123e7377399ad3 Mon Sep 17 00:00:00 2001 From: ch4r10t33r Date: Wed, 25 Mar 2026 15:11:38 +0000 Subject: [PATCH 1/7] node, networking, forkchoice: import-subnet-ids for subnet network topology Add `import_subnet_ids` to control which attestation subnets a node subscribes to and collects signatures from, independently of aggregator role. Subscription changes (p2p layer): - Aggregator nodes subscribe to their validator-derived subnet only. - Non-aggregator nodes with no import subnets skip attestation subscriptions entirely, saving bandwidth on subnets they have no use for. - import_subnet_ids are always subscribed regardless of aggregator flag, allowing proposer nodes to gather attestations from specific subnets for block inclusion without enabling full aggregation mode. - The two sets are additive and deduplicated. Signature collection changes (forkchoice layer): - Aggregator path (existing): collects from validators sharing its subnet. - Import-subnet path (new): always collects from explicitly listed subnets regardless of is_aggregator. Enables proposers to gather raw signatures. The --import-subnet-ids CLI flag accepts comma-separated subnet IDs (e.g. "0,1,2") and is passed through run_node, _init_from_genesis, and _init_from_checkpoint into NodeConfig, SyncService, and NetworkService. Tests: three new cases in TestOnGossipAttestationSubnetFiltering covering import-only storage, subnet boundary enforcement, and additive combination with aggregator mode. --- src/lean_spec/__main__.py | 88 ++++++++++-- src/lean_spec/subspecs/forkchoice/store.py | 33 +++-- .../subspecs/networking/service/service.py | 3 + src/lean_spec/subspecs/node/node.py | 20 ++- src/lean_spec/subspecs/sync/service.py | 13 ++ .../forkchoice/test_store_attestations.py | 130 +++++++++++++++++- 6 files changed, 265 insertions(+), 22 deletions(-) diff --git a/src/lean_spec/__main__.py b/src/lean_spec/__main__.py index f5944709..ce77553b 100644 --- a/src/lean_spec/__main__.py +++ b/src/lean_spec/__main__.py @@ -19,6 +19,7 @@ --validator-keys Path to validator keys directory --node-id Node identifier for validator assignment (default: lean_spec_0) --is-aggregator Enable aggregator mode for attestation aggregation (default: false) + --import-subnet-ids Comma-separated subnet IDs to subscribe and import (e.g. "0,1,2") --api-port Port for API server and Prometheus /metrics (default: 5052, 0 to disable) """ @@ -164,6 +165,7 @@ def _init_from_genesis( event_source: LiveNetworkEventSource, validator_registry: ValidatorRegistry | None = None, is_aggregator: bool = False, + import_subnet_ids: tuple[SubnetId, ...] = (), api_port: int | None = None, ) -> Node: """ @@ -174,6 +176,7 @@ def _init_from_genesis( event_source: Network transport for the node. validator_registry: Optional registry with validator secret keys. is_aggregator: Enable aggregator mode for attestation aggregation. + import_subnet_ids: Subnets to subscribe to and import regardless of aggregator role. api_port: Port for API server and /metrics. None disables the API. Returns: @@ -197,6 +200,7 @@ def _init_from_genesis( validator_registry=validator_registry, fork_digest=GOSSIP_FORK_DIGEST, is_aggregator=is_aggregator, + import_subnet_ids=import_subnet_ids, api_config=ApiServerConfig(port=api_port) if api_port is not None else None, ) @@ -210,6 +214,7 @@ async def _init_from_checkpoint( event_source: LiveNetworkEventSource, validator_registry: ValidatorRegistry | None = None, is_aggregator: bool = False, + import_subnet_ids: tuple[SubnetId, ...] = (), api_port: int | None = None, ) -> Node | None: """ @@ -238,6 +243,7 @@ async def _init_from_checkpoint( event_source: Network transport for the node. validator_registry: Optional registry with validator secret keys. is_aggregator: Enable aggregator mode for attestation aggregation. + import_subnet_ids: Subnets to subscribe to and import regardless of aggregator role. api_port: Port for API server and /metrics. None disables the API. Returns: @@ -306,6 +312,7 @@ async def _init_from_checkpoint( validator_registry=validator_registry, fork_digest=GOSSIP_FORK_DIGEST, is_aggregator=is_aggregator, + import_subnet_ids=import_subnet_ids, api_config=ApiServerConfig(port=api_port) if api_port is not None else None, ) @@ -399,6 +406,7 @@ async def run_node( node_id: str = "lean_spec_0", genesis_time_now: bool = False, is_aggregator: bool = False, + import_subnet_ids: tuple[SubnetId, ...] = (), api_port: int | None = 5052, ) -> None: """ @@ -413,6 +421,7 @@ async def run_node( node_id: Node identifier for validator assignment. genesis_time_now: Override genesis time to current time for testing. is_aggregator: Enable aggregator mode for attestation aggregation. + import_subnet_ids: Subnets to subscribe to and import regardless of aggregator role. api_port: Port for API server (health, fork_choice, /metrics). None or 0 disables. """ metrics.init(name="leanspec-node", version="0.0.1") @@ -443,6 +452,8 @@ async def run_node( # Log aggregator mode if enabled if is_aggregator: logger.info("Aggregator mode enabled - node will perform attestation aggregation") + if import_subnet_ids: + logger.info("Import subnet IDs configured: %s", list(import_subnet_ids)) # Load validator keys if path provided. # @@ -494,18 +505,43 @@ async def run_node( # subscriptions to peers. block_topic = GossipTopic.block(GOSSIP_FORK_DIGEST).to_topic_id() event_source.subscribe_gossip_topic(block_topic) - # Subscribe to attestation subnet topics based on local validator id. - validator_id = validator_registry.primary_index() if validator_registry else None - if validator_id is None: - subnet_id = SubnetId(0) - logger.info("No local validator id; subscribing to attestation subnet %d", subnet_id) - else: - subnet_id = validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) - attestation_subnet_topic = GossipTopic.attestation_subnet( - GOSSIP_FORK_DIGEST, subnet_id - ).to_topic_id() - event_source.subscribe_gossip_topic(attestation_subnet_topic) - logger.info("Subscribed to gossip topics: %s, %s", block_topic, attestation_subnet_topic) + + # Determine attestation subnets to subscribe to. + # + # Subscribing to a subnet causes the p2p layer to receive all messages on it. + # Not subscribing saves bandwidth — messages arrive only if the node publishes + # to that subnet (via gossipsub fanout), not as a mesh member. + # + # Subscription rules: + # - Aggregator nodes subscribe to their validator-derived subnet so they can + # collect raw signatures for aggregation. + # - Explicit import subnets are always subscribed regardless of aggregator role, + # allowing proposer nodes to gather attestations from specific subnets for + # block inclusion without enabling full aggregation. + # - Non-aggregator nodes with no import subnets skip attestation subscriptions + # entirely to avoid unnecessary bandwidth use. + subscription_subnets: set[SubnetId] = set() + + if is_aggregator: + validator_id = validator_registry.primary_index() if validator_registry else None + if validator_id is not None: + subscription_subnets.add(validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT)) + else: + subscription_subnets.add(SubnetId(0)) + + subscription_subnets.update(import_subnet_ids) + + for subnet_id in sorted(subscription_subnets): + attestation_subnet_topic = GossipTopic.attestation_subnet( + GOSSIP_FORK_DIGEST, subnet_id + ).to_topic_id() + event_source.subscribe_gossip_topic(attestation_subnet_topic) + logger.info("Subscribed to attestation subnet %d", subnet_id) + + if not subscription_subnets: + logger.info("Not subscribing to any attestation subnet") + + logger.info("Subscribed to block gossip topic: %s", block_topic) # Two initialization paths: checkpoint sync or genesis sync. # @@ -528,6 +564,7 @@ async def run_node( event_source=event_source, validator_registry=validator_registry, is_aggregator=is_aggregator, + import_subnet_ids=import_subnet_ids, api_port=api_port_int, ) if node is None: @@ -542,6 +579,7 @@ async def run_node( event_source=event_source, validator_registry=validator_registry, is_aggregator=is_aggregator, + import_subnet_ids=import_subnet_ids, api_port=api_port_int, ) @@ -678,6 +716,17 @@ def main() -> None: action="store_true", help="Enable aggregator mode (node performs attestation aggregation)", ) + parser.add_argument( + "--import-subnet-ids", + type=str, + default=None, + metavar="SUBNETS", + help=( + "Comma-separated attestation subnet IDs to subscribe to and import " + "(e.g. '0,1,2'). Subscribed regardless of --is-aggregator. " + "Adds to the validator-derived subnet when --is-aggregator is also set." + ), + ) parser.add_argument( "--api-port", type=int, @@ -690,6 +739,20 @@ def main() -> None: setup_logging(args.verbose, args.no_color) + # Parse --import-subnet-ids from comma-separated string to tuple of SubnetId. + import_subnet_ids: tuple[SubnetId, ...] = () + if args.import_subnet_ids: + try: + import_subnet_ids = tuple( + SubnetId(int(s.strip())) for s in args.import_subnet_ids.split(",") if s.strip() + ) + except ValueError: + logger.error( + "Invalid --import-subnet-ids value: %r (expected comma-separated integers)", + args.import_subnet_ids, + ) + sys.exit(1) + # Use asyncio.run with proper task cancellation on interrupt. # This ensures all tasks are cancelled and resources are released. try: @@ -703,6 +766,7 @@ def main() -> None: node_id=args.node_id, genesis_time_now=args.genesis_time_now, is_aggregator=args.is_aggregator, + import_subnet_ids=import_subnet_ids, api_port=args.api_port, ) ) diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py index 3d21f348..fbac0828 100644 --- a/src/lean_spec/subspecs/forkchoice/store.py +++ b/src/lean_spec/subspecs/forkchoice/store.py @@ -30,6 +30,7 @@ from lean_spec.subspecs.containers.attestation.attestation import SignedAggregatedAttestation from lean_spec.subspecs.containers.block import BlockLookup from lean_spec.subspecs.containers.slot import Slot +from lean_spec.subspecs.containers.validator import SubnetId from lean_spec.subspecs.metrics import registry as metrics from lean_spec.subspecs.ssz.hash import hash_tree_root from lean_spec.subspecs.xmss.aggregation import ( @@ -324,20 +325,24 @@ def on_gossip_attestation( signed_attestation: SignedAttestation, scheme: GeneralizedXmssScheme = TARGET_SIGNATURE_SCHEME, is_aggregator: bool = False, + import_subnet_ids: tuple[SubnetId, ...] = (), ) -> "Store": """ Process a signed attestation received via gossip network. This method: 1. Verifies the XMSS signature - 2. If current node is aggregator, stores the signature in the gossip - signature map if it belongs to the current validator's subnet - 3. Processes the attestation data via on_attestation + 2. Stores the signature for aggregation under two conditions: + - Aggregator nodes store from validators sharing the same subnet + - Signatures from explicitly configured import subnets are always stored Args: signed_attestation: The signed attestation from gossip. scheme: XMSS signature scheme for verification. is_aggregator: True if current validator holds aggregator role. + import_subnet_ids: Subnets whose attestations are stored regardless + of aggregator role. Allows proposer nodes to collect signatures + from specific subnets without enabling full aggregation. Returns: New Store with attestation processed and signature stored. @@ -373,14 +378,26 @@ def on_gossip_attestation( # Copy the inner sets so we can add to them without mutating the previous store. new_committee_sigs = {k: set(v) for k, v in self.attestation_signatures.items()} + attester_subnet = validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) + + # Two conditions for storing a gossip signature: + # + # 1. Aggregator path: aggregator collects from its own subnet. + # 2. Import-subnet path: explicit subnets are collected regardless of aggregator role. + # This lets proposer nodes gather signatures from configured subnets to include + # in blocks without enabling full aggregation. + should_store = False if is_aggregator: assert self.validator_id is not None, "Current validator ID must be set for aggregation" current_subnet = self.validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) - attester_subnet = validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) - if current_subnet == attester_subnet: - new_committee_sigs.setdefault(attestation_data, set()).add( - AttestationSignatureEntry(validator_id, signature) - ) + should_store = current_subnet == attester_subnet + if not should_store and import_subnet_ids: + should_store = attester_subnet in import_subnet_ids + + if should_store: + new_committee_sigs.setdefault(attestation_data, set()).add( + AttestationSignatureEntry(validator_id, signature) + ) # Return store with updated signature map and attestation data return self.model_copy( diff --git a/src/lean_spec/subspecs/networking/service/service.py b/src/lean_spec/subspecs/networking/service/service.py index 764b80c5..5ba45b7b 100644 --- a/src/lean_spec/subspecs/networking/service/service.py +++ b/src/lean_spec/subspecs/networking/service/service.py @@ -80,6 +80,9 @@ class NetworkService: is_aggregator: bool = field(default=False) """Whether this node functions as an aggregator.""" + import_subnet_ids: tuple[SubnetId, ...] = field(default_factory=tuple) + """Attestation subnets subscribed to and imported regardless of aggregator role.""" + _running: bool = field(default=False, repr=False) """Whether the event loop is running.""" diff --git a/src/lean_spec/subspecs/node/node.py b/src/lean_spec/subspecs/node/node.py index 3342f788..745987b9 100644 --- a/src/lean_spec/subspecs/node/node.py +++ b/src/lean_spec/subspecs/node/node.py @@ -34,7 +34,7 @@ from lean_spec.subspecs.containers.block.types import AggregatedAttestations from lean_spec.subspecs.containers.slot import Slot from lean_spec.subspecs.containers.state import Validators -from lean_spec.subspecs.containers.validator import ValidatorIndex +from lean_spec.subspecs.containers.validator import SubnetId, ValidatorIndex from lean_spec.subspecs.forkchoice import Store from lean_spec.subspecs.metrics import registry as metrics from lean_spec.subspecs.networking import NetworkService @@ -125,6 +125,22 @@ class NodeConfig: - The node runs in standard validator or passive mode """ + import_subnet_ids: tuple[SubnetId, ...] = field(default_factory=tuple) + """ + Additional attestation subnets to subscribe to and import from. + + Subscriptions to these subnets are established at the network layer, + conserving bandwidth on subnets this node has no interest in. + + Attestations arriving on these subnets are always collected into the + signature pool regardless of the aggregator flag. This allows proposer + nodes to gather attestations from specific subnets for block inclusion + without enabling full aggregation mode. + + These subnets are additive to the validator-derived subnet when + the node is also an aggregator. + """ + @dataclass(slots=True) class Node: @@ -249,6 +265,7 @@ def from_genesis(cls, config: NodeConfig) -> Node: network=config.network, database=database, is_aggregator=config.is_aggregator, + import_subnet_ids=config.import_subnet_ids, genesis_start=True, ) @@ -258,6 +275,7 @@ def from_genesis(cls, config: NodeConfig) -> Node: event_source=config.event_source, fork_digest=config.fork_digest, is_aggregator=config.is_aggregator, + import_subnet_ids=config.import_subnet_ids, ) # Wire up aggregated attestation publishing. diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index 328b63fa..039fe6b1 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -51,6 +51,7 @@ SignedBlock, ) from lean_spec.subspecs.containers.slot import Slot +from lean_spec.subspecs.containers.validator import SubnetId from lean_spec.subspecs.forkchoice.store import Store from lean_spec.subspecs.metrics import registry as metrics from lean_spec.subspecs.networking.reqresp.message import Status @@ -159,6 +160,16 @@ class SyncService: is_aggregator: bool = field(default=False) """Whether this node functions as an aggregator.""" + import_subnet_ids: tuple[SubnetId, ...] = field(default_factory=tuple) + """ + Subnets whose attestations are stored regardless of aggregator role. + + Subscriptions to these subnets are established at the network layer. + Attestations arriving on these subnets are always collected into the + signature pool, allowing proposer nodes to gather material from specific + subnets without enabling full aggregation mode. + """ + process_block: Callable[[Store, SignedBlock], Store] = field(default=default_block_processor) """Block processor function. Defaults to the store's block processing.""" @@ -514,6 +525,7 @@ async def on_gossip_attestation( self.store = self.store.on_gossip_attestation( signed_attestation=attestation, is_aggregator=is_aggregator_role, + import_subnet_ids=self.import_subnet_ids, ) metrics.lean_attestation_validation_time_seconds.observe(time.perf_counter() - t0) metrics.lean_attestations_valid_total.labels(source="gossip").inc() @@ -606,6 +618,7 @@ def _replay_pending_attestations(self) -> None: self.store = self.store.on_gossip_attestation( signed_attestation=attestation, is_aggregator=is_aggregator_role, + import_subnet_ids=self.import_subnet_ids, ) except (AssertionError, KeyError): self._pending_attestations.append(attestation) diff --git a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py index cd451fe1..fc3a4773 100644 --- a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py +++ b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py @@ -16,7 +16,7 @@ ) from lean_spec.subspecs.containers.checkpoint import Checkpoint from lean_spec.subspecs.containers.slot import Slot -from lean_spec.subspecs.containers.validator import ValidatorIndex, ValidatorIndices +from lean_spec.subspecs.containers.validator import SubnetId, ValidatorIndex, ValidatorIndices from lean_spec.subspecs.forkchoice import AttestationSignatureEntry from lean_spec.subspecs.xmss.aggregation import AggregatedSignatureProof from lean_spec.types import Bytes32, Uint64 @@ -299,6 +299,134 @@ def test_cross_subnet_does_not_create_gossip_entry(self, key_manager: XmssKeyMan "Cross-subnet attestation should not create a attestation_signatures entry" ) + def test_import_subnet_stores_without_aggregator(self, key_manager: XmssKeyManager) -> None: + """ + import_subnet_ids allows collecting signatures without aggregator role. + + When a subnet is in import_subnet_ids, its attestations are stored + even when is_aggregator=False. This supports proposer nodes that need + attestation material from specific subnets for block inclusion. + + With ATTESTATION_COMMITTEE_COUNT=4: + - Validator 1 is in subnet 1 (1 % 4 = 1) + - Non-aggregator with import_subnet_ids=(SubnetId(1),) stores it. + """ + attester_validator = ValidatorIndex(1) # Subnet 1 + + store, attestation_data = make_store_with_attestation_data( + key_manager, num_validators=8, validator_id=ValidatorIndex(0) + ) + + signed_attestation = SignedAttestation( + validator_id=attester_validator, + data=attestation_data, + signature=key_manager.sign_attestation_data(attester_validator, attestation_data), + ) + + with mock.patch( + "lean_spec.subspecs.forkchoice.store.ATTESTATION_COMMITTEE_COUNT", Uint64(4) + ): + updated_store = store.on_gossip_attestation( + signed_attestation, + is_aggregator=False, + import_subnet_ids=(SubnetId(1),), + ) + + sigs = updated_store.attestation_signatures.get(attestation_data, set()) + assert attester_validator in {entry.validator_id for entry in sigs}, ( + "import_subnet_ids should store signatures regardless of aggregator role" + ) + + def test_import_subnet_ignored_for_other_subnets(self, key_manager: XmssKeyManager) -> None: + """ + import_subnet_ids only covers the explicitly listed subnets. + + Attestations from subnets not in import_subnet_ids are not stored + when is_aggregator=False, even if import_subnet_ids is non-empty. + + With ATTESTATION_COMMITTEE_COUNT=4: + - Validator 2 is in subnet 2 (2 % 4 = 2) + - import_subnet_ids=(SubnetId(1),) does not cover subnet 2. + """ + attester_validator = ValidatorIndex(2) # Subnet 2 + + store, attestation_data = make_store_with_attestation_data( + key_manager, num_validators=8, validator_id=ValidatorIndex(0) + ) + + signed_attestation = SignedAttestation( + validator_id=attester_validator, + data=attestation_data, + signature=key_manager.sign_attestation_data(attester_validator, attestation_data), + ) + + with mock.patch( + "lean_spec.subspecs.forkchoice.store.ATTESTATION_COMMITTEE_COUNT", Uint64(4) + ): + updated_store = store.on_gossip_attestation( + signed_attestation, + is_aggregator=False, + import_subnet_ids=(SubnetId(1),), + ) + + sigs = updated_store.attestation_signatures.get(attestation_data, set()) + assert attester_validator not in {entry.validator_id for entry in sigs}, ( + "Subnets not in import_subnet_ids should not be stored for non-aggregators" + ) + + def test_import_subnet_combined_with_aggregator(self, key_manager: XmssKeyManager) -> None: + """ + import_subnet_ids and aggregator mode are additive. + + When both flags are active, signatures are stored from both the + validator-derived aggregator subnet and the explicitly listed subnets. + + With ATTESTATION_COMMITTEE_COUNT=4 and current validator 0 (subnet 0): + - Validator 4 is in subnet 0 — stored via aggregator path. + - Validator 1 is in subnet 1 — stored via import_subnet_ids path. + - Validator 2 is in subnet 2 — stored by neither; should be absent. + """ + current_validator = ValidatorIndex(0) # Subnet 0 + same_subnet_validator = ValidatorIndex(4) # Subnet 0 + import_subnet_validator = ValidatorIndex(1) # Subnet 1 + other_subnet_validator = ValidatorIndex(2) # Subnet 2 + + store, attestation_data = make_store_with_attestation_data( + key_manager, num_validators=8, validator_id=current_validator + ) + + def make_signed(v: ValidatorIndex) -> SignedAttestation: + return SignedAttestation( + validator_id=v, + data=attestation_data, + signature=key_manager.sign_attestation_data(v, attestation_data), + ) + + with mock.patch( + "lean_spec.subspecs.forkchoice.store.ATTESTATION_COMMITTEE_COUNT", Uint64(4) + ): + updated = store + for v in (same_subnet_validator, import_subnet_validator, other_subnet_validator): + updated = updated.on_gossip_attestation( + make_signed(v), + is_aggregator=True, + import_subnet_ids=(SubnetId(1),), + ) + + stored_ids = { + entry.validator_id + for entry in updated.attestation_signatures.get(attestation_data, set()) + } + assert same_subnet_validator in stored_ids, ( + "Same-subnet validator stored via aggregator path" + ) + assert import_subnet_validator in stored_ids, ( + "Import-subnet validator stored via import path" + ) + assert other_subnet_validator not in stored_ids, ( + "Other-subnet validator should not be stored" + ) + class TestOnGossipAggregatedAttestation: """ From ff2be1fcc504488736e117eebf21f2b0e20e3ed8 Mon Sep 17 00:00:00 2001 From: ch4r10t33r Date: Wed, 25 Mar 2026 15:18:22 +0000 Subject: [PATCH 2/7] tests: add import_subnet_ids to mocks and inline test stubs MockForkchoiceStore.on_gossip_attestation and the inline reject_unknown stub in the sync service tests were missing the import_subnet_ids keyword argument added to the real Store.on_gossip_attestation, causing TypeError in CI. --- tests/lean_spec/helpers/mocks.py | 1 + tests/lean_spec/subspecs/sync/test_service.py | 8 ++++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/lean_spec/helpers/mocks.py b/tests/lean_spec/helpers/mocks.py index 15450852..357f7945 100644 --- a/tests/lean_spec/helpers/mocks.py +++ b/tests/lean_spec/helpers/mocks.py @@ -130,6 +130,7 @@ def on_gossip_attestation( signed_attestation: SignedAttestation, *, is_aggregator: bool = False, + import_subnet_ids: tuple = (), ) -> MockForkchoiceStore: """Track attestation additions. Returns self for assignment chaining.""" self._attestations_received.append(signed_attestation) diff --git a/tests/lean_spec/subspecs/sync/test_service.py b/tests/lean_spec/subspecs/sync/test_service.py index 807ed523..d26daec3 100644 --- a/tests/lean_spec/subspecs/sync/test_service.py +++ b/tests/lean_spec/subspecs/sync/test_service.py @@ -416,10 +416,14 @@ async def test_attestation_buffered_when_block_unknown( # Override on_gossip_attestation to raise for unknown blocks. original_fn = sync_service.store.on_gossip_attestation - def reject_unknown(signed_attestation, *, is_aggregator=False): + def reject_unknown(signed_attestation, *, is_aggregator=False, import_subnet_ids=()): if signed_attestation.data.target.root == unknown_root: raise KeyError("Unknown block") - return original_fn(signed_attestation, is_aggregator=is_aggregator) + return original_fn( + signed_attestation, + is_aggregator=is_aggregator, + import_subnet_ids=import_subnet_ids, + ) sync_service.store.on_gossip_attestation = reject_unknown # type: ignore[assignment] From ff084d9d1cf7d7ec027fe26b63205ea01638dc1a Mon Sep 17 00:00:00 2001 From: ch4r10t33r Date: Wed, 25 Mar 2026 17:20:20 +0000 Subject: [PATCH 3/7] node, networking, forkchoice: align subnet topology with zeam final code Three behavioral changes to match the final merged zeam PR #685: 1. Subscription gating: - All nodes with validators now subscribe to their validator-derived subnets regardless of is_aggregator flag (forms mesh for propagation). - aggregate_subnet_ids subscription is gated on is_aggregator (only aggregators need explicit extra subnets for signature collection). - Non-aggregator nodes with no validators skip attestation subscriptions. 2. Import gating (store + sync): - Gossip attestation import is gated solely on is_aggregator. - Remove the independent import_subnet_ids path in on_gossip_attestation that stored signatures without aggregator role. No such path exists in the zeam final code. - on_gossip_attestation no longer accepts import_subnet_ids kwarg. 3. Rename import_subnet_ids -> aggregate_subnet_ids throughout: - CLI flag: --import-subnet-ids -> --aggregate-subnet-ids - NodeConfig, SyncService, NetworkService fields renamed. - Tests updated: three import_subnet_ids tests replaced with two tests that verify the correct aggregator-only import semantics. --- src/lean_spec/__main__.py | 78 ++++++++-------- src/lean_spec/subspecs/forkchoice/store.py | 23 ++--- .../subspecs/networking/service/service.py | 4 +- src/lean_spec/subspecs/node/node.py | 21 ++--- src/lean_spec/subspecs/sync/service.py | 13 +-- tests/lean_spec/helpers/mocks.py | 1 - .../forkchoice/test_store_attestations.py | 91 +++++-------------- tests/lean_spec/subspecs/sync/test_service.py | 8 +- 8 files changed, 88 insertions(+), 151 deletions(-) diff --git a/src/lean_spec/__main__.py b/src/lean_spec/__main__.py index ce77553b..d86d3b95 100644 --- a/src/lean_spec/__main__.py +++ b/src/lean_spec/__main__.py @@ -19,7 +19,7 @@ --validator-keys Path to validator keys directory --node-id Node identifier for validator assignment (default: lean_spec_0) --is-aggregator Enable aggregator mode for attestation aggregation (default: false) - --import-subnet-ids Comma-separated subnet IDs to subscribe and import (e.g. "0,1,2") + --aggregate-subnet-ids Comma-separated extra subnet IDs to subscribe/aggregate (e.g. "0,1,2") --api-port Port for API server and Prometheus /metrics (default: 5052, 0 to disable) """ @@ -165,7 +165,7 @@ def _init_from_genesis( event_source: LiveNetworkEventSource, validator_registry: ValidatorRegistry | None = None, is_aggregator: bool = False, - import_subnet_ids: tuple[SubnetId, ...] = (), + aggregate_subnet_ids: tuple[SubnetId, ...] = (), api_port: int | None = None, ) -> Node: """ @@ -176,7 +176,8 @@ def _init_from_genesis( event_source: Network transport for the node. validator_registry: Optional registry with validator secret keys. is_aggregator: Enable aggregator mode for attestation aggregation. - import_subnet_ids: Subnets to subscribe to and import regardless of aggregator role. + aggregate_subnet_ids: Additional subnets to subscribe and aggregate from. + Only effective when is_aggregator is also True. api_port: Port for API server and /metrics. None disables the API. Returns: @@ -200,7 +201,7 @@ def _init_from_genesis( validator_registry=validator_registry, fork_digest=GOSSIP_FORK_DIGEST, is_aggregator=is_aggregator, - import_subnet_ids=import_subnet_ids, + aggregate_subnet_ids=aggregate_subnet_ids, api_config=ApiServerConfig(port=api_port) if api_port is not None else None, ) @@ -214,7 +215,7 @@ async def _init_from_checkpoint( event_source: LiveNetworkEventSource, validator_registry: ValidatorRegistry | None = None, is_aggregator: bool = False, - import_subnet_ids: tuple[SubnetId, ...] = (), + aggregate_subnet_ids: tuple[SubnetId, ...] = (), api_port: int | None = None, ) -> Node | None: """ @@ -243,7 +244,8 @@ async def _init_from_checkpoint( event_source: Network transport for the node. validator_registry: Optional registry with validator secret keys. is_aggregator: Enable aggregator mode for attestation aggregation. - import_subnet_ids: Subnets to subscribe to and import regardless of aggregator role. + aggregate_subnet_ids: Additional subnets to subscribe and aggregate from. + Only effective when is_aggregator is also True. api_port: Port for API server and /metrics. None disables the API. Returns: @@ -312,7 +314,7 @@ async def _init_from_checkpoint( validator_registry=validator_registry, fork_digest=GOSSIP_FORK_DIGEST, is_aggregator=is_aggregator, - import_subnet_ids=import_subnet_ids, + aggregate_subnet_ids=aggregate_subnet_ids, api_config=ApiServerConfig(port=api_port) if api_port is not None else None, ) @@ -406,7 +408,7 @@ async def run_node( node_id: str = "lean_spec_0", genesis_time_now: bool = False, is_aggregator: bool = False, - import_subnet_ids: tuple[SubnetId, ...] = (), + aggregate_subnet_ids: tuple[SubnetId, ...] = (), api_port: int | None = 5052, ) -> None: """ @@ -421,7 +423,8 @@ async def run_node( node_id: Node identifier for validator assignment. genesis_time_now: Override genesis time to current time for testing. is_aggregator: Enable aggregator mode for attestation aggregation. - import_subnet_ids: Subnets to subscribe to and import regardless of aggregator role. + aggregate_subnet_ids: Additional subnets to subscribe and aggregate from. + Only effective when is_aggregator is also True. api_port: Port for API server (health, fork_choice, /metrics). None or 0 disables. """ metrics.init(name="leanspec-node", version="0.0.1") @@ -452,8 +455,8 @@ async def run_node( # Log aggregator mode if enabled if is_aggregator: logger.info("Aggregator mode enabled - node will perform attestation aggregation") - if import_subnet_ids: - logger.info("Import subnet IDs configured: %s", list(import_subnet_ids)) + if aggregate_subnet_ids: + logger.info("Aggregate subnet IDs configured: %s", list(aggregate_subnet_ids)) # Load validator keys if path provided. # @@ -513,23 +516,24 @@ async def run_node( # to that subnet (via gossipsub fanout), not as a mesh member. # # Subscription rules: - # - Aggregator nodes subscribe to their validator-derived subnet so they can - # collect raw signatures for aggregation. - # - Explicit import subnets are always subscribed regardless of aggregator role, - # allowing proposer nodes to gather attestations from specific subnets for - # block inclusion without enabling full aggregation. - # - Non-aggregator nodes with no import subnets skip attestation subscriptions - # entirely to avoid unnecessary bandwidth use. + # - All nodes with registered validators subscribe to their validator-derived subnets. + # This forms the mesh network for attestation propagation. + # - Aggregator nodes additionally subscribe to any explicit aggregate-subnet-ids. + # - Non-aggregator nodes with no validators skip attestation subscriptions entirely. subscription_subnets: set[SubnetId] = set() - if is_aggregator: - validator_id = validator_registry.primary_index() if validator_registry else None + # Always subscribe to validator-derived subnets regardless of aggregator flag. + if validator_registry is not None: + validator_id = validator_registry.primary_index() if validator_id is not None: subscription_subnets.add(validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT)) - else: - subscription_subnets.add(SubnetId(0)) - subscription_subnets.update(import_subnet_ids) + # Explicit aggregate subnets are additive but only meaningful for aggregators. + if is_aggregator: + if not subscription_subnets: + # Aggregator with no registered validators — fall back to subnet 0. + subscription_subnets.add(SubnetId(0)) + subscription_subnets.update(aggregate_subnet_ids) for subnet_id in sorted(subscription_subnets): attestation_subnet_topic = GossipTopic.attestation_subnet( @@ -564,7 +568,7 @@ async def run_node( event_source=event_source, validator_registry=validator_registry, is_aggregator=is_aggregator, - import_subnet_ids=import_subnet_ids, + aggregate_subnet_ids=aggregate_subnet_ids, api_port=api_port_int, ) if node is None: @@ -579,7 +583,7 @@ async def run_node( event_source=event_source, validator_registry=validator_registry, is_aggregator=is_aggregator, - import_subnet_ids=import_subnet_ids, + aggregate_subnet_ids=aggregate_subnet_ids, api_port=api_port_int, ) @@ -717,14 +721,14 @@ def main() -> None: help="Enable aggregator mode (node performs attestation aggregation)", ) parser.add_argument( - "--import-subnet-ids", + "--aggregate-subnet-ids", type=str, default=None, metavar="SUBNETS", help=( - "Comma-separated attestation subnet IDs to subscribe to and import " - "(e.g. '0,1,2'). Subscribed regardless of --is-aggregator. " - "Adds to the validator-derived subnet when --is-aggregator is also set." + "Comma-separated attestation subnet IDs to additionally subscribe and aggregate " + "(e.g. '0,1,2'). Requires --is-aggregator. " + "Adds to the validator-derived subnet." ), ) parser.add_argument( @@ -739,17 +743,17 @@ def main() -> None: setup_logging(args.verbose, args.no_color) - # Parse --import-subnet-ids from comma-separated string to tuple of SubnetId. - import_subnet_ids: tuple[SubnetId, ...] = () - if args.import_subnet_ids: + # Parse --aggregate-subnet-ids from comma-separated string to tuple of SubnetId. + aggregate_subnet_ids: tuple[SubnetId, ...] = () + if args.aggregate_subnet_ids: try: - import_subnet_ids = tuple( - SubnetId(int(s.strip())) for s in args.import_subnet_ids.split(",") if s.strip() + aggregate_subnet_ids = tuple( + SubnetId(int(s.strip())) for s in args.aggregate_subnet_ids.split(",") if s.strip() ) except ValueError: logger.error( - "Invalid --import-subnet-ids value: %r (expected comma-separated integers)", - args.import_subnet_ids, + "Invalid --aggregate-subnet-ids value: %r (expected comma-separated integers)", + args.aggregate_subnet_ids, ) sys.exit(1) @@ -766,7 +770,7 @@ def main() -> None: node_id=args.node_id, genesis_time_now=args.genesis_time_now, is_aggregator=args.is_aggregator, - import_subnet_ids=import_subnet_ids, + aggregate_subnet_ids=aggregate_subnet_ids, api_port=args.api_port, ) ) diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py index fbac0828..8c570aed 100644 --- a/src/lean_spec/subspecs/forkchoice/store.py +++ b/src/lean_spec/subspecs/forkchoice/store.py @@ -30,7 +30,6 @@ from lean_spec.subspecs.containers.attestation.attestation import SignedAggregatedAttestation from lean_spec.subspecs.containers.block import BlockLookup from lean_spec.subspecs.containers.slot import Slot -from lean_spec.subspecs.containers.validator import SubnetId from lean_spec.subspecs.metrics import registry as metrics from lean_spec.subspecs.ssz.hash import hash_tree_root from lean_spec.subspecs.xmss.aggregation import ( @@ -325,27 +324,23 @@ def on_gossip_attestation( signed_attestation: SignedAttestation, scheme: GeneralizedXmssScheme = TARGET_SIGNATURE_SCHEME, is_aggregator: bool = False, - import_subnet_ids: tuple[SubnetId, ...] = (), ) -> "Store": """ Process a signed attestation received via gossip network. This method: 1. Verifies the XMSS signature - 2. Stores the signature for aggregation under two conditions: - - Aggregator nodes store from validators sharing the same subnet - - Signatures from explicitly configured import subnets are always stored + 2. Stores the signature when the node is in aggregator mode and the + attester is on the same subnet as the current validator Args: signed_attestation: The signed attestation from gossip. scheme: XMSS signature scheme for verification. is_aggregator: True if current validator holds aggregator role. - import_subnet_ids: Subnets whose attestations are stored regardless - of aggregator role. Allows proposer nodes to collect signatures - from specific subnets without enabling full aggregation. + Only aggregator nodes collect gossip attestation signatures. Returns: - New Store with attestation processed and signature stored. + New Store with attestation processed and signature stored if aggregating. Raises: ValueError: If validator not found in state. @@ -380,19 +375,13 @@ def on_gossip_attestation( attester_subnet = validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) - # Two conditions for storing a gossip signature: - # - # 1. Aggregator path: aggregator collects from its own subnet. - # 2. Import-subnet path: explicit subnets are collected regardless of aggregator role. - # This lets proposer nodes gather signatures from configured subnets to include - # in blocks without enabling full aggregation. + # Aggregators collect signatures from validators sharing the same subnet. + # Non-aggregator nodes validate and drop — they never store gossip signatures. should_store = False if is_aggregator: assert self.validator_id is not None, "Current validator ID must be set for aggregation" current_subnet = self.validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) should_store = current_subnet == attester_subnet - if not should_store and import_subnet_ids: - should_store = attester_subnet in import_subnet_ids if should_store: new_committee_sigs.setdefault(attestation_data, set()).add( diff --git a/src/lean_spec/subspecs/networking/service/service.py b/src/lean_spec/subspecs/networking/service/service.py index 5ba45b7b..73947f59 100644 --- a/src/lean_spec/subspecs/networking/service/service.py +++ b/src/lean_spec/subspecs/networking/service/service.py @@ -80,8 +80,8 @@ class NetworkService: is_aggregator: bool = field(default=False) """Whether this node functions as an aggregator.""" - import_subnet_ids: tuple[SubnetId, ...] = field(default_factory=tuple) - """Attestation subnets subscribed to and imported regardless of aggregator role.""" + aggregate_subnet_ids: tuple[SubnetId, ...] = field(default_factory=tuple) + """Explicit attestation subnets to subscribe and aggregate from (requires is_aggregator).""" _running: bool = field(default=False, repr=False) """Whether the event loop is running.""" diff --git a/src/lean_spec/subspecs/node/node.py b/src/lean_spec/subspecs/node/node.py index 745987b9..af9bc0bd 100644 --- a/src/lean_spec/subspecs/node/node.py +++ b/src/lean_spec/subspecs/node/node.py @@ -125,20 +125,15 @@ class NodeConfig: - The node runs in standard validator or passive mode """ - import_subnet_ids: tuple[SubnetId, ...] = field(default_factory=tuple) + aggregate_subnet_ids: tuple[SubnetId, ...] = field(default_factory=tuple) """ - Additional attestation subnets to subscribe to and import from. + Additional attestation subnets to subscribe to and aggregate from. - Subscriptions to these subnets are established at the network layer, - conserving bandwidth on subnets this node has no interest in. + When set, the node subscribes to these subnets at the p2p layer in + addition to validator-derived subnets. Effective only when is_aggregator + is True — only aggregators import gossip attestations into forkchoice. - Attestations arriving on these subnets are always collected into the - signature pool regardless of the aggregator flag. This allows proposer - nodes to gather attestations from specific subnets for block inclusion - without enabling full aggregation mode. - - These subnets are additive to the validator-derived subnet when - the node is also an aggregator. + Additive to the validator-derived subnet. """ @@ -265,7 +260,7 @@ def from_genesis(cls, config: NodeConfig) -> Node: network=config.network, database=database, is_aggregator=config.is_aggregator, - import_subnet_ids=config.import_subnet_ids, + aggregate_subnet_ids=config.aggregate_subnet_ids, genesis_start=True, ) @@ -275,7 +270,7 @@ def from_genesis(cls, config: NodeConfig) -> Node: event_source=config.event_source, fork_digest=config.fork_digest, is_aggregator=config.is_aggregator, - import_subnet_ids=config.import_subnet_ids, + aggregate_subnet_ids=config.aggregate_subnet_ids, ) # Wire up aggregated attestation publishing. diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index 039fe6b1..51373ade 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -160,14 +160,13 @@ class SyncService: is_aggregator: bool = field(default=False) """Whether this node functions as an aggregator.""" - import_subnet_ids: tuple[SubnetId, ...] = field(default_factory=tuple) + aggregate_subnet_ids: tuple[SubnetId, ...] = field(default_factory=tuple) """ - Subnets whose attestations are stored regardless of aggregator role. + Explicit subnet IDs to subscribe to and aggregate from. - Subscriptions to these subnets are established at the network layer. - Attestations arriving on these subnets are always collected into the - signature pool, allowing proposer nodes to gather material from specific - subnets without enabling full aggregation mode. + When set, the node subscribes to these subnets at the p2p layer in + addition to its validator-derived subnet. Only active when is_aggregator + is also True — non-aggregator nodes never import gossip attestations. """ process_block: Callable[[Store, SignedBlock], Store] = field(default=default_block_processor) @@ -525,7 +524,6 @@ async def on_gossip_attestation( self.store = self.store.on_gossip_attestation( signed_attestation=attestation, is_aggregator=is_aggregator_role, - import_subnet_ids=self.import_subnet_ids, ) metrics.lean_attestation_validation_time_seconds.observe(time.perf_counter() - t0) metrics.lean_attestations_valid_total.labels(source="gossip").inc() @@ -618,7 +616,6 @@ def _replay_pending_attestations(self) -> None: self.store = self.store.on_gossip_attestation( signed_attestation=attestation, is_aggregator=is_aggregator_role, - import_subnet_ids=self.import_subnet_ids, ) except (AssertionError, KeyError): self._pending_attestations.append(attestation) diff --git a/tests/lean_spec/helpers/mocks.py b/tests/lean_spec/helpers/mocks.py index 357f7945..15450852 100644 --- a/tests/lean_spec/helpers/mocks.py +++ b/tests/lean_spec/helpers/mocks.py @@ -130,7 +130,6 @@ def on_gossip_attestation( signed_attestation: SignedAttestation, *, is_aggregator: bool = False, - import_subnet_ids: tuple = (), ) -> MockForkchoiceStore: """Track attestation additions. Returns self for assignment chaining.""" self._attestations_received.append(signed_attestation) diff --git a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py index fc3a4773..2c0b9779 100644 --- a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py +++ b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py @@ -16,7 +16,7 @@ ) from lean_spec.subspecs.containers.checkpoint import Checkpoint from lean_spec.subspecs.containers.slot import Slot -from lean_spec.subspecs.containers.validator import SubnetId, ValidatorIndex, ValidatorIndices +from lean_spec.subspecs.containers.validator import ValidatorIndex, ValidatorIndices from lean_spec.subspecs.forkchoice import AttestationSignatureEntry from lean_spec.subspecs.xmss.aggregation import AggregatedSignatureProof from lean_spec.types import Bytes32, Uint64 @@ -299,17 +299,16 @@ def test_cross_subnet_does_not_create_gossip_entry(self, key_manager: XmssKeyMan "Cross-subnet attestation should not create a attestation_signatures entry" ) - def test_import_subnet_stores_without_aggregator(self, key_manager: XmssKeyManager) -> None: + def test_non_aggregator_does_not_store_any_subnet(self, key_manager: XmssKeyManager) -> None: """ - import_subnet_ids allows collecting signatures without aggregator role. + Non-aggregator nodes never store signatures regardless of subnet. - When a subnet is in import_subnet_ids, its attestations are stored - even when is_aggregator=False. This supports proposer nodes that need - attestation material from specific subnets for block inclusion. + Import is gated solely on is_aggregator. A non-aggregator node + validates the attestation but drops the signature unconditionally. With ATTESTATION_COMMITTEE_COUNT=4: - - Validator 1 is in subnet 1 (1 % 4 = 1) - - Non-aggregator with import_subnet_ids=(SubnetId(1),) stores it. + - Validator 1 is in subnet 1. + - Non-aggregator with is_aggregator=False should not store it. """ attester_validator = ValidatorIndex(1) # Subnet 1 @@ -329,67 +328,29 @@ def test_import_subnet_stores_without_aggregator(self, key_manager: XmssKeyManag updated_store = store.on_gossip_attestation( signed_attestation, is_aggregator=False, - import_subnet_ids=(SubnetId(1),), - ) - - sigs = updated_store.attestation_signatures.get(attestation_data, set()) - assert attester_validator in {entry.validator_id for entry in sigs}, ( - "import_subnet_ids should store signatures regardless of aggregator role" - ) - - def test_import_subnet_ignored_for_other_subnets(self, key_manager: XmssKeyManager) -> None: - """ - import_subnet_ids only covers the explicitly listed subnets. - - Attestations from subnets not in import_subnet_ids are not stored - when is_aggregator=False, even if import_subnet_ids is non-empty. - - With ATTESTATION_COMMITTEE_COUNT=4: - - Validator 2 is in subnet 2 (2 % 4 = 2) - - import_subnet_ids=(SubnetId(1),) does not cover subnet 2. - """ - attester_validator = ValidatorIndex(2) # Subnet 2 - - store, attestation_data = make_store_with_attestation_data( - key_manager, num_validators=8, validator_id=ValidatorIndex(0) - ) - - signed_attestation = SignedAttestation( - validator_id=attester_validator, - data=attestation_data, - signature=key_manager.sign_attestation_data(attester_validator, attestation_data), - ) - - with mock.patch( - "lean_spec.subspecs.forkchoice.store.ATTESTATION_COMMITTEE_COUNT", Uint64(4) - ): - updated_store = store.on_gossip_attestation( - signed_attestation, - is_aggregator=False, - import_subnet_ids=(SubnetId(1),), ) sigs = updated_store.attestation_signatures.get(attestation_data, set()) assert attester_validator not in {entry.validator_id for entry in sigs}, ( - "Subnets not in import_subnet_ids should not be stored for non-aggregators" + "Non-aggregator should never store gossip signatures" ) - def test_import_subnet_combined_with_aggregator(self, key_manager: XmssKeyManager) -> None: + def test_aggregator_stores_own_subnet_only(self, key_manager: XmssKeyManager) -> None: """ - import_subnet_ids and aggregator mode are additive. + Aggregator stores signatures from its own subnet, not from other subnets. - When both flags are active, signatures are stored from both the - validator-derived aggregator subnet and the explicitly listed subnets. + The import gate is: is_aggregator AND attester is on same subnet. + No other path exists for storage. With ATTESTATION_COMMITTEE_COUNT=4 and current validator 0 (subnet 0): - - Validator 4 is in subnet 0 — stored via aggregator path. - - Validator 1 is in subnet 1 — stored via import_subnet_ids path. - - Validator 2 is in subnet 2 — stored by neither; should be absent. + - Validator 4 (subnet 0) is stored. + - Validator 1 (subnet 1) is not stored. + - Validator 2 (subnet 2) is not stored. """ current_validator = ValidatorIndex(0) # Subnet 0 same_subnet_validator = ValidatorIndex(4) # Subnet 0 - import_subnet_validator = ValidatorIndex(1) # Subnet 1 - other_subnet_validator = ValidatorIndex(2) # Subnet 2 + other_subnet_validator_1 = ValidatorIndex(1) # Subnet 1 + other_subnet_validator_2 = ValidatorIndex(2) # Subnet 2 store, attestation_data = make_store_with_attestation_data( key_manager, num_validators=8, validator_id=current_validator @@ -406,25 +367,21 @@ def make_signed(v: ValidatorIndex) -> SignedAttestation: "lean_spec.subspecs.forkchoice.store.ATTESTATION_COMMITTEE_COUNT", Uint64(4) ): updated = store - for v in (same_subnet_validator, import_subnet_validator, other_subnet_validator): - updated = updated.on_gossip_attestation( - make_signed(v), - is_aggregator=True, - import_subnet_ids=(SubnetId(1),), - ) + for v in (same_subnet_validator, other_subnet_validator_1, other_subnet_validator_2): + updated = updated.on_gossip_attestation(make_signed(v), is_aggregator=True) stored_ids = { entry.validator_id for entry in updated.attestation_signatures.get(attestation_data, set()) } assert same_subnet_validator in stored_ids, ( - "Same-subnet validator stored via aggregator path" + "Same-subnet validator should be stored by aggregator" ) - assert import_subnet_validator in stored_ids, ( - "Import-subnet validator stored via import path" + assert other_subnet_validator_1 not in stored_ids, ( + "Different-subnet validator should not be stored" ) - assert other_subnet_validator not in stored_ids, ( - "Other-subnet validator should not be stored" + assert other_subnet_validator_2 not in stored_ids, ( + "Different-subnet validator should not be stored" ) diff --git a/tests/lean_spec/subspecs/sync/test_service.py b/tests/lean_spec/subspecs/sync/test_service.py index d26daec3..807ed523 100644 --- a/tests/lean_spec/subspecs/sync/test_service.py +++ b/tests/lean_spec/subspecs/sync/test_service.py @@ -416,14 +416,10 @@ async def test_attestation_buffered_when_block_unknown( # Override on_gossip_attestation to raise for unknown blocks. original_fn = sync_service.store.on_gossip_attestation - def reject_unknown(signed_attestation, *, is_aggregator=False, import_subnet_ids=()): + def reject_unknown(signed_attestation, *, is_aggregator=False): if signed_attestation.data.target.root == unknown_root: raise KeyError("Unknown block") - return original_fn( - signed_attestation, - is_aggregator=is_aggregator, - import_subnet_ids=import_subnet_ids, - ) + return original_fn(signed_attestation, is_aggregator=is_aggregator) sync_service.store.on_gossip_attestation = reject_unknown # type: ignore[assignment] From ddf58604530379d5698940a5117777900bd7b0bc Mon Sep 17 00:00:00 2001 From: ch4r10t33r Date: Wed, 25 Mar 2026 17:48:21 +0000 Subject: [PATCH 4/7] node: subscribe to subnets for all registered validators, not just primary --- src/lean_spec/__main__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lean_spec/__main__.py b/src/lean_spec/__main__.py index d86d3b95..c223ae42 100644 --- a/src/lean_spec/__main__.py +++ b/src/lean_spec/__main__.py @@ -523,9 +523,9 @@ async def run_node( subscription_subnets: set[SubnetId] = set() # Always subscribe to validator-derived subnets regardless of aggregator flag. + # Loop over all registered validators so each one's subnet is covered. if validator_registry is not None: - validator_id = validator_registry.primary_index() - if validator_id is not None: + for validator_id in validator_registry.indices(): subscription_subnets.add(validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT)) # Explicit aggregate subnets are additive but only meaningful for aggregators. From 0915a499578138acedd61018bafb10b965d1e554 Mon Sep 17 00:00:00 2001 From: ch4r10t33r Date: Wed, 25 Mar 2026 17:50:47 +0000 Subject: [PATCH 5/7] node: remove unnecessary sort on subscription_subnets --- src/lean_spec/__main__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lean_spec/__main__.py b/src/lean_spec/__main__.py index c223ae42..82c578c0 100644 --- a/src/lean_spec/__main__.py +++ b/src/lean_spec/__main__.py @@ -535,7 +535,7 @@ async def run_node( subscription_subnets.add(SubnetId(0)) subscription_subnets.update(aggregate_subnet_ids) - for subnet_id in sorted(subscription_subnets): + for subnet_id in subscription_subnets: attestation_subnet_topic = GossipTopic.attestation_subnet( GOSSIP_FORK_DIGEST, subnet_id ).to_topic_id() From 735fe3c736765c94eefc6e0b518f8cfdba5d2cce Mon Sep 17 00:00:00 2001 From: ch4r10t33r Date: Wed, 25 Mar 2026 18:24:18 +0000 Subject: [PATCH 6/7] forkchoice: remove per-subnet check from store, error on aggregate-subnet-ids without aggregator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Subnet filtering now happens at the p2p subscription layer. Anything that reaches on_gossip_attestation on an aggregator can be stored directly — no need to compare attester subnet against current validator. Also error out early in __main__ if --aggregate-subnet-ids is set without --is-aggregator, since that combination is meaningless. Update tests to reflect the new semantics: aggregators store all received attestations, non-aggregators drop all. --- src/lean_spec/__main__.py | 3 + src/lean_spec/subspecs/forkchoice/store.py | 22 +- .../forkchoice/test_store_attestations.py | 212 ++++-------------- 3 files changed, 52 insertions(+), 185 deletions(-) diff --git a/src/lean_spec/__main__.py b/src/lean_spec/__main__.py index 82c578c0..7bc121a7 100644 --- a/src/lean_spec/__main__.py +++ b/src/lean_spec/__main__.py @@ -456,6 +456,9 @@ async def run_node( if is_aggregator: logger.info("Aggregator mode enabled - node will perform attestation aggregation") if aggregate_subnet_ids: + if not is_aggregator: + logger.error("--aggregate-subnet-ids requires --is-aggregator to be set") + sys.exit(1) logger.info("Aggregate subnet IDs configured: %s", list(aggregate_subnet_ids)) # Load validator keys if path provided. diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py index 8c570aed..f470b534 100644 --- a/src/lean_spec/subspecs/forkchoice/store.py +++ b/src/lean_spec/subspecs/forkchoice/store.py @@ -12,7 +12,6 @@ from lean_spec.subspecs.chain.clock import Interval from lean_spec.subspecs.chain.config import ( - ATTESTATION_COMMITTEE_COUNT, INTERVALS_PER_SLOT, JUSTIFICATION_LOOKBACK_SLOTS, ) @@ -330,14 +329,17 @@ def on_gossip_attestation( This method: 1. Verifies the XMSS signature - 2. Stores the signature when the node is in aggregator mode and the - attester is on the same subnet as the current validator + 2. Stores the signature when the node is in aggregator mode + + Subnet filtering happens at the p2p subscription layer — only + attestations from subscribed subnets reach this method. No + additional subnet check is needed here. Args: signed_attestation: The signed attestation from gossip. scheme: XMSS signature scheme for verification. is_aggregator: True if current validator holds aggregator role. - Only aggregator nodes collect gossip attestation signatures. + Only aggregator nodes store gossip attestation signatures. Returns: New Store with attestation processed and signature stored if aggregating. @@ -373,17 +375,11 @@ def on_gossip_attestation( # Copy the inner sets so we can add to them without mutating the previous store. new_committee_sigs = {k: set(v) for k, v in self.attestation_signatures.items()} - attester_subnet = validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) - - # Aggregators collect signatures from validators sharing the same subnet. + # Aggregators store all received gossip signatures. + # The p2p layer only delivers attestations from subscribed subnets, + # so subnet filtering happens at subscription time, not here. # Non-aggregator nodes validate and drop — they never store gossip signatures. - should_store = False if is_aggregator: - assert self.validator_id is not None, "Current validator ID must be set for aggregation" - current_subnet = self.validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) - should_store = current_subnet == attester_subnet - - if should_store: new_committee_sigs.setdefault(attestation_data, set()).add( AttestationSignatureEntry(validator_id, signature) ) diff --git a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py index 2c0b9779..152adb34 100644 --- a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py +++ b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py @@ -2,8 +2,6 @@ from __future__ import annotations -from unittest import mock - import pytest from consensus_testing.keys import XmssKeyManager @@ -145,26 +143,20 @@ def test_on_block_preserves_immutability_of_aggregated_payloads( ) -class TestOnGossipAttestationSubnetFiltering: +class TestOnGossipAttestationImportGating: """ - Unit tests for on_gossip_attestation with is_aggregator=True. + Unit tests for on_gossip_attestation import gating. - Tests subnet ID computation and cross-subnet filtering logic. - When is_aggregator=True, signatures should only be stored if the - attesting validator is in the same subnet as the current validator. + Subnet filtering happens at the p2p subscription layer — only attestations + from subscribed subnets are delivered to the store. The store's sole gate + is is_aggregator: aggregators store everything they receive, non-aggregators + drop everything. """ - def test_same_subnet_stores_signature(self, key_manager: XmssKeyManager) -> None: - """ - Aggregator stores signature when attester is in same subnet. - - With ATTESTATION_COMMITTEE_COUNT=4: - - Validator 0 is in subnet 0 (0 % 4 = 0) - - Validator 4 is in subnet 0 (4 % 4 = 0) - - Current validator (0) should store signature from validator 4. - """ + def test_aggregator_stores_received_attestation(self, key_manager: XmssKeyManager) -> None: + """Aggregator stores any attestation that reaches the store.""" current_validator = ValidatorIndex(0) - attester_validator = ValidatorIndex(4) + attester_validator = ValidatorIndex(1) store, attestation_data = make_store_with_attestation_data( key_manager, num_validators=8, validator_id=current_validator @@ -176,71 +168,49 @@ def test_same_subnet_stores_signature(self, key_manager: XmssKeyManager) -> None signature=key_manager.sign_attestation_data(attester_validator, attestation_data), ) - # Verify signature does NOT exist before calling the method assert attestation_data not in store.attestation_signatures, ( - "Precondition: signature should not exist before calling method" + "Precondition: no signatures before processing" ) - # Patch ATTESTATION_COMMITTEE_COUNT to 4 so we can test subnet filtering - with mock.patch( - "lean_spec.subspecs.forkchoice.store.ATTESTATION_COMMITTEE_COUNT", Uint64(4) - ): - updated_store = store.on_gossip_attestation( - signed_attestation, - is_aggregator=True, - ) + updated_store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) - # Verify signature NOW exists after calling the method sigs = updated_store.attestation_signatures.get(attestation_data, set()) assert attester_validator in {entry.validator_id for entry in sigs}, ( - "Signature from same-subnet validator should be stored" + "Aggregator should store any attestation it receives" ) - def test_cross_subnet_ignores_signature(self, key_manager: XmssKeyManager) -> None: - """ - Aggregator ignores signature when attester is in different subnet. - - With ATTESTATION_COMMITTEE_COUNT=4: - - Validator 0 is in subnet 0 (0 % 4 = 0) - - Validator 1 is in subnet 1 (1 % 4 = 1) - - Current validator (0) should NOT store signature from validator 1. - """ + def test_aggregator_stores_multiple_attestations(self, key_manager: XmssKeyManager) -> None: + """Aggregator stores all attestations regardless of which validator sent them.""" current_validator = ValidatorIndex(0) - attester_validator = ValidatorIndex(1) + attesters = [ValidatorIndex(1), ValidatorIndex(2), ValidatorIndex(3)] store, attestation_data = make_store_with_attestation_data( key_manager, num_validators=8, validator_id=current_validator ) - signed_attestation = SignedAttestation( - validator_id=attester_validator, - data=attestation_data, - signature=key_manager.sign_attestation_data(attester_validator, attestation_data), - ) - - with mock.patch( - "lean_spec.subspecs.forkchoice.store.ATTESTATION_COMMITTEE_COUNT", Uint64(4) - ): - updated_store = store.on_gossip_attestation( - signed_attestation, - is_aggregator=True, + def make_signed(v: ValidatorIndex) -> SignedAttestation: + return SignedAttestation( + validator_id=v, + data=attestation_data, + signature=key_manager.sign_attestation_data(v, attestation_data), ) - # Verify signature was NOT stored - sigs = updated_store.attestation_signatures.get(attestation_data, set()) - assert attester_validator not in {entry.validator_id for entry in sigs}, ( - "Signature from different-subnet validator should NOT be stored" + updated = store + for v in attesters: + updated = updated.on_gossip_attestation(make_signed(v), is_aggregator=True) + + stored_ids = { + entry.validator_id + for entry in updated.attestation_signatures.get(attestation_data, set()) + } + assert stored_ids == set(attesters), ( + "Aggregator should store attestations from all received validators" ) def test_non_aggregator_never_stores_signature(self, key_manager: XmssKeyManager) -> None: - """ - Non-aggregator nodes never store gossip signatures. - - When is_aggregator=False, the signature storage path is skipped - regardless of subnet membership. - """ + """Non-aggregator nodes drop all gossip attestations regardless of sender.""" current_validator = ValidatorIndex(0) - attester_validator = ValidatorIndex(4) # Same subnet + attester_validator = ValidatorIndex(1) store, attestation_data = make_store_with_attestation_data( key_manager, num_validators=8, validator_id=current_validator @@ -252,29 +222,19 @@ def test_non_aggregator_never_stores_signature(self, key_manager: XmssKeyManager signature=key_manager.sign_attestation_data(attester_validator, attestation_data), ) - with mock.patch( - "lean_spec.subspecs.forkchoice.store.ATTESTATION_COMMITTEE_COUNT", Uint64(4) - ): - updated_store = store.on_gossip_attestation( - signed_attestation, - is_aggregator=False, # Not an aggregator - ) + updated_store = store.on_gossip_attestation(signed_attestation, is_aggregator=False) - # Verify signature was NOT stored even though same subnet sigs = updated_store.attestation_signatures.get(attestation_data, set()) assert attester_validator not in {entry.validator_id for entry in sigs}, ( "Non-aggregator should never store gossip signatures" ) - def test_cross_subnet_does_not_create_gossip_entry(self, key_manager: XmssKeyManager) -> None: - """ - Cross-subnet attestation does not create a attestation_signatures entry. - - When the attester is in a different subnet, no entry is created - for that attestation data in attestation_signatures. - """ + def test_non_aggregator_does_not_create_signatures_entry( + self, key_manager: XmssKeyManager + ) -> None: + """Non-aggregator leaves attestation_signatures unchanged.""" current_validator = ValidatorIndex(0) - attester_validator = ValidatorIndex(1) # Different subnet + attester_validator = ValidatorIndex(1) store, attestation_data = make_store_with_attestation_data( key_manager, num_validators=8, validator_id=current_validator @@ -286,102 +246,10 @@ def test_cross_subnet_does_not_create_gossip_entry(self, key_manager: XmssKeyMan signature=key_manager.sign_attestation_data(attester_validator, attestation_data), ) - with mock.patch( - "lean_spec.subspecs.forkchoice.store.ATTESTATION_COMMITTEE_COUNT", Uint64(4) - ): - updated_store = store.on_gossip_attestation( - signed_attestation, - is_aggregator=True, - ) + updated_store = store.on_gossip_attestation(signed_attestation, is_aggregator=False) - # Verify no gossip entry was created for this attestation data assert attestation_data not in updated_store.attestation_signatures, ( - "Cross-subnet attestation should not create a attestation_signatures entry" - ) - - def test_non_aggregator_does_not_store_any_subnet(self, key_manager: XmssKeyManager) -> None: - """ - Non-aggregator nodes never store signatures regardless of subnet. - - Import is gated solely on is_aggregator. A non-aggregator node - validates the attestation but drops the signature unconditionally. - - With ATTESTATION_COMMITTEE_COUNT=4: - - Validator 1 is in subnet 1. - - Non-aggregator with is_aggregator=False should not store it. - """ - attester_validator = ValidatorIndex(1) # Subnet 1 - - store, attestation_data = make_store_with_attestation_data( - key_manager, num_validators=8, validator_id=ValidatorIndex(0) - ) - - signed_attestation = SignedAttestation( - validator_id=attester_validator, - data=attestation_data, - signature=key_manager.sign_attestation_data(attester_validator, attestation_data), - ) - - with mock.patch( - "lean_spec.subspecs.forkchoice.store.ATTESTATION_COMMITTEE_COUNT", Uint64(4) - ): - updated_store = store.on_gossip_attestation( - signed_attestation, - is_aggregator=False, - ) - - sigs = updated_store.attestation_signatures.get(attestation_data, set()) - assert attester_validator not in {entry.validator_id for entry in sigs}, ( - "Non-aggregator should never store gossip signatures" - ) - - def test_aggregator_stores_own_subnet_only(self, key_manager: XmssKeyManager) -> None: - """ - Aggregator stores signatures from its own subnet, not from other subnets. - - The import gate is: is_aggregator AND attester is on same subnet. - No other path exists for storage. - - With ATTESTATION_COMMITTEE_COUNT=4 and current validator 0 (subnet 0): - - Validator 4 (subnet 0) is stored. - - Validator 1 (subnet 1) is not stored. - - Validator 2 (subnet 2) is not stored. - """ - current_validator = ValidatorIndex(0) # Subnet 0 - same_subnet_validator = ValidatorIndex(4) # Subnet 0 - other_subnet_validator_1 = ValidatorIndex(1) # Subnet 1 - other_subnet_validator_2 = ValidatorIndex(2) # Subnet 2 - - store, attestation_data = make_store_with_attestation_data( - key_manager, num_validators=8, validator_id=current_validator - ) - - def make_signed(v: ValidatorIndex) -> SignedAttestation: - return SignedAttestation( - validator_id=v, - data=attestation_data, - signature=key_manager.sign_attestation_data(v, attestation_data), - ) - - with mock.patch( - "lean_spec.subspecs.forkchoice.store.ATTESTATION_COMMITTEE_COUNT", Uint64(4) - ): - updated = store - for v in (same_subnet_validator, other_subnet_validator_1, other_subnet_validator_2): - updated = updated.on_gossip_attestation(make_signed(v), is_aggregator=True) - - stored_ids = { - entry.validator_id - for entry in updated.attestation_signatures.get(attestation_data, set()) - } - assert same_subnet_validator in stored_ids, ( - "Same-subnet validator should be stored by aggregator" - ) - assert other_subnet_validator_1 not in stored_ids, ( - "Different-subnet validator should not be stored" - ) - assert other_subnet_validator_2 not in stored_ids, ( - "Different-subnet validator should not be stored" + "Non-aggregator should not create any attestation_signatures entry" ) From b369812cda3691c6eb511591a8822ac0503ec13c Mon Sep 17 00:00:00 2001 From: ch4r10t33r Date: Wed, 25 Mar 2026 18:25:45 +0000 Subject: [PATCH 7/7] node: check is_aggregator before parsing aggregate-subnet-ids in CLI --- src/lean_spec/__main__.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/lean_spec/__main__.py b/src/lean_spec/__main__.py index 7bc121a7..1d0dd8fb 100644 --- a/src/lean_spec/__main__.py +++ b/src/lean_spec/__main__.py @@ -456,9 +456,6 @@ async def run_node( if is_aggregator: logger.info("Aggregator mode enabled - node will perform attestation aggregation") if aggregate_subnet_ids: - if not is_aggregator: - logger.error("--aggregate-subnet-ids requires --is-aggregator to be set") - sys.exit(1) logger.info("Aggregate subnet IDs configured: %s", list(aggregate_subnet_ids)) # Load validator keys if path provided. @@ -747,8 +744,12 @@ def main() -> None: setup_logging(args.verbose, args.no_color) # Parse --aggregate-subnet-ids from comma-separated string to tuple of SubnetId. + # Reject the flag upfront if --is-aggregator is not set. aggregate_subnet_ids: tuple[SubnetId, ...] = () if args.aggregate_subnet_ids: + if not args.is_aggregator: + logger.error("--aggregate-subnet-ids requires --is-aggregator to be set") + sys.exit(1) try: aggregate_subnet_ids = tuple( SubnetId(int(s.strip())) for s in args.aggregate_subnet_ids.split(",") if s.strip()