diff --git a/crates/host-rpc/src/builder.rs b/crates/host-rpc/src/builder.rs index f7aa7e06..d4abebf8 100644 --- a/crates/host-rpc/src/builder.rs +++ b/crates/host-rpc/src/builder.rs @@ -10,6 +10,7 @@ use tracing::warn; /// let notifier = RpcHostNotifierBuilder::new(provider) /// .with_buffer_capacity(128) /// .with_backfill_batch_size(64) +/// .with_max_rpc_concurrency(16) /// .with_genesis_timestamp(1_606_824_023) /// .build() /// .await?; @@ -19,6 +20,7 @@ pub struct RpcHostNotifierBuilder

{ provider: P, buffer_capacity: usize, backfill_batch_size: u64, + max_rpc_concurrency: usize, slot_seconds: u64, genesis_timestamp: u64, } @@ -33,20 +35,33 @@ where provider, buffer_capacity: crate::DEFAULT_BUFFER_CAPACITY, backfill_batch_size: crate::DEFAULT_BACKFILL_BATCH_SIZE, + max_rpc_concurrency: crate::DEFAULT_MAX_RPC_CONCURRENCY, slot_seconds: crate::notifier::DEFAULT_SLOT_SECONDS, genesis_timestamp: 0, } } /// Set the block buffer capacity (default: 64). + /// + /// Values below 1 are clamped to 1. pub const fn with_buffer_capacity(mut self, capacity: usize) -> Self { - self.buffer_capacity = capacity; + self.buffer_capacity = if capacity > 0 { capacity } else { 1 }; self } /// Set the backfill batch size (default: 32). + /// + /// Values below 1 are clamped to 1. pub const fn with_backfill_batch_size(mut self, batch_size: u64) -> Self { - self.backfill_batch_size = batch_size; + self.backfill_batch_size = if batch_size > 0 { batch_size } else { 1 }; + self + } + + /// Set the maximum number of concurrent RPC block fetches (default: 8). + /// + /// Values below 1 are clamped to 1. + pub const fn with_max_rpc_concurrency(mut self, max_rpc_concurrency: usize) -> Self { + self.max_rpc_concurrency = if max_rpc_concurrency > 0 { max_rpc_concurrency } else { 1 }; self } @@ -75,6 +90,7 @@ where header_sub, self.buffer_capacity, self.backfill_batch_size, + self.max_rpc_concurrency, self.slot_seconds, self.genesis_timestamp, )) diff --git a/crates/host-rpc/src/config.rs b/crates/host-rpc/src/config.rs index 682304e3..436ce22b 100644 --- a/crates/host-rpc/src/config.rs +++ b/crates/host-rpc/src/config.rs @@ -1,4 +1,7 @@ -use crate::{DEFAULT_BACKFILL_BATCH_SIZE, DEFAULT_BUFFER_CAPACITY, RpcHostNotifierBuilder}; +use crate::{ + DEFAULT_BACKFILL_BATCH_SIZE, DEFAULT_BUFFER_CAPACITY, DEFAULT_MAX_RPC_CONCURRENCY, + RpcHostNotifierBuilder, +}; use alloy::providers::RootProvider; use init4_bin_base::utils::{calc::SlotCalculator, from_env::FromEnv, provider::PubSubConfig}; @@ -9,6 +12,7 @@ use init4_bin_base::utils::{calc::SlotCalculator, from_env::FromEnv, provider::P /// - `SIGNET_HOST_URL` – WebSocket or IPC URL for the host EL client (required) /// - `SIGNET_HOST_BUFFER_CAPACITY` – Local chain view size (default: 64) /// - `SIGNET_HOST_BACKFILL_BATCH_SIZE` – Blocks per backfill batch (default: 32) +/// - `SIGNET_HOST_MAX_RPC_CONCURRENCY` – Max concurrent RPC block fetches (default: 8) /// /// # Example /// @@ -43,6 +47,13 @@ pub struct HostRpcConfig { optional )] backfill_batch_size: Option, + /// Maximum number of concurrent RPC block fetches. + #[from_env( + var = "SIGNET_HOST_MAX_RPC_CONCURRENCY", + desc = "Max concurrent RPC requests [default: 8]", + optional + )] + max_rpc_concurrency: Option, } impl HostRpcConfig { @@ -60,6 +71,9 @@ impl HostRpcConfig { .with_backfill_batch_size( self.backfill_batch_size.unwrap_or(DEFAULT_BACKFILL_BATCH_SIZE), ) + .with_max_rpc_concurrency( + self.max_rpc_concurrency.unwrap_or(DEFAULT_MAX_RPC_CONCURRENCY), + ) .with_genesis_timestamp(slot_calculator.start_timestamp())) } } diff --git a/crates/host-rpc/src/latest.rs b/crates/host-rpc/src/latest.rs new file mode 100644 index 00000000..3afcebf6 --- /dev/null +++ b/crates/host-rpc/src/latest.rs @@ -0,0 +1,98 @@ +use futures_util::Stream; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +/// Stream adapter that coalesces buffered items, yielding only the +/// most recent ready item on each poll. +/// +/// When the consumer is slow, items accumulate in the inner stream. +/// `Latest` drains all buffered ready items on each poll and returns +/// only the last one, discarding stale intermediates and recording a +/// metric for each skipped item. +pub(crate) struct Latest { + inner: S, +} + +impl Latest { + /// Wrap `inner` in a `Latest` combinator. + pub(crate) const fn new(inner: S) -> Self { + Self { inner } + } +} + +impl std::fmt::Debug for Latest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Latest").field("inner", &self.inner).finish() + } +} + +impl Stream for Latest +where + S: Stream + Unpin, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let inner = Pin::new(&mut self.inner); + let first = match inner.poll_next(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(item) => item, + }; + + // Stream is exhausted or had no item. + let Some(mut latest) = first else { + return Poll::Ready(None); + }; + + // Drain all remaining ready items, keeping only the most recent. + let mut skipped: u64 = 0; + while let Poll::Ready(Some(newer)) = Pin::new(&mut self.inner).poll_next(cx) { + latest = newer; + skipped += 1; + } + + if skipped > 0 { + crate::metrics::inc_headers_coalesced(skipped); + } + + Poll::Ready(Some(latest)) + } +} + +#[cfg(test)] +mod tests { + use super::Latest; + use futures_util::{StreamExt, stream}; + + #[tokio::test] + async fn single_item_yields_immediately() { + let mut s = Latest::new(stream::iter([42u32])); + assert_eq!(s.next().await, Some(42)); + assert_eq!(s.next().await, None); + } + + #[tokio::test] + async fn multiple_ready_items_yields_last() { + let mut s = Latest::new(stream::iter([1u32, 2, 3, 4, 5])); + // All items are immediately ready; Latest should drain and return the last. + assert_eq!(s.next().await, Some(5)); + assert_eq!(s.next().await, None); + } + + #[tokio::test] + async fn empty_stream_yields_none() { + let mut s = Latest::new(stream::iter(Vec::::new())); + assert_eq!(s.next().await, None); + } + + #[tokio::test] + async fn fused_after_inner_terminates() { + let mut s = Latest::new(stream::iter([7u32])); + assert_eq!(s.next().await, Some(7)); + // Subsequent polls after termination should return None. + assert_eq!(s.next().await, None); + assert_eq!(s.next().await, None); + } +} diff --git a/crates/host-rpc/src/lib.rs b/crates/host-rpc/src/lib.rs index 17a8c8f6..6cc105d8 100644 --- a/crates/host-rpc/src/lib.rs +++ b/crates/host-rpc/src/lib.rs @@ -15,6 +15,8 @@ pub(crate) const DEFAULT_BUFFER_CAPACITY: usize = 64; /// Default backfill batch size. pub(crate) const DEFAULT_BACKFILL_BATCH_SIZE: u64 = 32; +/// Default maximum number of concurrent RPC block fetches. +pub(crate) const DEFAULT_MAX_RPC_CONCURRENCY: usize = 8; mod builder; pub use builder::RpcHostNotifierBuilder; @@ -28,9 +30,10 @@ pub use error::RpcHostError; mod notifier; pub use notifier::RpcHostNotifier; +mod latest; +mod metrics; mod segment; pub use segment::{RpcBlock, RpcChainSegment}; -mod metrics; mod alias; pub use alias::RpcAliasOracle; diff --git a/crates/host-rpc/src/metrics.rs b/crates/host-rpc/src/metrics.rs index 05baf5f8..91298c77 100644 --- a/crates/host-rpc/src/metrics.rs +++ b/crates/host-rpc/src/metrics.rs @@ -10,6 +10,7 @@ const BACKFILL_BATCHES: &str = "host_rpc.backfill_batches"; const TAG_REFRESHES: &str = "host_rpc.tag_refreshes"; const STALE_HINTS: &str = "host_rpc.stale_hints"; const RPC_ERRORS: &str = "host_rpc.rpc_errors"; +const HEADERS_COALESCED: &str = "host_rpc.headers_coalesced"; const WALK_CHAIN_DURATION: &str = "host_rpc.walk_chain.duration_ms"; const FETCH_BLOCK_DURATION: &str = "host_rpc.fetch_block.duration_ms"; @@ -33,6 +34,7 @@ fn ensure_described() { describe_counter!(TAG_REFRESHES, "Epoch boundary tag refreshes"); describe_counter!(STALE_HINTS, "Stale subscription hints that fell back to latest"); describe_counter!(RPC_ERRORS, "RPC transport/provider errors"); + describe_counter!(HEADERS_COALESCED, "Stale subscription headers coalesced"); describe_histogram!(WALK_CHAIN_DURATION, "Time to walk the chain (ms)"); describe_histogram!(FETCH_BLOCK_DURATION, "Single block+receipts fetch (ms)"); describe_histogram!(BACKFILL_BATCH_DURATION, "Full backfill batch (ms)"); @@ -118,3 +120,9 @@ pub(crate) fn set_tip(number: u64) { ensure_described(); gauge!(TIP_NUMBER).set(number as f64); } + +/// Increment the headers-coalesced counter. +pub(crate) fn inc_headers_coalesced(count: u64) { + ensure_described(); + counter!(HEADERS_COALESCED).increment(count); +} diff --git a/crates/host-rpc/src/notifier.rs b/crates/host-rpc/src/notifier.rs index 6d33e8f2..a8bc2ec4 100644 --- a/crates/host-rpc/src/notifier.rs +++ b/crates/host-rpc/src/notifier.rs @@ -1,4 +1,4 @@ -use crate::{RpcBlock, RpcChainSegment, RpcHostError}; +use crate::{RpcBlock, RpcChainSegment, RpcHostError, latest::Latest}; use alloy::{ consensus::{BlockHeader, transaction::Recovered}, eips::{BlockId, BlockNumberOrTag}, @@ -8,7 +8,7 @@ use alloy::{ pubsub::SubscriptionStream, rpc::types::Header as RpcHeader, }; -use futures_util::{StreamExt, stream::FuturesOrdered}; +use futures_util::{StreamExt, TryStreamExt, stream}; use signet_node_types::{HostNotification, HostNotificationKind, HostNotifier, RevertRange}; use signet_types::primitives::{RecoveredBlock, SealedBlock, TransactionSigned}; use std::{collections::VecDeque, sync::Arc, time::Instant}; @@ -56,7 +56,8 @@ pub struct RpcHostNotifier

