{
+ self.calls.lock().expect("lock").clone()
+ }
+ }
+
+ impl WalletNonceWatermarkSink for RecordingWatermarkSink {
+ fn raise_to(&self, highest: u64) -> Result<(), String> {
+ self.calls.lock().expect("lock").push(highest);
+ if self.fail {
+ Err("recording sink: forced failure".to_string())
+ } else {
+ Ok(())
+ }
+ }
+ }
+
+ /// R1a write-before-broadcast: `submit_batches` must raise the watermark to
+ /// cover the whole consecutive nonce range *before* the first send. We lock
+ /// it with a sink that fails on `raise_to`: a correct poster aborts the tick
+ /// before broadcasting anything, so the submitter's pending nonce is
+ /// unchanged. If `raise_to` were moved after the first `addInput` send
+ /// (re-opening the F1 zombie-tx hole), that send would bump the pending
+ /// nonce and this test would go red. Also pins the raise count (once) and
+ /// value (`base + payloads.len() - 1`). (Mutation-checked: moving the raise
+ /// after the send loop fails this test.)
+ #[tokio::test]
+ async fn submit_batches_raises_watermark_before_any_send() {
+ require_anvil();
+ let anvil = Anvil::default().spawn();
+ // Anvil account 0 — the submitter; its key signs the (never-sent) txs.
+ let key = "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80";
+ let submitter = alloy_primitives::address!("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266");
+ let provider = crate::l1::provider::create_signer_provider(&anvil.endpoint(), key)
+ .expect("signer provider");
+
+ let config = BatchPosterConfig {
+ l1_submit_address: alloy_primitives::Address::repeat_byte(0x11),
+ app_address: alloy_primitives::Address::repeat_byte(0x22),
+ batch_submitter_address: submitter,
+ start_block: 0,
+ confirmation_depth: 0,
+ seconds_per_block: 1,
+ long_block_range_error_codes: vec![],
+ expected_chain_id: anvil.chain_id(),
+ };
+ let poster = EthereumBatchPoster::new(provider.clone(), config);
+
+ let base_nonce = provider
+ .get_transaction_count(submitter)
+ .await
+ .expect("base nonce");
+ let sink = RecordingWatermarkSink::failing();
+ let payloads = vec![vec![0u8; 4], vec![1u8; 4], vec![2u8; 4]]; // 3 consecutive nonces
+
+ let result = poster.submit_batches(payloads, &sink).await;
+
+ assert!(
+ matches!(result, Err(BatchPosterError::Provider(_))),
+ "a failing watermark sink must abort submit_batches, got {result:?}"
+ );
+ // (a) raised exactly once, (b) to the highest nonce of the range.
+ assert_eq!(
+ sink.calls(),
+ vec![base_nonce + 2],
+ "raise_to must be called once with base_nonce + payloads.len() - 1"
+ );
+ // (c) before any send — no tx broadcast, so pending nonce is unchanged.
+ let pending = provider
+ .get_transaction_count(submitter)
+ .block_id(BlockNumberOrTag::Pending.into())
+ .await
+ .expect("pending nonce");
+ assert_eq!(
+ pending, base_nonce,
+ "raise_to must run before any send; a broadcast would have bumped the pending nonce"
+ );
+ }
+
+ /// Keyed-write chain-id gate: a long-lived submitter pointed at an RPC that
+ /// serves a different chain than the pinned one must refuse to submit, before
+ /// any productive work — no watermark raise, no broadcast. Anvil's chain id
+ /// is 31337; we pin a different one and assert the `ChainIdMismatch` refusal
+ /// fires ahead of the (would-otherwise-fail-later) watermark raise.
+ #[tokio::test]
+ async fn submit_batches_refuses_on_wrong_chain_before_any_work() {
+ require_anvil();
+ let anvil = Anvil::default().spawn();
+ let key = "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80";
+ let submitter = alloy_primitives::address!("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266");
+ let provider = crate::l1::provider::create_signer_provider(&anvil.endpoint(), key)
+ .expect("signer provider");
+
+ let wrong_chain_id = anvil.chain_id() + 1;
+ let config = BatchPosterConfig {
+ l1_submit_address: alloy_primitives::Address::repeat_byte(0x11),
+ app_address: alloy_primitives::Address::repeat_byte(0x22),
+ batch_submitter_address: submitter,
+ start_block: 0,
+ confirmation_depth: 0,
+ seconds_per_block: 1,
+ long_block_range_error_codes: vec![],
+ expected_chain_id: wrong_chain_id,
+ };
+ let poster = EthereumBatchPoster::new(provider.clone(), config);
+
+ let base_nonce = provider
+ .get_transaction_count(submitter)
+ .await
+ .expect("base nonce");
+ // A sink that would *succeed* — so the only thing that can stop a send is
+ // the chain-id gate, not the watermark guard. (Recording proves the gate
+ // fires first: a passing chain check would reach `raise_to`.)
+ let sink = RecordingWatermarkSink::passing();
+ let payloads = vec![vec![0u8; 4], vec![1u8; 4]];
+
+ let result = poster.submit_batches(payloads, &sink).await;
+
+ assert!(
+ matches!(
+ result,
+ Err(BatchPosterError::ChainIdMismatch { rpc, expected })
+ if rpc == anvil.chain_id() && expected == wrong_chain_id
+ ),
+ "wrong-chain RPC must abort submit_batches with ChainIdMismatch, got {result:?}"
+ );
+ assert!(
+ sink.calls().is_empty(),
+ "chain-id gate must fire before the watermark raise (no raise_to call)"
+ );
+ let pending = provider
+ .get_transaction_count(submitter)
+ .block_id(BlockNumberOrTag::Pending.into())
+ .await
+ .expect("pending nonce");
+ assert_eq!(
+ pending, base_nonce,
+ "no tx may be broadcast on the wrong chain"
+ );
+ }
#[tokio::test]
async fn mock_poster_tracks_requested_suffix_start_block() {
diff --git a/sequencer/src/l1/submitter/worker.rs b/sequencer/src/l1/submitter/worker.rs
index d47d885..5ea1763 100644
--- a/sequencer/src/l1/submitter/worker.rs
+++ b/sequencer/src/l1/submitter/worker.rs
@@ -78,7 +78,7 @@ fn decide_submit_start(frontier: SubmitterFrontier, recently_observed_nonces: &[
// unresolved nonce. The scan starts at `safe_block + 1` (the submitter
// asks the poster for that), so wallet-nonce ordering guarantees the
// observed list mirrors our submission order.
- advance_expected_batch_nonce(
+ sequencer_core::protocol::advance_expected_batch_nonce(
frontier.accepted_next_nonce,
recently_observed_nonces.iter().copied(),
)
@@ -88,12 +88,17 @@ pub struct BatchSubmitter {
db_path: String,
poster: Arc,
idle_poll_interval: Duration,
+ /// Write-before-broadcast hook (review R1a): the poster raises the
+ /// persisted wallet-nonce watermark through this before every send.
+ watermark_sink: crate::l1::watermark::StorageWatermarkSink,
}
impl BatchSubmitter {
pub fn new(db_path: impl Into, poster: Arc, config: BatchSubmitterConfig) -> Self {
+ let db_path = db_path.into();
Self {
- db_path: db_path.into(),
+ watermark_sink: crate::l1::watermark::StorageWatermarkSink::new(db_path.clone()),
+ db_path,
poster,
idle_poll_interval: config.idle_poll_interval(),
}
@@ -137,6 +142,12 @@ impl BatchSubmitter {
loop {
let outcome = match self.tick_once().await {
Ok(o) => o,
+ // A wrong-chain RPC is terminal — never retry-loop signing onto
+ // it. Lift it out of the transient `Poster` bucket below.
+ Err(e @ BatchSubmitterError::Poster(BatchPosterError::ChainIdMismatch { .. })) => {
+ error!(error = %e, "RPC serves the wrong chain — refusing to submit");
+ return Err(e);
+ }
Err(BatchSubmitterError::Poster(source)) => {
error!(error = %source, "L1 provider error — will retry");
TickOutcome::Transient
@@ -180,7 +191,10 @@ impl BatchSubmitter {
}
let submitted_count = pending.len();
let payloads: Vec> = pending.into_iter().map(|b| b.encoded).collect();
- let tx_hashes = self.poster.submit_batches(payloads).await?;
+ let tx_hashes = self
+ .poster
+ .submit_batches(payloads, &self.watermark_sink)
+ .await?;
if tx_hashes.len() != submitted_count {
return Err(BatchSubmitterError::Poster(BatchPosterError::Provider(
format!(
@@ -221,38 +235,6 @@ impl BatchSubmitter {
}
}
-/// Advance `expected` by greedily consuming any matching observed nonce.
-///
-/// `observed_nonces` is the stream of **batch nonces** (from the SSZ payload)
-/// decoded from `InputAdded` events sent by our batch-submitter EOA, in L1
-/// event order. Because L1 mines txs from a single EOA in strict wallet-nonce
-/// order, this stream is naturally gap-less at the wallet-nonce level:
-/// tx[k]'s event cannot appear on-chain without tx[k-1]'s event, and the
-/// observed batch nonce sequence therefore mirrors our submission order.
-///
-/// Batch nonces themselves (unlike wallet nonces) CAN repeat across recovery
-/// generations — e.g., after a cascade, a fresh batch reuses its invalidated
-/// predecessor's nonce. That's why we still match on equality rather than
-/// trusting a sort: in a post-recovery window, the same batch nonce can be
-/// observed twice (once from the invalidated generation, once from the new
-/// one), and we only want to advance once.
-///
-/// Under the wallet-nonce ordering above, once the next `expected` doesn't
-/// appear in the stream the frontier naturally stops advancing — the gap
-/// means the scheduler hasn't seen that nonce on-chain yet (or observed it at
-/// a different wallet nonce from an earlier generation).
-fn advance_expected_batch_nonce(
- mut expected: u64,
- observed_nonces: impl IntoIterator- ,
-) -> u64 {
- for nonce in observed_nonces {
- if nonce == expected {
- expected = expected.saturating_add(1);
- }
- }
- expected
-}
-
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -306,14 +288,13 @@ mod tests {
fn seed_safe_submitted_batches(db_path: &str, safe_block: u64, nonces: &[u64]) {
let mut storage = Storage::open(db_path).expect("open storage");
+ // Landings carry the local batch's real wire bytes so the
+ // content-identity check (review R2) accepts them.
let inputs: Vec<_> = nonces
.iter()
.map(|nonce| StoredSafeInput {
sender: BATCH_SUBMITTER_ADDRESS,
- payload: ssz::Encode::as_ssz_bytes(&sequencer_core::batch::Batch {
- nonce: *nonce,
- frames: Vec::new(),
- }),
+ payload: crate::storage::test_helpers::local_batch_payload(&mut storage, *nonce),
block_number: safe_block,
})
.collect();
@@ -495,19 +476,4 @@ mod tests {
// expected=3, skip. 3 matches → advance to 4.
assert_eq!(from_nonce, 4);
}
-
- #[test]
- fn advance_expected_batch_nonce_matches_scheduler_nonce_rule() {
- assert_eq!(super::advance_expected_batch_nonce(0, Vec::::new()), 0);
- assert_eq!(super::advance_expected_batch_nonce(0, vec![0, 1, 2]), 3);
- assert_eq!(super::advance_expected_batch_nonce(0, vec![0, 2, 3]), 1);
- assert_eq!(super::advance_expected_batch_nonce(0, vec![1, 2, 3]), 0);
- assert_eq!(super::advance_expected_batch_nonce(0, vec![0, 1, 1, 2]), 3);
- assert_eq!(
- super::advance_expected_batch_nonce(0, vec![6, 4, 3, 2, 2, 0, 1]),
- 2
- );
- assert_eq!(super::advance_expected_batch_nonce(0, vec![0, 2, 1]), 2);
- assert_eq!(super::advance_expected_batch_nonce(2, vec![2, 3]), 4);
- }
}
diff --git a/sequencer/src/l1/watermark.rs b/sequencer/src/l1/watermark.rs
new file mode 100644
index 0000000..18fef18
--- /dev/null
+++ b/sequencer/src/l1/watermark.rs
@@ -0,0 +1,50 @@
+// (c) Cartesi and individual authors (see AUTHORS)
+// SPDX-License-Identifier: Apache-2.0 (see LICENSE)
+
+//! Write-before-broadcast hook for the wallet-nonce watermark (review R1a).
+//!
+//! Every component that broadcasts a transaction from the batch-submitter key
+//! — the batch poster and the mempool flusher's no-ops alike — must first
+//! durably commit `watermark = max(watermark, highest_nonce_about_to_send)`
+//! and only then send. One uniform rule, no case analysis: the invariant is
+//! simply "the watermark covers the nonce of everything we ever sent", which
+//! is what lets the flush consume every slot we ever used without trusting
+//! the local node's volatile mempool memory (the F1 zombie counterexample).
+//!
+//! A crash between the commit and the send only over-covers: the flush later
+//! no-ops a never-used slot — one wasted no-op, harmless.
+
+use crate::storage::Storage;
+
+/// Durable raise of the wallet-nonce watermark, called *before* broadcasting.
+/// `raise_to(h)` must commit `watermark = max(watermark, h)` power-loss
+/// durably before returning; the caller may then broadcast txs at nonces
+/// `<= h`.
+pub trait WalletNonceWatermarkSink: Send + Sync {
+ fn raise_to(&self, highest: u64) -> Result<(), String>;
+}
+
+/// Sink backed by the sequencer DB's `wallet_nonce_watermark` singleton.
+/// Opens a short-lived writer connection per raise — raises happen at most
+/// once per submitter tick / flush pass, well off the hot path.
+pub struct StorageWatermarkSink {
+ db_path: String,
+}
+
+impl StorageWatermarkSink {
+ pub fn new(db_path: impl Into) -> Self {
+ Self {
+ db_path: db_path.into(),
+ }
+ }
+}
+
+impl WalletNonceWatermarkSink for StorageWatermarkSink {
+ fn raise_to(&self, highest: u64) -> Result<(), String> {
+ let mut storage = Storage::open_writer(&self.db_path)
+ .map_err(|e| format!("watermark sink: open storage: {e}"))?;
+ storage
+ .raise_wallet_nonce_watermark(highest)
+ .map_err(|e| format!("watermark sink: raise: {e}"))
+ }
+}
diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs
index a40c98d..120a99a 100644
--- a/sequencer/src/lib.rs
+++ b/sequencer/src/lib.rs
@@ -17,6 +17,7 @@
//! invariant the storage layer relies on.
pub mod egress;
+pub mod harness;
pub mod http;
pub mod ingress;
pub mod l1;
@@ -24,6 +25,7 @@ pub mod recovery;
pub mod runtime;
pub mod storage;
+pub use harness::{Cli, Command, dispatch, run_main};
pub use http::{ApiConfig, ApiError, WS_CATCHUP_WINDOW_EXCEEDED_REASON};
-pub use runtime::config::RunConfig;
+pub use runtime::config::{FlushConfig, RunConfig, SetupConfig};
pub use runtime::{RunError, run};
diff --git a/sequencer/src/recovery/detector.rs b/sequencer/src/recovery/detector.rs
index c9a5f11..1bd7c18 100644
--- a/sequencer/src/recovery/detector.rs
+++ b/sequencer/src/recovery/detector.rs
@@ -162,17 +162,6 @@ mod tests {
}
}
- fn make_stale_batch_payload(nonce: u64, safe_block: u64) -> Vec {
- ssz::Encode::as_ssz_bytes(&sequencer_core::batch::Batch {
- nonce,
- frames: vec![sequencer_core::batch::Frame {
- user_ops: Vec::new(),
- safe_block,
- fee_price: 0,
- }],
- })
- }
-
#[tokio::test]
async fn exits_on_shutdown_when_safe() {
let db = temp_db("detector-shutdown");
@@ -217,12 +206,13 @@ mod tests {
.expect("close batch 1");
let protocol = test_protocol();
+ let landed = crate::storage::test_helpers::local_batch_payload(&mut storage, 0);
storage
.append_safe_inputs(
1135,
&[StoredSafeInput {
sender: SENDER_A,
- payload: make_stale_batch_payload(0, 10),
+ payload: landed,
block_number: 20,
}],
SENDER_A,
@@ -272,12 +262,13 @@ mod tests {
.expect("close batch 1");
let protocol = test_protocol();
+ let landed = crate::storage::test_helpers::local_batch_payload(&mut storage, 0);
storage
.append_safe_inputs(
1200,
&[StoredSafeInput {
sender: SENDER_A,
- payload: make_stale_batch_payload(0, 100),
+ payload: landed,
block_number: 200,
}],
SENDER_A,
diff --git a/sequencer/src/recovery/flusher.rs b/sequencer/src/recovery/flusher.rs
index 308086a..e2207af 100644
--- a/sequencer/src/recovery/flusher.rs
+++ b/sequencer/src/recovery/flusher.rs
@@ -19,6 +19,8 @@ use std::time::Duration;
use thiserror::Error;
use tracing::{debug, error, info};
+use crate::l1::watermark::{StorageWatermarkSink, WalletNonceWatermarkSink};
+
#[derive(Debug, Error)]
pub enum FlushError {
#[error("provider/transport: {0}")]
@@ -92,6 +94,28 @@ fn map_watch_error(err: PendingTransactionError) -> Result {
}
impl MempoolFlusher {
+ /// Build a flusher over `provider` plus the DB-backed watermark sink, run
+ /// the watermark-anchored flush, and return the observed safe block `C`.
+ ///
+ /// This is the flush-acquire core shared by all three keyed-flush sites
+ /// (`setup --recovery`, the runtime danger path, and `flush-mempool`). They
+ /// differ only in where the signing key, submitter address, and watermark
+ /// come from, and in the surrounding error type — so callers resolve those
+ /// (provider creation keeps each site's own error mapping; the returned
+ /// [`FlushError`] maps into `RunError`/`RecoveryError` via the existing
+ /// `From` impls) and this owns the sink build + `flush_and_wait`.
+ pub(crate) async fn flush_to_safe(
+ provider: DynProvider,
+ submitter_address: Address,
+ seconds_per_block: u64,
+ db_path: impl Into,
+ watermark: Option,
+ ) -> Result {
+ let flusher = Self::new(provider, submitter_address, seconds_per_block);
+ let sink = StorageWatermarkSink::new(db_path);
+ flusher.flush_and_wait(watermark, &sink).await
+ }
+
pub fn new(provider: DynProvider, address: Address, seconds_per_block: u64) -> Self {
let (confirmation_timeout, safe_poll_interval) = derive_timeouts(seconds_per_block);
Self {
@@ -102,40 +126,73 @@ impl MempoolFlusher {
}
}
- /// Flush the mempool by submitting no-op transactions for pending nonce
- /// slots, then waiting until every slot is safe.
+ /// Flush the mempool by submitting no-op transactions for unresolved
+ /// nonce slots, then waiting until every slot we ever used is safe.
+ ///
+ /// `watermark` is the persisted wallet-nonce watermark — the highest
+ /// nonce this deployment ever broadcast (review R1a), or `None` if
+ /// nothing was ever broadcast (or no DB survives, the cockroach-recovery
+ /// best-effort case, R1b). The loop runs until
+ ///
+ /// ```text
+ /// pending <= safe && safe >= watermark + 1
+ /// ```
///
- /// The loop runs until `get_transaction_count(Pending) <= get_transaction_count(Safe)`,
- /// meaning every slot has reached safe finality.
+ /// The first conjunct resolves every slot the local node remembers; the
+ /// second is the durable anchor — it refuses to declare victory until
+ /// slot `watermark` is consumed at safe depth, covering zombie txs the
+ /// local node has forgotten but the network may still hold (the F1
+ /// counterexample). It doubles as the post-flush assert from R1a: the
+ /// function cannot return success without it.
///
/// At each iteration:
- /// 1. Submit 0-ETH self-transfers for nonces between `Latest` and `Pending`.
- /// These compete with any batch transactions still in the mempool. If
- /// an original batch wins, that is also success: the slot advanced.
+ /// 1. Submit 0-ETH self-transfers for nonces in
+ /// `[Latest, max(Pending, watermark + 1))` — every slot not yet
+ /// mined that we might ever have used. The sink raises the persisted
+ /// watermark before the broadcast (uniform write-before-broadcast,
+ /// though no-ops never exceed the existing watermark under the
+ /// invariant). These compete with any of our txs still in the
+ /// network; whichever wins, the slot advances.
/// 2. Watch each submitted no-op for L1 inclusion.
/// 3. Sleep to let the safe head advance, then re-check the loop condition.
/// 4. If any watch times out, retry the outer loop (tx may have been dropped,
/// or the original batch may be making progress instead).
- pub async fn flush_and_wait(&self) -> Result<(), FlushError> {
+ ///
+ /// Returns the L1 **safe block number** at which resolution was observed
+ /// (review F2): the caller must not cascade until its own re-synced view
+ /// reaches at least this block.
+ pub async fn flush_and_wait(
+ &self,
+ watermark: Option,
+ sink: &dyn WalletNonceWatermarkSink,
+ ) -> Result {
let mut attempt = 0u32;
loop {
let safe_nonce = self.nonce_at(BlockNumberOrTag::Safe).await?;
let pending_nonce = self.nonce_at(BlockNumberOrTag::Pending).await?;
+ // The durable anchor: every slot we ever used must be consumed
+ // at safe depth, regardless of what the local pool remembers.
+ let required_safe_nonce = watermark.map_or(0, |w| w.saturating_add(1));
- if pending_nonce <= safe_nonce {
+ if pending_nonce <= safe_nonce && safe_nonce >= required_safe_nonce {
+ let safe_block = self.safe_block_number().await?;
info!(
safe_nonce,
+ required_safe_nonce,
+ safe_block,
"mempool flush complete — all slots reached safe finality"
);
- return Ok(());
+ return Ok(safe_block);
}
- let unresolved = pending_nonce - safe_nonce;
+ let flush_end = pending_nonce.max(required_safe_nonce);
+ let unresolved = flush_end.saturating_sub(safe_nonce);
if attempt == 0 {
info!(
safe_nonce,
pending_nonce,
+ required_safe_nonce,
unresolved,
"flushing mempool: submitting no-ops for unresolved w_nonce slots"
);
@@ -146,17 +203,26 @@ impl MempoolFlusher {
attempt,
safe_nonce,
pending_nonce,
+ required_safe_nonce,
unresolved,
"flush retry: previous attempt timed out, resubmitting"
);
}
attempt += 1;
- // Submit no-ops for nonces between Latest and Pending. We submit
- // the full range before watching any tx, so every unresolved slot
- // gets a competing no-op attempt in this pass.
+ // Submit no-ops for every not-yet-mined slot up to the flush end.
+ // The full range goes out before watching any tx, so every
+ // unresolved slot gets a competing no-op attempt in this pass.
let latest_nonce = self.nonce_at(BlockNumberOrTag::Latest).await?;
- let tx_hashes = self.submit_noops(latest_nonce, pending_nonce).await?;
+ if latest_nonce < flush_end {
+ // Uniform write-before-broadcast; covers the no-ops we are
+ // about to send (a no-op above the current watermark can only
+ // happen if someone else used our key — over-covering then is
+ // exactly right).
+ sink.raise_to(flush_end.saturating_sub(1))
+ .map_err(FlushError::Provider)?;
+ }
+ let tx_hashes = self.submit_noops(latest_nonce, flush_end).await?;
// Watch each submitted tx for L1 inclusion.
if !self.watch_txs(&tx_hashes).await? {
@@ -168,6 +234,17 @@ impl MempoolFlusher {
}
}
+ /// Block number of the current L1 safe head.
+ async fn safe_block_number(&self) -> Result {
+ let block = self
+ .provider
+ .get_block_by_number(BlockNumberOrTag::Safe)
+ .await
+ .map_err(|e| FlushError::Provider(e.to_string()))?
+ .ok_or_else(|| FlushError::Provider("no safe block available".to_string()))?;
+ Ok(block.header.number)
+ }
+
/// Submit 0-ETH self-transfers for nonces `from_nonce..to_nonce`.
/// Returns the tx hashes of successfully submitted transactions.
async fn submit_noops(&self, from_nonce: u64, to_nonce: u64) -> Result, FlushError> {
@@ -283,6 +360,14 @@ mod tests {
use alloy::node_bindings::Anvil;
use alloy::providers::Provider;
+ /// Sink for tests that don't assert on watermark raises.
+ struct NoopWatermarkSink;
+ impl WalletNonceWatermarkSink for NoopWatermarkSink {
+ fn raise_to(&self, _highest: u64) -> Result<(), String> {
+ Ok(())
+ }
+ }
+
// ── H5: replacement-fee bump keeps no-ops competitive ─────────
#[test]
@@ -469,7 +554,10 @@ mod tests {
let flusher = MempoolFlusher::new(provider, addr, 12);
// No pending txs — should return immediately.
- flusher.flush_and_wait().await.expect("flush");
+ flusher
+ .flush_and_wait(None, &NoopWatermarkSink)
+ .await
+ .expect("flush");
}
#[tokio::test]
@@ -506,10 +594,13 @@ mod tests {
// Run the flusher — it should resolve all 3 nonces to safe.
let flusher = MempoolFlusher::new(provider.clone(), addr, 12)
.with_timeouts(Duration::from_secs(5), Duration::from_millis(200));
- tokio::time::timeout(Duration::from_secs(10), flusher.flush_and_wait())
- .await
- .expect("flush should complete within timeout")
- .expect("flush should succeed");
+ tokio::time::timeout(
+ Duration::from_secs(10),
+ flusher.flush_and_wait(None, &NoopWatermarkSink),
+ )
+ .await
+ .expect("flush should complete within timeout")
+ .expect("flush should succeed");
// Verify: safe nonce caught up.
let safe_after = provider
@@ -523,6 +614,46 @@ mod tests {
);
}
+ #[tokio::test]
+ async fn flush_covers_watermark_slots_the_pool_forgot() {
+ require_anvil();
+
+ let anvil = spawn_anvil();
+ let provider = signer_provider(&anvil);
+ let addr = anvil.addresses()[0];
+
+ // Models the F1 zombie: the persisted watermark says slot 0 was
+ // broadcast, but the local pool has no memory of it
+ // (pending == safe == 0). The pre-R1a `pending <= safe` early
+ // return would declare victory immediately and leave the slot to
+ // a zombie; the anchored flush must consume slot 0 with a no-op
+ // and wait for it to reach safe depth.
+ let _miner = start_miner(provider.clone(), Duration::from_millis(100));
+ let flusher = MempoolFlusher::new(provider.clone(), addr, 12)
+ .with_timeouts(Duration::from_secs(5), Duration::from_millis(200));
+ let observed_safe_block = tokio::time::timeout(
+ Duration::from_secs(10),
+ flusher.flush_and_wait(Some(0), &NoopWatermarkSink),
+ )
+ .await
+ .expect("anchored flush should complete within timeout")
+ .expect("anchored flush should succeed");
+
+ let safe_after = provider
+ .get_transaction_count(addr)
+ .block_id(BlockNumberOrTag::Safe.into())
+ .await
+ .expect("safe nonce after flush");
+ assert!(
+ safe_after >= 1,
+ "watermark slot 0 must be consumed at safe depth, got {safe_after}"
+ );
+ assert!(
+ observed_safe_block > 0,
+ "flush must report the safe block it observed resolution at (F2)"
+ );
+ }
+
#[tokio::test]
async fn flush_handles_already_mined_but_not_safe() {
require_anvil();
@@ -560,10 +691,13 @@ mod tests {
// Flusher should wait for safe finality (no new txs to submit).
let flusher = MempoolFlusher::new(provider.clone(), addr, 12)
.with_timeouts(Duration::from_secs(5), Duration::from_millis(200));
- tokio::time::timeout(Duration::from_secs(10), flusher.flush_and_wait())
- .await
- .expect("flush should complete within timeout")
- .expect("flush should succeed");
+ tokio::time::timeout(
+ Duration::from_secs(10),
+ flusher.flush_and_wait(None, &NoopWatermarkSink),
+ )
+ .await
+ .expect("flush should complete within timeout")
+ .expect("flush should succeed");
let safe_after = provider
.get_transaction_count(addr)
@@ -638,10 +772,13 @@ mod tests {
// `flush_and_wait` must fail fast (no internal retry loop). Wrap in
// a generous outer timeout just to bound test flakiness if alloy's
// HTTP client has small internal retries.
- let err = tokio::time::timeout(Duration::from_secs(5), flusher.flush_and_wait())
- .await
- .expect("flush_and_wait must not hang under disconnect")
- .expect_err("flush_and_wait must surface a Provider error under disconnect");
+ let err = tokio::time::timeout(
+ Duration::from_secs(5),
+ flusher.flush_and_wait(None, &NoopWatermarkSink),
+ )
+ .await
+ .expect("flush_and_wait must not hang under disconnect")
+ .expect_err("flush_and_wait must surface a Provider error under disconnect");
assert!(
matches!(err, FlushError::Provider(_)),
"expected FlushError::Provider, got: {err:?}",
@@ -658,10 +795,13 @@ mod tests {
// fee no-op (or let the original land), wait for safe, and return.
let flusher_after = MempoolFlusher::new(proxied_provider, addr, 12)
.with_timeouts(Duration::from_secs(5), Duration::from_millis(200));
- tokio::time::timeout(Duration::from_secs(15), flusher_after.flush_and_wait())
- .await
- .expect("flush_and_wait should complete after reconnect")
- .expect("flush should succeed once the provider is reachable");
+ tokio::time::timeout(
+ Duration::from_secs(15),
+ flusher_after.flush_and_wait(None, &NoopWatermarkSink),
+ )
+ .await
+ .expect("flush_and_wait should complete after reconnect")
+ .expect("flush should succeed once the provider is reachable");
// Forward progress: the nonce-0 slot was consumed (either by the
// flusher's no-op or by the original tx landing). `safe_nonce` is
diff --git a/sequencer/src/recovery/mod.rs b/sequencer/src/recovery/mod.rs
index 8350dcc..c7a4139 100644
--- a/sequencer/src/recovery/mod.rs
+++ b/sequencer/src/recovery/mod.rs
@@ -50,7 +50,7 @@ use crate::l1::reader::{InputReader, InputReaderError};
use crate::runtime::config::L1Config;
use crate::storage::{self, DangerStatus, StorageOpenError};
pub use detector::{DangerDetector, DangerDetectorError, DetectorExit};
-pub use flusher::MempoolFlusher;
+pub use flusher::{FlushError, MempoolFlusher};
use sequencer_core::protocol::ProtocolTiming;
#[derive(Debug, Error)]
@@ -65,8 +65,38 @@ pub enum RecoveryError {
InputReader(#[from] InputReaderError),
#[error("provider: {0}")]
Provider(String),
+ #[error("recovery flush chain-id mismatch: rpc {rpc} != pinned {expected}")]
+ ChainIdMismatch { rpc: u64, expected: u64 },
#[error("startup refused: {0:?}")]
Refuse(RefuseReason),
+ #[error(
+ "post-flush re-sync reached safe block {resynced_safe_block}, behind the \
+ flusher's observed resolution at {flush_observed_safe_block}; refusing to \
+ cascade on a lagging L1 view (respawn retries with a fresher view)"
+ )]
+ ResyncBehindFlushView {
+ resynced_safe_block: u64,
+ flush_observed_safe_block: u64,
+ },
+}
+
+/// F2 coherence guard: refuse if the post-flush re-sync's safe head lags the
+/// block the flusher observed resolution at. Folding (`setup --recovery`) or
+/// cascading (runtime danger) on a view that stops short of the flush's
+/// resolution would miss inputs the flush already settled. Shared by both
+/// recovery paths; the orchestrator respawn retries with a fresher L1 view (the
+/// flush is idempotent). See [`RecoveryError::ResyncBehindFlushView`].
+pub(crate) fn assert_resync_caught_up(
+ resynced_safe_block: u64,
+ flush_observed_safe_block: u64,
+) -> Result<(), RecoveryError> {
+ if resynced_safe_block < flush_observed_safe_block {
+ return Err(RecoveryError::ResyncBehindFlushView {
+ resynced_safe_block,
+ flush_observed_safe_block,
+ });
+ }
+ Ok(())
}
/// Why startup cannot proceed safely.
@@ -75,6 +105,13 @@ pub enum RecoveryError {
/// startup unsafe. The operator sees the variant in logs and must intervene.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RefuseReason {
+ /// A fully-accepted L1 landing failed the content-identity check
+ /// (review R2): canonical state diverged from the local batch tree.
+ /// Terminal — no restart self-heals it; the operator must run cockroach
+ /// recovery (wipe + rebuild from L1). Standard recovery is forbidden:
+ /// it reconciles the tree's *shape* assuming accepted nonce N is our
+ /// batch N, which is exactly what no longer holds.
+ CanonicalDivergence { nonce: u64 },
/// The L1 safe block timestamp is too old or unknown, so the local L1 view
/// is not usable for recovery or continued soft confirmations.
L1ViewStale,
@@ -124,29 +161,15 @@ impl StartupAction {
}
}
-fn danger_status_label(danger: DangerStatus) -> &'static str {
- match danger {
- DangerStatus::Safe => "safe",
- DangerStatus::L1ViewStale => "l1_view_stale",
- DangerStatus::ClosedBatchInDanger(_) => "closed_batch_in_danger",
- DangerStatus::TipInDanger(_) => "tip_in_danger",
- DangerStatus::EstimatedBatchInDanger(_) => "estimated_batch_in_danger",
- }
-}
-
-fn danger_batch_index(danger: DangerStatus) -> Option {
- match danger {
- DangerStatus::ClosedBatchInDanger(batch_index)
- | DangerStatus::TipInDanger(batch_index)
- | DangerStatus::EstimatedBatchInDanger(batch_index) => Some(batch_index),
- DangerStatus::Safe | DangerStatus::L1ViewStale => None,
- }
-}
-
-fn refuse_reason_label(reason: RefuseReason) -> &'static str {
- match reason {
- RefuseReason::L1ViewStale => "l1_view_stale",
- RefuseReason::EstimatedBatchInDanger { .. } => "estimated_batch_in_danger",
+impl RefuseReason {
+ /// Stable label for logs/metrics. Inherent method, co-located with the
+ /// variants (`DangerStatus::label`/`batch_index` live next to that enum).
+ fn label(self) -> &'static str {
+ match self {
+ RefuseReason::CanonicalDivergence { .. } => "canonical_divergence",
+ RefuseReason::L1ViewStale => "l1_view_stale",
+ RefuseReason::EstimatedBatchInDanger { .. } => "estimated_batch_in_danger",
+ }
}
}
@@ -156,6 +179,9 @@ fn refuse_reason_label(reason: RefuseReason) -> &'static str {
pub fn decide_startup_action(danger: DangerStatus) -> StartupAction {
match danger {
DangerStatus::Safe => StartupAction::Proceed,
+ DangerStatus::CanonicalDivergence(nonce) => {
+ StartupAction::Refuse(RefuseReason::CanonicalDivergence { nonce })
+ }
DangerStatus::ClosedBatchInDanger(batch_index) => {
StartupAction::FlushAndCascade { batch_index }
}
@@ -213,8 +239,8 @@ pub async fn run_preemptive_recovery(
};
let action = decide_startup_action(danger);
tracing::info!(
- danger_status = danger_status_label(danger),
- danger_batch_index = ?danger_batch_index(danger),
+ danger_status = danger.label(),
+ danger_batch_index = ?danger.batch_index(),
startup_action = action.label(),
l1_reachable,
danger_threshold = protocol.danger_threshold(),
@@ -242,8 +268,8 @@ pub async fn run_preemptive_recovery(
let invalidated = match action {
StartupAction::Proceed => {
tracing::info!(
- danger_status = danger_status_label(danger),
- danger_batch_index = ?danger_batch_index(danger),
+ danger_status = danger.label(),
+ danger_batch_index = ?danger.batch_index(),
startup_action = action.label(),
"no danger zone detected — proceeding without recovery"
);
@@ -257,8 +283,8 @@ pub async fn run_preemptive_recovery(
}
StartupAction::RecoverTip { batch_index } => {
tracing::error!(
- danger_status = danger_status_label(danger),
- danger_batch_index = ?danger_batch_index(danger),
+ danger_status = danger.label(),
+ danger_batch_index = ?danger.batch_index(),
startup_action = action.label(),
tip_batch_index = batch_index,
danger_threshold = protocol.danger_threshold(),
@@ -269,8 +295,8 @@ pub async fn run_preemptive_recovery(
}
StartupAction::FlushAndCascade { batch_index } => {
tracing::error!(
- danger_status = danger_status_label(danger),
- danger_batch_index = ?danger_batch_index(danger),
+ danger_status = danger.label(),
+ danger_batch_index = ?danger.batch_index(),
startup_action = action.label(),
batch_index,
danger_threshold = protocol.danger_threshold(),
@@ -281,11 +307,11 @@ pub async fn run_preemptive_recovery(
}
StartupAction::Refuse(reason) => {
tracing::error!(
- danger_status = danger_status_label(danger),
- danger_batch_index = ?danger_batch_index(danger),
+ danger_status = danger.label(),
+ danger_batch_index = ?danger.batch_index(),
startup_action = action.label(),
?reason,
- refuse_reason = refuse_reason_label(reason),
+ refuse_reason = reason.label(),
l1_reachable,
"startup refused: cannot recover safely"
);
@@ -295,8 +321,8 @@ pub async fn run_preemptive_recovery(
if invalidated.is_empty() {
tracing::info!(
- danger_status = danger_status_label(danger),
- danger_batch_index = ?danger_batch_index(danger),
+ danger_status = danger.label(),
+ danger_batch_index = ?danger.batch_index(),
startup_action = action.label(),
invalidated_count = 0,
"startup recovery complete — no batches invalidated"
@@ -307,8 +333,8 @@ pub async fn run_preemptive_recovery(
// log already alerted the operator at error level; this completes
// that incident with a non-error outcome.
tracing::warn!(
- danger_status = danger_status_label(danger),
- danger_batch_index = ?danger_batch_index(danger),
+ danger_status = danger.label(),
+ danger_batch_index = ?danger.batch_index(),
startup_action = action.label(),
invalidated_count = invalidated.len(),
batches = ?invalidated,
@@ -332,17 +358,41 @@ async fn run_flush_and_cascade(
l1_config: &L1Config,
protocol: &ProtocolTiming,
) -> Result, RecoveryError> {
- let flush_provider = crate::l1::provider::create_signer_provider(
+ // Keyed-write chain-id gate (review): the flush signs L1 no-op txs, so it
+ // must confirm the RPC still serves the pinned chain *immediately before
+ // signing*. The boot-time `validate_rpc_chain_id` and the reader's one-shot
+ // `verify_chain_id` are both stale by now (a load-balanced RPC could have
+ // failed over to another chain since), so neither is a sufficient backstop
+ // for a fresh keyed write. `create_verified_signer_provider` folds the check
+ // into the signer build so this path cannot skip it. A mismatch is terminal
+ // (operator misconfig); an RPC error is retryable (handled like `Provider`).
+ let flush_provider = crate::l1::provider::create_verified_signer_provider(
&l1_config.eth_rpc_url,
&l1_config.batch_submitter_private_key,
+ l1_config.chain_id,
)
- .map_err(|e| RecoveryError::Provider(e.to_string()))?;
- let flusher = MempoolFlusher::new(
+ .await
+ .map_err(|e| match e {
+ crate::l1::provider::VerifiedSignerProviderError::ChainIdMismatch { rpc, expected } => {
+ RecoveryError::ChainIdMismatch { rpc, expected }
+ }
+ other => RecoveryError::Provider(other.to_string()),
+ })?;
+ // The persisted watermark anchors the flush: every slot this deployment
+ // ever broadcast must resolve at safe depth, regardless of what the
+ // local node's pool remembers (review R1a / F1).
+ let watermark = {
+ let mut storage = storage::Storage::open(db_path)?;
+ storage.wallet_nonce_watermark()?
+ };
+ let flush_observed_safe_block = MempoolFlusher::flush_to_safe(
flush_provider,
l1_config.batch_submitter_address,
protocol.seconds_per_block,
- );
- flusher.flush_and_wait().await?;
+ db_path,
+ watermark,
+ )
+ .await?;
// If this re-sync errors out, L1 has been flushed but the DB has NOT been
// cascaded — we exit with the InputReaderError and rely on the orchestrator
@@ -351,10 +401,13 @@ async fn run_flush_and_cascade(
// - `flush_and_wait` is idempotent: on the next attempt it queries L1 for
// pending wallet-nonces, finds zero (the previous flush cleared them),
// and returns immediately.
- // - `check_danger` is stable across the failure window: safe_block only
- // moves forward and flush doesn't retroactively change closed batches'
- // `first_frame_safe_block`, so the danger condition that fired before
- // still fires after the restart.
+ // - `check_danger` re-decides on the post-flush state. Two cases: the
+ // danger persists (the restart re-enters this same path — flush is a
+ // no-op the second time), or the original danger resolved during the
+ // flush (e.g. the frontier batch landed gold), in which case the
+ // restart proceeds normally with any no-op'd Pending batch left valid —
+ // safe, since it simply resubmits at a fresh slot with no poisoned
+ // ancestor.
// - `recover_post_flush` is idempotent against the resulting DB state
// (verified by `after_post_recovery_crash_is_no_op` in `recovery_tests`).
//
@@ -368,6 +421,17 @@ async fn run_flush_and_cascade(
tracing::info!("running post-flush recovery (cascade non-gold suffix)");
let mut storage = storage::Storage::open(db_path)?;
+
+ // Coherence check (review F2): the cascade's precondition is that the
+ // gold frontier reflects at least the safe view the flusher observed
+ // resolution at. Behind a load-balanced RPC, the reader's re-sync can be
+ // served by a replica lagging the flusher's view — cascading then could
+ // invalidate a batch the scheduler actually accepted and reuse its
+ // nonce. Refuse instead; the orchestrator respawn retries with a
+ // fresher view (the flush is idempotent).
+ let resynced_safe_block = storage.current_safe_block()?.unwrap_or(0);
+ assert_resync_caught_up(resynced_safe_block, flush_observed_safe_block)?;
+
Ok(storage.recover_post_flush(protocol.danger_threshold())?)
}
@@ -383,6 +447,17 @@ mod tests {
);
}
+ #[test]
+ fn refuse_on_canonical_divergence() {
+ // Terminal refusal — never `Proceed`, never a flush+cascade on top
+ // of a diverged frontier (review R2). The remedy is cockroach
+ // recovery, outside this dispatch entirely.
+ assert_eq!(
+ decide_startup_action(DangerStatus::CanonicalDivergence(7)),
+ StartupAction::Refuse(RefuseReason::CanonicalDivergence { nonce: 7 })
+ );
+ }
+
#[test]
fn flush_and_cascade_on_closed_batch_in_danger() {
assert_eq!(
diff --git a/sequencer/src/runtime/config.rs b/sequencer/src/runtime/config.rs
index 8fc55b2..52df617 100644
--- a/sequencer/src/runtime/config.rs
+++ b/sequencer/src/runtime/config.rs
@@ -1,9 +1,22 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)
+//! CLI configuration for the three subcommands (`setup`, `run`,
+//! `flush-mempool`).
+//!
+//! The phase split: `setup` is L1-read-only and establishes the
+//! timeless deployment identity + initial sync + genesis snapshot; it takes
+//! the batch-submitter **address** but never the signing key. `run` boots
+//! from an already-set-up DB, reads identity from the DB (so chain id / app
+//! address are not CLI args here), and keeps the signing **key** because it
+//! submits. `flush-mempool` is a keyed operator tool that settles the wallet
+//! nonce on demand.
+//!
+//! Shared, drift-prone arg groups (`TimingArgs`, `KeyArgs`) are flattened into
+//! the per-subcommand configs so defaults and env-var names stay consistent.
+
use alloy_primitives::Address;
-use alloy_sol_types::Eip712Domain;
-use clap::{ArgGroup, Parser};
+use clap::{ArgGroup, Args};
use sequencer_core::protocol::{ProtocolTiming, ProtocolTimingError};
const DEFAULT_HTTP_ADDR: &str = "127.0.0.1:3000";
@@ -12,9 +25,9 @@ const DB_FILENAME: &str = "sequencer.db";
/// Shared L1 / InputBox configuration used by both the input reader and the batch submitter.
///
-/// Built once at startup from `RunConfig` plus the discovered InputBox address, so RPC URL,
-/// InputBox address, and app address are defined in a single place and not duplicated across
-/// component configs.
+/// Built once at startup from the pinned deployment identity plus the runtime
+/// `RunConfig`, so RPC URL, InputBox address, and app address are defined in a
+/// single place and not duplicated across component configs.
#[derive(Debug, Clone)]
pub struct L1Config {
pub eth_rpc_url: String,
@@ -22,51 +35,78 @@ pub struct L1Config {
pub app_address: Address,
pub batch_submitter_private_key: String,
pub batch_submitter_address: Address,
+ /// The pinned deployment chain id. Carried here so keyed-write paths (e.g.
+ /// the preemptive-recovery flush) can re-confirm the RPC's chain id right
+ /// before signing via [`crate::l1::provider::create_verified_signer_provider`].
+ pub chain_id: u64,
}
-#[derive(Debug, Clone, Parser)]
-#[command(
- name = "sequencer",
- about = "Deterministic sequencer prototype with low-latency soft confirmations.\n\n\
- All options can also be set via environment variables (shown in brackets).",
- version,
- after_help = "\
-Examples:
- sequencer \\
- --eth-rpc-url http://127.0.0.1:8545 \\
- --chain-id 31337 \\
- --app-address 0x1111111111111111111111111111111111111111 \\
- --batch-submitter-private-key 0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80
-
- CARTESI_SEQUENCER_BLOCKCHAIN_HTTP_ENDPOINT=http://127.0.0.1:8545 \\
- CARTESI_SEQUENCER_BLOCKCHAIN_ID=31337 \\
- CARTESI_SEQUENCER_APP_ADDRESS=0x1111111111111111111111111111111111111111 \\
- CARTESI_SEQUENCER_AUTH_PRIVATE_KEY=0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80 \\
- sequencer\
-",
- group(
- ArgGroup::new("batch_submitter_key_source")
- .args(&["batch_submitter_private_key", "batch_submitter_private_key_file"])
- .required(true)
- .multiple(false)
- )
-)]
-pub struct RunConfig {
- #[arg(long, env = "CARTESI_SEQUENCER_HTTP_ADDR", default_value = DEFAULT_HTTP_ADDR, value_parser = parse_non_empty_string)]
- pub http_addr: String,
- #[arg(long, env = "CARTESI_SEQUENCER_DATA_DIR", default_value = DEFAULT_DATA_DIR, value_parser = parse_non_empty_string)]
- pub data_dir: String,
- #[arg(long, env = "CARTESI_SEQUENCER_BLOCKCHAIN_HTTP_ENDPOINT", value_parser = parse_non_empty_string)]
- pub eth_rpc_url: String,
- /// Error codes that trigger `get_logs` retries with a shorter block range.
- #[arg(long, env = "CARTESI_SEQUENCER_LONG_BLOCK_RANGE_ERROR_CODES", value_delimiter = ',', default_values = crate::l1::partition::DEFAULT_LONG_BLOCK_RANGE_ERROR_CODES)]
- pub long_block_range_error_codes: Vec,
- /// Expected chain ID. Validated against the RPC at startup.
- #[arg(long, env = "CARTESI_SEQUENCER_BLOCKCHAIN_ID")]
- pub chain_id: u64,
- /// Application (EIP-712 verifying contract) address.
- #[arg(long, env = "CARTESI_SEQUENCER_APP_ADDRESS", value_parser = parse_address)]
- pub app_address: Address,
+/// Full path to the SQLite database file inside `data_dir`.
+pub fn db_path_in(data_dir: &str) -> String {
+ std::path::Path::new(data_dir)
+ .join(DB_FILENAME)
+ .to_string_lossy()
+ .into_owned()
+}
+
+/// Protocol-timing tuning knobs shared by `setup` and `run`. `setup` needs a
+/// valid `ProtocolTiming` for the initial-sync writes (and validating it here
+/// fails fast at setup, not just at run).
+#[derive(Debug, Clone, Args)]
+pub struct TimingArgs {
+ /// Blocks before MAX_WAIT_BLOCKS to trigger preemptive recovery.
+ /// The danger threshold is MAX_WAIT_BLOCKS minus this margin.
+ /// Must be less than MAX_WAIT_BLOCKS (validated at startup).
+ ///
+ /// Default 300 (~1h at 12s/block) is sized to give operators meaningful
+ /// runway to investigate before the system gives up on the current
+ /// batches — see `docs/recovery/README.md` "Step 1: Danger threshold".
+ #[arg(
+ long,
+ env = "CARTESI_SEQUENCER_PREEMPTIVE_MARGIN_BLOCKS",
+ default_value = "300"
+ )]
+ pub preemptive_margin_blocks: u64,
+
+ /// Blocks of safe-head age after which the L1 read view is considered too
+ /// stale to trust. Independent of the preemptive margin. Must be strictly
+ /// less than the danger threshold (validated at startup).
+ ///
+ /// Default 600 (~2h at 12s/block).
+ #[arg(long, env = "CARTESI_SEQUENCER_L1_READ_STALE_AFTER_BLOCKS", default_value = "600", value_parser = clap::value_parser!(u64).range(1..))]
+ pub l1_read_stale_after_blocks: u64,
+
+ /// Assumed L1 block time in seconds. Used to estimate block progression
+ /// from wall-clock time when the L1 provider is unreachable.
+ #[arg(long, env = "CARTESI_SEQUENCER_SECONDS_PER_BLOCK", default_value = "12", value_parser = clap::value_parser!(u64).range(1..))]
+ pub seconds_per_block: u64,
+}
+
+impl TimingArgs {
+ /// Build a validated [`ProtocolTiming`]. Pure derivation — no I/O.
+ /// `max_wait_blocks` is the shared scheduler constant; the rest are the
+ /// operator-tunable CLI args.
+ pub fn protocol_timing(&self) -> Result {
+ ProtocolTiming::try_new(
+ sequencer_core::MAX_WAIT_BLOCKS,
+ self.preemptive_margin_blocks,
+ self.l1_read_stale_after_blocks,
+ self.seconds_per_block,
+ )
+ }
+}
+
+/// Batch-submitter signing-key source, shared by `run` and `flush-mempool`
+/// (both sign L1 transactions). `setup` does NOT use this — it takes the
+/// submitter address directly and never signs.
+#[derive(Debug, Clone, Args)]
+#[command(group(
+ ArgGroup::new("batch_submitter_key_source")
+ .args(["batch_submitter_private_key", "batch_submitter_private_key_file"])
+ .required(true)
+ .multiple(false)
+))]
+pub struct KeyArgs {
/// Hex-encoded private key for the batch submitter.
#[arg(
long,
@@ -81,6 +121,206 @@ pub struct RunConfig {
group = "batch_submitter_key_source"
)]
batch_submitter_private_key_file: Option,
+}
+
+impl KeyArgs {
+ /// Resolve the batch submitter private key from either the inline value or a key file.
+ pub fn resolve(&self) -> Result {
+ resolve_key_source(
+ &self.batch_submitter_private_key,
+ &self.batch_submitter_private_key_file,
+ )
+ .map(|opt| opt.expect("batch submitter private key is required by CLI arg group"))
+ }
+}
+
+/// Resolve a batch-submitter key from an inline value or a key file (first line).
+/// Returns `Ok(None)` when neither source is set — `KeyArgs` makes that
+/// impossible via its required arg group, but `setup --recovery` validates the
+/// "exactly one source" rule in code (the group can't be conditionally required
+/// at the clap level), so it needs the `None` case.
+fn resolve_key_source(
+ inline: &Option,
+ file: &Option,
+) -> Result