diff --git a/crates/host-rpc/src/builder.rs b/crates/host-rpc/src/builder.rs
index f7aa7e06..d4abebf8 100644
--- a/crates/host-rpc/src/builder.rs
+++ b/crates/host-rpc/src/builder.rs
@@ -10,6 +10,7 @@ use tracing::warn;
/// let notifier = RpcHostNotifierBuilder::new(provider)
/// .with_buffer_capacity(128)
/// .with_backfill_batch_size(64)
+/// .with_max_rpc_concurrency(16)
/// .with_genesis_timestamp(1_606_824_023)
/// .build()
/// .await?;
@@ -19,6 +20,7 @@ pub struct RpcHostNotifierBuilder
{
provider: P,
buffer_capacity: usize,
backfill_batch_size: u64,
+ max_rpc_concurrency: usize,
slot_seconds: u64,
genesis_timestamp: u64,
}
@@ -33,20 +35,33 @@ where
provider,
buffer_capacity: crate::DEFAULT_BUFFER_CAPACITY,
backfill_batch_size: crate::DEFAULT_BACKFILL_BATCH_SIZE,
+ max_rpc_concurrency: crate::DEFAULT_MAX_RPC_CONCURRENCY,
slot_seconds: crate::notifier::DEFAULT_SLOT_SECONDS,
genesis_timestamp: 0,
}
}
/// Set the block buffer capacity (default: 64).
+ ///
+ /// Values below 1 are clamped to 1.
pub const fn with_buffer_capacity(mut self, capacity: usize) -> Self {
- self.buffer_capacity = capacity;
+ self.buffer_capacity = if capacity > 0 { capacity } else { 1 };
self
}
/// Set the backfill batch size (default: 32).
+ ///
+ /// Values below 1 are clamped to 1.
pub const fn with_backfill_batch_size(mut self, batch_size: u64) -> Self {
- self.backfill_batch_size = batch_size;
+ self.backfill_batch_size = if batch_size > 0 { batch_size } else { 1 };
+ self
+ }
+
+ /// Set the maximum number of concurrent RPC block fetches (default: 8).
+ ///
+ /// Values below 1 are clamped to 1.
+ pub const fn with_max_rpc_concurrency(mut self, max_rpc_concurrency: usize) -> Self {
+ self.max_rpc_concurrency = if max_rpc_concurrency > 0 { max_rpc_concurrency } else { 1 };
self
}
@@ -75,6 +90,7 @@ where
header_sub,
self.buffer_capacity,
self.backfill_batch_size,
+ self.max_rpc_concurrency,
self.slot_seconds,
self.genesis_timestamp,
))
diff --git a/crates/host-rpc/src/config.rs b/crates/host-rpc/src/config.rs
index 682304e3..436ce22b 100644
--- a/crates/host-rpc/src/config.rs
+++ b/crates/host-rpc/src/config.rs
@@ -1,4 +1,7 @@
-use crate::{DEFAULT_BACKFILL_BATCH_SIZE, DEFAULT_BUFFER_CAPACITY, RpcHostNotifierBuilder};
+use crate::{
+ DEFAULT_BACKFILL_BATCH_SIZE, DEFAULT_BUFFER_CAPACITY, DEFAULT_MAX_RPC_CONCURRENCY,
+ RpcHostNotifierBuilder,
+};
use alloy::providers::RootProvider;
use init4_bin_base::utils::{calc::SlotCalculator, from_env::FromEnv, provider::PubSubConfig};
@@ -9,6 +12,7 @@ use init4_bin_base::utils::{calc::SlotCalculator, from_env::FromEnv, provider::P
/// - `SIGNET_HOST_URL` – WebSocket or IPC URL for the host EL client (required)
/// - `SIGNET_HOST_BUFFER_CAPACITY` – Local chain view size (default: 64)
/// - `SIGNET_HOST_BACKFILL_BATCH_SIZE` – Blocks per backfill batch (default: 32)
+/// - `SIGNET_HOST_MAX_RPC_CONCURRENCY` – Max concurrent RPC block fetches (default: 8)
///
/// # Example
///
@@ -43,6 +47,13 @@ pub struct HostRpcConfig {
optional
)]
backfill_batch_size: Option,
+ /// Maximum number of concurrent RPC block fetches.
+ #[from_env(
+ var = "SIGNET_HOST_MAX_RPC_CONCURRENCY",
+ desc = "Max concurrent RPC requests [default: 8]",
+ optional
+ )]
+ max_rpc_concurrency: Option,
}
impl HostRpcConfig {
@@ -60,6 +71,9 @@ impl HostRpcConfig {
.with_backfill_batch_size(
self.backfill_batch_size.unwrap_or(DEFAULT_BACKFILL_BATCH_SIZE),
)
+ .with_max_rpc_concurrency(
+ self.max_rpc_concurrency.unwrap_or(DEFAULT_MAX_RPC_CONCURRENCY),
+ )
.with_genesis_timestamp(slot_calculator.start_timestamp()))
}
}
diff --git a/crates/host-rpc/src/latest.rs b/crates/host-rpc/src/latest.rs
new file mode 100644
index 00000000..3afcebf6
--- /dev/null
+++ b/crates/host-rpc/src/latest.rs
@@ -0,0 +1,98 @@
+use futures_util::Stream;
+use std::{
+ pin::Pin,
+ task::{Context, Poll},
+};
+
+/// Stream adapter that coalesces buffered items, yielding only the
+/// most recent ready item on each poll.
+///
+/// When the consumer is slow, items accumulate in the inner stream.
+/// `Latest` drains all buffered ready items on each poll and returns
+/// only the last one, discarding stale intermediates and recording a
+/// metric for each skipped item.
+pub(crate) struct Latest {
+ inner: S,
+}
+
+impl Latest {
+ /// Wrap `inner` in a `Latest` combinator.
+ pub(crate) const fn new(inner: S) -> Self {
+ Self { inner }
+ }
+}
+
+impl std::fmt::Debug for Latest {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Latest").field("inner", &self.inner).finish()
+ }
+}
+
+impl Stream for Latest
+where
+ S: Stream + Unpin,
+{
+ type Item = S::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll