Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions crates/host-rpc/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tracing::warn;
/// let notifier = RpcHostNotifierBuilder::new(provider)
/// .with_buffer_capacity(128)
/// .with_backfill_batch_size(64)
/// .with_max_rpc_concurrency(16)
/// .with_genesis_timestamp(1_606_824_023)
/// .build()
/// .await?;
Expand All @@ -19,6 +20,7 @@ pub struct RpcHostNotifierBuilder<P> {
provider: P,
buffer_capacity: usize,
backfill_batch_size: u64,
max_rpc_concurrency: usize,
slot_seconds: u64,
genesis_timestamp: u64,
}
Expand All @@ -33,20 +35,33 @@ where
provider,
buffer_capacity: crate::DEFAULT_BUFFER_CAPACITY,
backfill_batch_size: crate::DEFAULT_BACKFILL_BATCH_SIZE,
max_rpc_concurrency: crate::DEFAULT_MAX_RPC_CONCURRENCY,
slot_seconds: crate::notifier::DEFAULT_SLOT_SECONDS,
genesis_timestamp: 0,
}
}

/// Set the block buffer capacity (default: 64).
///
/// Values below 1 are clamped to 1.
pub const fn with_buffer_capacity(mut self, capacity: usize) -> Self {
self.buffer_capacity = capacity;
self.buffer_capacity = if capacity > 0 { capacity } else { 1 };
self
}

/// Set the backfill batch size (default: 32).
///
/// Values below 1 are clamped to 1.
pub const fn with_backfill_batch_size(mut self, batch_size: u64) -> Self {
self.backfill_batch_size = batch_size;
self.backfill_batch_size = if batch_size > 0 { batch_size } else { 1 };
self
}

/// Set the maximum number of concurrent RPC block fetches (default: 8).
///
/// Values below 1 are clamped to 1.
pub const fn with_max_rpc_concurrency(mut self, max_rpc_concurrency: usize) -> Self {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add with_max_rpc_concurrency to the doc example above here.

Also, we clamp here, but not in with_backfill_batch_size or with_buffer_capacity which is a bit inconsistent.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in 3192297

self.max_rpc_concurrency = if max_rpc_concurrency > 0 { max_rpc_concurrency } else { 1 };
self
}

Expand Down Expand Up @@ -75,6 +90,7 @@ where
header_sub,
self.buffer_capacity,
self.backfill_batch_size,
self.max_rpc_concurrency,
self.slot_seconds,
self.genesis_timestamp,
))
Expand Down
16 changes: 15 additions & 1 deletion crates/host-rpc/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{DEFAULT_BACKFILL_BATCH_SIZE, DEFAULT_BUFFER_CAPACITY, RpcHostNotifierBuilder};
use crate::{
DEFAULT_BACKFILL_BATCH_SIZE, DEFAULT_BUFFER_CAPACITY, DEFAULT_MAX_RPC_CONCURRENCY,
RpcHostNotifierBuilder,
};
use alloy::providers::RootProvider;
use init4_bin_base::utils::{calc::SlotCalculator, from_env::FromEnv, provider::PubSubConfig};

Expand All @@ -9,6 +12,7 @@ use init4_bin_base::utils::{calc::SlotCalculator, from_env::FromEnv, provider::P
/// - `SIGNET_HOST_URL` – WebSocket or IPC URL for the host EL client (required)
/// - `SIGNET_HOST_BUFFER_CAPACITY` – Local chain view size (default: 64)
/// - `SIGNET_HOST_BACKFILL_BATCH_SIZE` – Blocks per backfill batch (default: 32)
/// - `SIGNET_HOST_MAX_RPC_CONCURRENCY` – Max concurrent RPC block fetches (default: 8)
///
/// # Example
///
Expand Down Expand Up @@ -43,6 +47,13 @@ pub struct HostRpcConfig {
optional
)]
backfill_batch_size: Option<u64>,
/// Maximum number of concurrent RPC block fetches.
#[from_env(
var = "SIGNET_HOST_MAX_RPC_CONCURRENCY",
desc = "Max concurrent RPC requests [default: 8]",
optional
)]
max_rpc_concurrency: Option<usize>,
}

impl HostRpcConfig {
Expand All @@ -60,6 +71,9 @@ impl HostRpcConfig {
.with_backfill_batch_size(
self.backfill_batch_size.unwrap_or(DEFAULT_BACKFILL_BATCH_SIZE),
)
.with_max_rpc_concurrency(
self.max_rpc_concurrency.unwrap_or(DEFAULT_MAX_RPC_CONCURRENCY),
)
.with_genesis_timestamp(slot_calculator.start_timestamp()))
}
}
98 changes: 98 additions & 0 deletions crates/host-rpc/src/latest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use futures_util::Stream;
use std::{
pin::Pin,
task::{Context, Poll},
};

/// Stream adapter that coalesces buffered items, yielding only the
/// most recent ready item on each poll.
///
/// When the consumer is slow, items accumulate in the inner stream.
/// `Latest` drains all buffered ready items on each poll and returns
/// only the last one, discarding stale intermediates and recording a
/// metric for each skipped item.
pub(crate) struct Latest<S> {
    /// The wrapped stream whose buffered ready items are coalesced.
    inner: S,
}

impl<S> Latest<S> {
/// Wrap `inner` in a `Latest` combinator.
pub(crate) const fn new(inner: S) -> Self {
Self { inner }
}
}

// Manual `Debug` impl; produces the same output `#[derive(Debug)]`
// would generate for a single-field struct.
impl<S: std::fmt::Debug> std::fmt::Debug for Latest<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Latest").field("inner", &self.inner).finish()
    }
}

impl<S> Stream for Latest<S>
where
    S: Stream + Unpin,
{
    type Item = S::Item;

    /// Drain every currently-ready item from the inner stream and yield
    /// only the newest one, recording a metric for each older item that
    /// was discarded.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Newest item drained from the inner stream so far, if any.
        let mut newest: Option<Self::Item> = None;
        // Count of stale items discarded in favor of a newer one.
        let mut skipped: u64 = 0;

        loop {
            match Pin::new(&mut self.inner).poll_next(cx) {
                Poll::Ready(Some(item)) => {
                    // Displace the previously drained item (if any) and
                    // count it as skipped.
                    if newest.replace(item).is_some() {
                        skipped += 1;
                    }
                }
                // Inner stream terminated: yield whatever was drained,
                // otherwise signal termination.
                Poll::Ready(None) => {
                    return match newest.take() {
                        Some(item) => {
                            if skipped > 0 {
                                crate::metrics::inc_headers_coalesced(skipped);
                            }
                            Poll::Ready(Some(item))
                        }
                        None => Poll::Ready(None),
                    };
                }
                // Nothing more is ready right now. The inner stream's
                // `Pending` return has already registered the waker.
                Poll::Pending => {
                    return match newest.take() {
                        Some(item) => {
                            if skipped > 0 {
                                crate::metrics::inc_headers_coalesced(skipped);
                            }
                            Poll::Ready(Some(item))
                        }
                        None => Poll::Pending,
                    };
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::Latest;
    use futures_util::{StreamExt, stream};

    #[tokio::test]
    async fn single_item_yields_immediately() {
        let mut latest = Latest::new(stream::iter([42u32]));
        assert_eq!(latest.next().await, Some(42));
        assert_eq!(latest.next().await, None);
    }

    #[tokio::test]
    async fn multiple_ready_items_yields_last() {
        // Every item is immediately ready, so the adapter should drain
        // the whole buffer and yield only the final element.
        let mut latest = Latest::new(stream::iter([1u32, 2, 3, 4, 5]));
        assert_eq!(latest.next().await, Some(5));
        assert_eq!(latest.next().await, None);
    }

    #[tokio::test]
    async fn empty_stream_yields_none() {
        let mut latest = Latest::new(stream::iter(std::iter::empty::<u32>()));
        assert_eq!(latest.next().await, None);
    }

    #[tokio::test]
    async fn fused_after_inner_terminates() {
        let mut latest = Latest::new(stream::iter([7u32]));
        assert_eq!(latest.next().await, Some(7));
        // Polling again after termination must keep returning None.
        assert_eq!(latest.next().await, None);
        assert_eq!(latest.next().await, None);
    }
}
5 changes: 4 additions & 1 deletion crates/host-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
pub(crate) const DEFAULT_BUFFER_CAPACITY: usize = 64;
/// Default backfill batch size.
pub(crate) const DEFAULT_BACKFILL_BATCH_SIZE: u64 = 32;
/// Default maximum number of concurrent RPC block fetches.
pub(crate) const DEFAULT_MAX_RPC_CONCURRENCY: usize = 8;

mod builder;
pub use builder::RpcHostNotifierBuilder;
Expand All @@ -28,9 +30,10 @@ pub use error::RpcHostError;
mod notifier;
pub use notifier::RpcHostNotifier;

mod latest;
mod metrics;
mod segment;
pub use segment::{RpcBlock, RpcChainSegment};
mod metrics;

mod alias;
pub use alias::RpcAliasOracle;
8 changes: 8 additions & 0 deletions crates/host-rpc/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const BACKFILL_BATCHES: &str = "host_rpc.backfill_batches";
const TAG_REFRESHES: &str = "host_rpc.tag_refreshes";
const STALE_HINTS: &str = "host_rpc.stale_hints";
const RPC_ERRORS: &str = "host_rpc.rpc_errors";
const HEADERS_COALESCED: &str = "host_rpc.headers_coalesced";

const WALK_CHAIN_DURATION: &str = "host_rpc.walk_chain.duration_ms";
const FETCH_BLOCK_DURATION: &str = "host_rpc.fetch_block.duration_ms";
Expand All @@ -33,6 +34,7 @@ fn ensure_described() {
describe_counter!(TAG_REFRESHES, "Epoch boundary tag refreshes");
describe_counter!(STALE_HINTS, "Stale subscription hints that fell back to latest");
describe_counter!(RPC_ERRORS, "RPC transport/provider errors");
describe_counter!(HEADERS_COALESCED, "Stale subscription headers coalesced");
describe_histogram!(WALK_CHAIN_DURATION, "Time to walk the chain (ms)");
describe_histogram!(FETCH_BLOCK_DURATION, "Single block+receipts fetch (ms)");
describe_histogram!(BACKFILL_BATCH_DURATION, "Full backfill batch (ms)");
Expand Down Expand Up @@ -118,3 +120,9 @@ pub(crate) fn set_tip(number: u64) {
ensure_described();
gauge!(TIP_NUMBER).set(number as f64);
}

/// Increment the headers-coalesced counter.
///
/// `count` is the number of stale subscription headers that were
/// discarded in favor of a newer one during a single poll.
pub(crate) fn inc_headers_coalesced(count: u64) {
    // Register metric descriptions (idempotent) before first use.
    ensure_described();
    counter!(HEADERS_COALESCED).increment(count);
}
49 changes: 24 additions & 25 deletions crates/host-rpc/src/notifier.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{RpcBlock, RpcChainSegment, RpcHostError};
use crate::{RpcBlock, RpcChainSegment, RpcHostError, latest::Latest};
use alloy::{
consensus::{BlockHeader, transaction::Recovered},
eips::{BlockId, BlockNumberOrTag},
Expand All @@ -8,7 +8,7 @@ use alloy::{
pubsub::SubscriptionStream,
rpc::types::Header as RpcHeader,
};
use futures_util::{StreamExt, stream::FuturesOrdered};
use futures_util::{StreamExt, TryStreamExt, stream};
use signet_node_types::{HostNotification, HostNotificationKind, HostNotifier, RevertRange};
use signet_types::primitives::{RecoveredBlock, SealedBlock, TransactionSigned};
use std::{collections::VecDeque, sync::Arc, time::Instant};
Expand Down Expand Up @@ -56,7 +56,8 @@ pub struct RpcHostNotifier<P> {
provider: P,

/// Subscription stream of new block headers (used as wake-up signal).
header_sub: SubscriptionStream<RpcHeader>,
/// Wrapped in [`Latest`] to coalesce stale buffered headers.
header_sub: Latest<SubscriptionStream<RpcHeader>>,

/// Local chain view — lightweight ring buffer of (number, hash).
chain_view: VecDeque<(u64, B256)>,
Expand All @@ -80,6 +81,9 @@ pub struct RpcHostNotifier<P> {
/// Max blocks per backfill batch.
backfill_batch_size: u64,

/// Maximum number of concurrent RPC block fetches.
max_rpc_concurrency: usize,

/// Seconds per slot, used for epoch calculation.
slot_seconds: u64,

Expand All @@ -92,6 +96,7 @@ impl<P> core::fmt::Debug for RpcHostNotifier<P> {
f.debug_struct("RpcHostNotifier")
.field("chain_view_len", &self.chain_view.len())
.field("buffer_capacity", &self.buffer_capacity)
.field("max_rpc_concurrency", &self.max_rpc_concurrency)
.field("backfill_from", &self.backfill_from)
.finish_non_exhaustive()
}
Expand All @@ -107,19 +112,21 @@ where
header_sub: SubscriptionStream<RpcHeader>,
buffer_capacity: usize,
backfill_batch_size: u64,
max_rpc_concurrency: usize,
slot_seconds: u64,
genesis_timestamp: u64,
) -> Self {
Self {
provider,
header_sub,
header_sub: Latest::new(header_sub),
chain_view: VecDeque::with_capacity(buffer_capacity),
buffer_capacity,
cached_safe: None,
cached_finalized: None,
last_tag_epoch: None,
backfill_from: None,
backfill_batch_size,
max_rpc_concurrency,
slot_seconds,
genesis_timestamp,
}
Expand Down Expand Up @@ -213,22 +220,17 @@ where
/// Fetch full blocks+receipts for a list of hashes, concurrently.
///
/// Hashes must be in ascending block-number order. Results preserve
/// that order.
/// that order. Concurrency is bounded by [`Self::max_rpc_concurrency`].
#[tracing::instrument(level = "debug", skip_all, fields(count = hashes.len()))]
async fn fetch_blocks_by_hash(
&self,
hashes: &[(u64, B256)],
) -> Result<Vec<Arc<RpcBlock>>, RpcHostError> {
let mut futures = hashes
.iter()
.map(|&(_, hash)| self.fetch_block_by_hash(hash))
.collect::<FuturesOrdered<_>>();

let mut blocks = Vec::with_capacity(hashes.len());
while let Some(result) = futures.next().await {
blocks.push(Arc::new(result?));
}
Ok(blocks)
stream::iter(hashes.iter().copied().map(|(_, hash)| self.fetch_block_by_hash(hash)))
.buffered(self.max_rpc_concurrency)
.map_ok(Arc::new)
.try_collect()
.await
}

/// Fetch a single block with receipts by number (used for backfill only).
Expand All @@ -250,22 +252,19 @@ where

/// Fetch a range of blocks by number concurrently (used for backfill only).
///
/// Returns an empty `Vec` if `from > to`.
/// Returns an empty `Vec` if `from > to`. Concurrency is bounded by
/// [`Self::max_rpc_concurrency`].
#[tracing::instrument(level = "debug", skip_all, fields(from, to))]
async fn fetch_range(&self, from: u64, to: u64) -> Result<Vec<Arc<RpcBlock>>, RpcHostError> {
if from > to {
return Ok(Vec::new());
}

let mut futures = (from..=to)
.map(|number| self.fetch_block_by_number(number))
.collect::<FuturesOrdered<_>>();

let mut blocks = Vec::with_capacity((to - from + 1) as usize);
while let Some(result) = futures.next().await {
blocks.push(Arc::new(result?));
}
Ok(blocks)
stream::iter((from..=to).map(|number| self.fetch_block_by_number(number)))
.buffered(self.max_rpc_concurrency)
.map_ok(Arc::new)
.try_collect()
.await
}

// ── Epoch / tag helpers ────────────────────────────────────────
Expand Down