{ provider: P, /// Subscription stream of new block headers (used as wake-up signal). - header_sub: SubscriptionStream, + /// Wrapped in [`Latest`] to coalesce stale buffered headers. + header_sub: Latest>, /// Local chain view — lightweight ring buffer of (number, hash). chain_view: VecDeque<(u64, B256)>, @@ -80,6 +81,9 @@ pub struct RpcHostNotifier

{ /// Max blocks per backfill batch. backfill_batch_size: u64, + /// Maximum number of concurrent RPC block fetches. + max_rpc_concurrency: usize, + /// Seconds per slot, used for epoch calculation. slot_seconds: u64, @@ -92,6 +96,7 @@ impl

core::fmt::Debug for RpcHostNotifier

{ f.debug_struct("RpcHostNotifier") .field("chain_view_len", &self.chain_view.len()) .field("buffer_capacity", &self.buffer_capacity) + .field("max_rpc_concurrency", &self.max_rpc_concurrency) .field("backfill_from", &self.backfill_from) .finish_non_exhaustive() } @@ -107,12 +112,13 @@ where header_sub: SubscriptionStream, buffer_capacity: usize, backfill_batch_size: u64, + max_rpc_concurrency: usize, slot_seconds: u64, genesis_timestamp: u64, ) -> Self { Self { provider, - header_sub, + header_sub: Latest::new(header_sub), chain_view: VecDeque::with_capacity(buffer_capacity), buffer_capacity, cached_safe: None, @@ -120,6 +126,7 @@ where last_tag_epoch: None, backfill_from: None, backfill_batch_size, + max_rpc_concurrency, slot_seconds, genesis_timestamp, } @@ -213,22 +220,17 @@ where /// Fetch full blocks+receipts for a list of hashes, concurrently. /// /// Hashes must be in ascending block-number order. Results preserve - /// that order. + /// that order. Concurrency is bounded by [`Self::max_rpc_concurrency`]. #[tracing::instrument(level = "debug", skip_all, fields(count = hashes.len()))] async fn fetch_blocks_by_hash( &self, hashes: &[(u64, B256)], ) -> Result>, RpcHostError> { - let mut futures = hashes - .iter() - .map(|&(_, hash)| self.fetch_block_by_hash(hash)) - .collect::>(); - - let mut blocks = Vec::with_capacity(hashes.len()); - while let Some(result) = futures.next().await { - blocks.push(Arc::new(result?)); - } - Ok(blocks) + stream::iter(hashes.iter().copied().map(|(_, hash)| self.fetch_block_by_hash(hash))) + .buffered(self.max_rpc_concurrency) + .map_ok(Arc::new) + .try_collect() + .await } /// Fetch a single block with receipts by number (used for backfill only). @@ -250,22 +252,19 @@ where /// Fetch a range of blocks by number concurrently (used for backfill only). /// - /// Returns an empty `Vec` if `from > to`. + /// Returns an empty `Vec` if `from > to`. Concurrency is bounded by + /// [`Self::max_rpc_concurrency`]. #[tracing::instrument(level = "debug", skip_all, fields(from, to))] async fn fetch_range(&self, from: u64, to: u64) -> Result>, RpcHostError> { if from > to { return Ok(Vec::new()); } - let mut futures = (from..=to) - .map(|number| self.fetch_block_by_number(number)) - .collect::>(); - - let mut blocks = Vec::with_capacity((to - from + 1) as usize); - while let Some(result) = futures.next().await { - blocks.push(Arc::new(result?)); - } - Ok(blocks) + stream::iter((from..=to).map(|number| self.fetch_block_by_number(number))) + .buffered(self.max_rpc_concurrency) + .map_ok(Arc::new) + .try_collect() + .await } // ── Epoch / tag helpers ────────────────────────────────────────