From 100959684c40cce325b89e6f96b838b4af7960a2 Mon Sep 17 00:00:00 2001 From: Ricardo Guilherme Schmidt <3esmit@gmail.com> Date: Fri, 5 Jun 2026 00:48:20 -0300 Subject: [PATCH 1/2] feat(tests): add local sequencer integration harness --- .github/workflows/ci.yml | 31 + CLAUDE.md | 2 +- Cargo.lock | 146 ++ Cargo.toml | 6 + Makefile | 1 + README.md | 15 +- programs/integration_tests/Cargo.toml | 12 + programs/integration_tests/src/lib.rs | 7 + .../integration_tests/src/local_sequencer.rs | 1447 +++++++++++++++++ programs/integration_tests/tests/amm.rs | 3 +- programs/integration_tests/tests/ata.rs | 3 +- .../integration_tests/tests/stablecoin.rs | 3 +- programs/integration_tests/tests/token.rs | 83 +- tools/local-sequencer/Cargo.toml | 11 + tools/local-sequencer/src/main.rs | 596 +++++++ 15 files changed, 2359 insertions(+), 7 deletions(-) create mode 100644 programs/integration_tests/src/local_sequencer.rs create mode 100644 tools/local-sequencer/Cargo.toml create mode 100644 tools/local-sequencer/src/main.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8473744..c0f58dc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -125,6 +125,37 @@ jobs: env: RISC0_DEV_MODE: 1 + local-sequencer-integration-tests: + name: Local Sequencer Integration Tests + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v5 + - uses: ./.github/actions/install-risc0 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@master + with: + toolchain: "1.94.0" + + - name: Cache cargo registry + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + ~/.cache/lez-programs/local-sequencer + target + key: ${{ runner.os }}-cargo-local-sequencer-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-local-sequencer- + ${{ runner.os }}-cargo- + + - name: Local sequencer integration tests + run: cargo run -p local-sequencer -- test + env: + RISC0_DEV_MODE: 1 + RUST_TEST_THREADS: 1 + check-idl: name: Check IDL runs-on: ubuntu-latest diff --git a/CLAUDE.md b/CLAUDE.md index 3d6ea9a..1549721 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -105,7 +105,7 @@ programs/ methods/ # Host-side zkVM method embedding methods/guest/ # Guest binary (separate workspace) integration_tests/ - tests/ # End-to-end tests through the zkVM (token, amm, ata) + tests/ # End-to-end tests through the zkVM (token, amm, ata, stablecoin) apps/ amm/ # QML-based UI for the AMM program (Nix flake) ``` diff --git a/Cargo.lock b/Cargo.lock index 9372e45..3a0076d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -373,6 +373,24 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.72.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" +dependencies = [ + "bitflags 2.11.1", + "cexpr", + "clang-sys", + "itertools 0.13.0", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.117", +] + [[package]] name = "bit-vec" version = "0.8.0" @@ -520,6 +538,16 @@ dependencies = [ "serde_core", ] +[[package]] +name = "bzip2-sys" +version = "0.1.13+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225bff33b2141874fe80d71e07d6eec4f85c5c216453dd96388240f96e1acc14" +dependencies = [ + "cc", + "pkg-config", +] + [[package]] name = "camino" version = "1.2.2" @@ -559,9 +587,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1dce859f0832a7d088c4f1119888ab94ef4b5d6795d1ce05afb7fe159d79f98" dependencies = [ "find-msvc-tools", + "jobserver", + "libc", "shlex", ] +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "1.0.4" @@ -608,6 +647,17 @@ dependencies = [ "inout", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clock_core" version = "0.1.0" @@ -1234,6 +1284,12 @@ dependencies = [ "wasip3", ] +[[package]] +name = "glob" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" + [[package]] name = "group" version = "0.13.0" @@ -1604,8 +1660,12 @@ dependencies = [ "amm_core", "ata-methods", "ata_core", + "base64", + "borsh", "nssa", "nssa_core", + "rocksdb", + "serde_json", "stablecoin-methods", "stablecoin_core", "token-methods", @@ -1642,6 +1702,16 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "jobserver" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" +dependencies = [ + "getrandom 0.3.4", + "libc", +] + [[package]] name = "js-sys" version = "0.3.98" @@ -1722,6 +1792,16 @@ version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" +[[package]] +name = "libloading" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" +dependencies = [ + "cfg-if", + "windows-link", +] + [[package]] name = "libm" version = "0.2.16" @@ -1737,6 +1817,30 @@ dependencies = [ "libc", ] +[[package]] +name = "librocksdb-sys" +version = "0.17.3+10.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cef2a00ee60fe526157c9023edab23943fae1ce2ab6f4abb2a807c1746835de9" +dependencies = [ + "bindgen", + "bzip2-sys", + "cc", + "libc", + "libz-sys", +] + +[[package]] +name = "libz-sys" +version = "1.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85bc9657773828b90eeb625adff10eeac83cc21bbfd8e23a03eaa8a33c9e28d9" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.12.1" @@ -1749,6 +1853,10 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" +[[package]] +name = "local-sequencer" +version = "0.1.0" + [[package]] name = "log" version = "0.4.29" @@ -1814,6 +1922,12 @@ dependencies = [ "paste", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "mio" version = "1.2.0" @@ -1831,6 +1945,16 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5b0c77c1b780822bc749a33e39aeb2c07584ab93332303babeabb645298a76e" +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nssa" version = "0.1.0" @@ -2022,6 +2146,12 @@ dependencies = [ "spki", ] +[[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + [[package]] name = "postcard" version = "1.1.3" @@ -2619,6 +2749,16 @@ dependencies = [ "stability", ] +[[package]] +name = "rocksdb" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddb7af00d2b17dbd07d82c0063e25411959748ff03e8d4f96134c2ff41fce34f" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "rrs-lib" version = "0.1.0" @@ -3552,6 +3692,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index 7d68ac5..40aa6b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ members = [ "programs/stablecoin/methods", "programs/integration_tests", "tools/idl-gen", + "tools/local-sequencer", ] exclude = [ "programs/token/methods/guest", @@ -45,6 +46,11 @@ borsh = { version = "1.5", features = ["derive"] } risc0-zkvm = { version = "=3.0.5" } serde_json = "1.0" tokio = { version = "1.28.2", features = ["net", "rt-multi-thread", "sync", "macros"] } +base64 = "0.22" +rocksdb = { version = "0.24.0", default-features = false, features = [ + "snappy", + "bindgen-runtime", +] } [workspace.lints.rust] rust_2018_idioms = { level = "deny", priority = -1 } diff --git a/Makefile b/Makefile index 59e30c7..ee576db 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,7 @@ clippy: RISC0_SKIP_BUILD=1 cargo clippy --workspace --all-targets -- -D warnings + RISC0_SKIP_BUILD=1 cargo clippy -p integration_tests --all-targets --features local-sequencer-tests -- -D warnings clippy-guest: for manifest in programs/*/methods/guest/Cargo.toml; do \ diff --git a/README.md b/README.md index 91a73ec..8df917b 100644 --- a/README.md +++ b/README.md @@ -67,17 +67,28 @@ RISC0_DEV_MODE=1 cargo test -p token_program -p amm_program -p ata_program -p st # Run integration tests (dev mode skips ZK proof generation) RISC0_DEV_MODE=1 cargo test -p integration_tests +# Clone/build the pinned standalone LEZ sequencer, then run integration tests through it +cargo run -p local-sequencer -- test + +# Or reuse an existing pinned LEZ checkout with the standalone sequencer already built +LEZ_LOCAL_SEQUENCER_REPO=/path/to/logos-execution-zone RISC0_DEV_MODE=1 RUST_TEST_THREADS=1 cargo test -p integration_tests --features local-sequencer-tests -- --nocapture + # Run all tests make test ``` -Integration tests live in `programs/integration_tests/tests/` and cover `token`, `amm`, and `ata` programs end-to-end through the zkVM using `RISC0_DEV_MODE=1` to skip proof generation. Each test file corresponds to a program: +The local-sequencer feature mirrors every transition through a spawned sequencer: each transition is submitted to the sequencer and its commit-or-reject outcome is asserted to match the in-process result. Account and proof *equality* are additionally checked when tests call `get_account_by_id` or `get_proof_for_commitment`; force-insert-only setup and tests that never read state still mirror their transitions but skip the equality comparison. Each `TestState` spawns its own sequencer process, so `cargo run -p local-sequencer -- test` defaults to `RUST_TEST_THREADS=1` (export a different value to override); when invoking `cargo test` directly, set `RUST_TEST_THREADS=1` on constrained machines or when serial execution is preferred. + +`LEZ_LOCAL_SEQUENCER_REPO` may point at any clean LEZ checkout whose `HEAD` matches the pinned ref; its git origin does not need to match this workspace's Cargo source. `LEZ_LOCAL_SEQUENCER_SOURCE` only applies when the helper creates or updates its managed cache checkout. Set `LEZ_LOCAL_SEQUENCER_REF` when using a nonstandard or vendored manifest where the pinned tag cannot be parsed. + +Integration tests live in `programs/integration_tests/tests/` and cover `token`, `amm`, `ata`, and `stablecoin` programs end-to-end through the zkVM using `RISC0_DEV_MODE=1` to skip proof generation. Each test file corresponds to a program: - `programs/integration_tests/tests/token.rs` - `programs/integration_tests/tests/amm.rs` - `programs/integration_tests/tests/ata.rs` +- `programs/integration_tests/tests/stablecoin.rs` -`stablecoin` and `twap_oracle` are tested via their own unit tests (`cargo test -p stablecoin_program -p twap_oracle_program`). +`twap_oracle` is tested via its own unit tests (`cargo test -p twap_oracle_program`). ## Compile Guest Binaries diff --git a/programs/integration_tests/Cargo.toml b/programs/integration_tests/Cargo.toml index f2374fd..51bbab9 100644 --- a/programs/integration_tests/Cargo.toml +++ b/programs/integration_tests/Cargo.toml @@ -6,9 +6,20 @@ edition = "2021" [lints] workspace = true +[features] +local-sequencer-tests = [ + "dep:base64", + "dep:borsh", + "dep:rocksdb", + "dep:serde_json", +] + [dependencies] nssa = { workspace = true } nssa_core = { workspace = true, features = ["host"] } +base64 = { workspace = true, optional = true } +borsh = { workspace = true, optional = true } +rocksdb = { workspace = true, optional = true } amm_core = { workspace = true } token_core = { workspace = true } ata_core = { workspace = true } @@ -17,3 +28,4 @@ token-methods = { path = "../token/methods" } amm-methods = { path = "../amm/methods" } ata-methods = { path = "../ata/methods" } stablecoin-methods = { path = "../stablecoin/methods" } +serde_json = { workspace = true, optional = true } diff --git a/programs/integration_tests/src/lib.rs b/programs/integration_tests/src/lib.rs index 8b13789..36fbf63 100644 --- a/programs/integration_tests/src/lib.rs +++ b/programs/integration_tests/src/lib.rs @@ -1 +1,8 @@ +#[cfg(feature = "local-sequencer-tests")] +mod local_sequencer; +#[cfg(feature = "local-sequencer-tests")] +pub use local_sequencer::TestState; + +#[cfg(not(feature = "local-sequencer-tests"))] +pub type TestState = nssa::V03State; diff --git a/programs/integration_tests/src/local_sequencer.rs b/programs/integration_tests/src/local_sequencer.rs new file mode 100644 index 0000000..f8db4e3 --- /dev/null +++ b/programs/integration_tests/src/local_sequencer.rs @@ -0,0 +1,1447 @@ +use std::{ + env, + error::Error, + fmt::{self, Write as _}, + fs, + io::{self, Read as _, Write as _}, + net::{TcpListener, TcpStream}, + path::{Path, PathBuf}, + process::{Child, Command, Stdio}, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; + +use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, Engine as _}; +use borsh::{BorshDeserialize, BorshSerialize}; +use nssa::{ + error::NssaError, + privacy_preserving_transaction::PrivacyPreservingTransaction, + program_deployment_transaction::ProgramDeploymentTransaction, + public_transaction::{self, PublicTransaction}, +}; +use nssa_core::{ + account::{Account, AccountId}, + BlockId, Commitment, MembershipProof, Nullifier, Timestamp, +}; +use rocksdb::{ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options}; +use serde_json::{json, Value}; + +type DynError = Box; +type DynResult = Result; + +const JSON_RPC_INVALID_PARAMS_CODE: i64 = -32602; +const LOCAL_SEQUENCER_REPO_ENV: &str = "LEZ_LOCAL_SEQUENCER_REPO"; +const EXPECTED_LEZ_COMMIT: &str = "cf3639d8252040d13b3d4e933feb19b42c76e14a"; +const LEZ_SEQUENCER_BIN_REL_PATH: &str = "target/release/sequencer_service"; +const LEZ_SEQUENCER_CONFIG_CANDIDATES: &[&str] = &[ + "lez/sequencer/service/configs/debug/sequencer_config.json", + "sequencer/service/configs/debug/sequencer_config.json", +]; +const CF_BLOCK_NAME: &str = "cf_block"; +const CF_META_NAME: &str = "cf_meta"; +const CF_NSSA_STATE_NAME: &str = "cf_nssa_state"; +const DB_NSSA_STATE_KEY: &str = "nssa_state"; +const POLL_INTERVAL: Duration = Duration::from_millis(100); +const SPAWN_ATTEMPTS: usize = 3; +const CLOCK_READ_SYNC_ATTEMPTS: usize = 5; +const COMMIT_TIMEOUT: Duration = Duration::from_secs(20); +const SUBMIT_TIMEOUT: Duration = Duration::from_secs(5); +const HEALTH_TIMEOUT: Duration = Duration::from_secs(30); +const RPC_CALL_TIMEOUT: Duration = Duration::from_secs(10); +const MAX_RPC_RESPONSE_BYTES: usize = 16 * 1024 * 1024; + +pub struct TestState { + inner: nssa::V03State, + sequencer: Option, + dirty: bool, + last_replayed_block_id: BlockId, +} + +impl TestState { + #[must_use] + pub fn new_with_genesis_accounts( + public_accounts: &[(AccountId, u128)], + private_accounts: Vec<(Commitment, Nullifier)>, + genesis_timestamp: Timestamp, + ) -> Self { + Self { + inner: nssa::V03State::new_with_genesis_accounts( + public_accounts, + private_accounts, + genesis_timestamp, + ), + sequencer: None, + dirty: true, + last_replayed_block_id: 0, + } + } + + pub fn transition_from_public_transaction( + &mut self, + tx: &PublicTransaction, + block_id: BlockId, + timestamp: Timestamp, + ) -> Result<(), NssaError> { + let rpc_tx = RpcTransaction::Public(Box::new(tx.clone())); + match self.mirror_transaction(&rpc_tx) { + MirrorOutcome::Committed(context) => { + let mut actual_inner = self.inner.clone(); + self.replay_prior_clock_transactions(&mut actual_inner, context); + actual_inner + .transition_from_public_transaction(tx, context.block_id, context.timestamp) + .unwrap_or_else(|err| { + panic!( + "local replay rejected public transaction committed by sequencer at \ + block {}: {err}", + context.block_id + ) + }); + apply_clock_transaction(&mut actual_inner, context); + self.inner = actual_inner; + self.last_replayed_block_id = context.block_id; + self.assert_affected_accounts_match(&rpc_tx); + Ok(()) + } + MirrorOutcome::NotCommitted(rejection) => { + let mut expected_inner = self.inner.clone(); + let expected = if let Some(context) = rejection.validation_context { + self.replay_prior_clock_transactions(&mut expected_inner, context); + expected_inner.transition_from_public_transaction( + tx, + context.block_id, + context.timestamp, + ) + } else { + expected_inner.transition_from_public_transaction(tx, block_id, timestamp) + }; + + if expected.is_ok() { + panic!("local replay accepted public transaction dropped by sequencer"); + } + self.advance_rejected_clock_state(&rejection); + expected + } + } + } + + pub fn transition_from_privacy_preserving_transaction( + &mut self, + tx: &PrivacyPreservingTransaction, + block_id: BlockId, + timestamp: Timestamp, + ) -> Result<(), NssaError> { + let rpc_tx = RpcTransaction::PrivacyPreserving(Box::new(tx.clone())); + match self.mirror_transaction(&rpc_tx) { + MirrorOutcome::Committed(context) => { + let mut actual_inner = self.inner.clone(); + self.replay_prior_clock_transactions(&mut actual_inner, context); + actual_inner + .transition_from_privacy_preserving_transaction( + tx, + context.block_id, + context.timestamp, + ) + .unwrap_or_else(|err| { + panic!( + "local replay rejected privacy-preserving transaction committed by \ + sequencer at block {}: {err}", + context.block_id + ) + }); + apply_clock_transaction(&mut actual_inner, context); + self.inner = actual_inner; + self.last_replayed_block_id = context.block_id; + self.assert_affected_accounts_match(&rpc_tx); + Ok(()) + } + MirrorOutcome::NotCommitted(rejection) => { + let mut expected_inner = self.inner.clone(); + let expected = if let Some(context) = rejection.validation_context { + self.replay_prior_clock_transactions(&mut expected_inner, context); + expected_inner.transition_from_privacy_preserving_transaction( + tx, + context.block_id, + context.timestamp, + ) + } else { + expected_inner + .transition_from_privacy_preserving_transaction(tx, block_id, timestamp) + }; + + if expected.is_ok() { + panic!( + "local replay accepted privacy-preserving transaction dropped by sequencer" + ); + } + self.advance_rejected_clock_state(&rejection); + expected + } + } + } + + pub fn transition_from_program_deployment_transaction( + &mut self, + tx: &ProgramDeploymentTransaction, + ) -> Result<(), NssaError> { + let rpc_tx = RpcTransaction::ProgramDeployment(Box::new(tx.clone())); + match self.mirror_transaction(&rpc_tx) { + MirrorOutcome::Committed(context) => { + let mut actual_inner = self.inner.clone(); + self.replay_prior_clock_transactions(&mut actual_inner, context); + actual_inner + .transition_from_program_deployment_transaction(tx) + .unwrap_or_else(|err| { + panic!( + "local replay rejected program deployment committed by sequencer at \ + block {}: {err}", + context.block_id + ) + }); + apply_clock_transaction(&mut actual_inner, context); + self.inner = actual_inner; + self.last_replayed_block_id = context.block_id; + self.assert_affected_accounts_match(&rpc_tx); + Ok(()) + } + MirrorOutcome::NotCommitted(rejection) => { + let mut expected_inner = self.inner.clone(); + let expected = expected_inner.transition_from_program_deployment_transaction(tx); + + if expected.is_ok() { + panic!("local replay accepted program deployment dropped by sequencer"); + } + self.advance_rejected_clock_state(&rejection); + expected + } + } + } + + pub fn force_insert_account(&mut self, account_id: AccountId, account: Account) { + self.inner.force_insert_account(account_id, account); + self.dirty = true; + } + + #[must_use] + pub fn get_account_by_id(&self, account_id: AccountId) -> Account { + let (account, sequencer_account) = self.account_pair_for_read(account_id); + + if let Some(sequencer_account) = sequencer_account { + assert_eq!( + sequencer_account, account, + "local sequencer account state diverged for {account_id}" + ); + } + + account + } + + #[must_use] + pub fn get_proof_for_commitment(&self, commitment: &Commitment) -> Option { + let proof = self.inner.get_proof_for_commitment(commitment); + + if !self.dirty { + if let Some(sequencer) = &self.sequencer { + let sequencer_proof = sequencer + .get_proof_for_commitment(commitment) + .unwrap_or_else(|err| { + panic!("local sequencer getProofForCommitment failed: {err}") + }); + assert_eq!( + sequencer_proof, proof, + "local sequencer commitment proof diverged" + ); + } + } + + proof + } + + fn account_pair_for_read(&self, account_id: AccountId) -> (Account, Option) { + let account = self.inner.get_account_by_id(account_id); + + if self.dirty { + return (account, None); + } + + let Some(sequencer) = &self.sequencer else { + return (account, None); + }; + + if is_clock_account(account_id) { + return self + .clock_account_pair_for_read(sequencer, account_id) + .unwrap_or_else(|err| panic!("local sequencer clock read sync failed: {err}")); + } + + let sequencer_account = sequencer + .get_account_by_id(account_id) + .unwrap_or_else(|err| panic!("local sequencer getAccount failed: {err}")); + (account, Some(sequencer_account)) + } + + // The sequencer advances the clock accounts on every block (~every + // `block_create_timeout`), so a naive "replay to the tip, then read the + // account" can straddle a block boundary and observe an inconsistent pair. + // A reading is only trustworthy inside a quiescent window, i.e. when the + // sequencer's last block id is unchanged across the replay and the account + // read (equal values are always safe to accept regardless of the window). + // If no quiescent window is found within the attempt budget, skip the + // cross-check for this read and return the deterministic in-process value + // rather than asserting on a transient, known-inconsistent pair (which used + // to surface as a spurious "state diverged" panic under load). + fn clock_account_pair_for_read( + &self, + sequencer: &LocalSequencer, + account_id: AccountId, + ) -> DynResult<(Account, Option)> { + for _ in 0..CLOCK_READ_SYNC_ATTEMPTS { + let target_block_id = sequencer.last_block_id()?; + let mut state = self.inner.clone(); + self.replay_clock_transactions_through(&mut state, sequencer, target_block_id)?; + let account = state.get_account_by_id(account_id); + let sequencer_account = sequencer.get_account_by_id(account_id)?; + + if sequencer_account == account || sequencer.last_block_id()? == target_block_id { + return Ok((account, Some(sequencer_account))); + } + } + + Ok((self.inner.get_account_by_id(account_id), None)) + } + + fn mirror_transaction(&mut self, tx: &RpcTransaction) -> MirrorOutcome { + let tx_hash = hex_encode(&tx.hash()) + .unwrap_or_else(|_| String::from("")); + let submitted = match self + .ensure_sequencer() + .submit_transaction_retrying(tx, SUBMIT_TIMEOUT) + { + Ok(submitted) => submitted, + Err(err) if is_transaction_precheck_rejection(err.as_ref()) => { + return MirrorOutcome::NotCommitted(RejectionContext::precheck()); + } + Err(err) => panic!("local sequencer failed to submit {tx_hash}: {err}"), + }; + + match self + .ensure_sequencer() + .wait_for_rejection_or_transaction(&submitted, COMMIT_TIMEOUT) + .unwrap_or_else(|err| { + panic!("local sequencer transaction poll failed for {tx_hash}: {err}") + }) { + SequencerTransactionOutcome::Committed(context) => MirrorOutcome::Committed(context), + SequencerTransactionOutcome::Rejected(rejection) => { + MirrorOutcome::NotCommitted(rejection) + } + } + } + + fn ensure_sequencer(&mut self) -> &mut LocalSequencer { + if self.sequencer.is_none() || self.dirty { + self.sequencer = None; + let sequencer = LocalSequencer::spawn(&self.inner) + .unwrap_or_else(|err| panic!("failed to start local sequencer: {err}")); + let genesis_block_id = sequencer.genesis_block_id; + self.sequencer = Some(sequencer); + self.dirty = false; + self.last_replayed_block_id = genesis_block_id; + } + + match &mut self.sequencer { + Some(sequencer) => sequencer, + None => unreachable!("local sequencer should be initialized"), + } + } + + fn replay_prior_clock_transactions(&self, state: &mut nssa::V03State, committed: BlockContext) { + let Some(sequencer) = &self.sequencer else { + panic!("local sequencer should be initialized before replaying clock transactions"); + }; + + // Empty blocks before the user transaction already advanced the sequencer clock. + // Replay them first so programs reading clock accounts see the same pre-state. + for block_id in self.last_replayed_block_id.saturating_add(1)..committed.block_id { + replay_clock_transaction_for_block(state, sequencer, block_id) + .unwrap_or_else(|err| panic!("failed to replay sequencer block {block_id}: {err}")); + } + } + + fn replay_clock_transactions_through( + &self, + state: &mut nssa::V03State, + sequencer: &LocalSequencer, + target_block_id: BlockId, + ) -> DynResult<()> { + let start_block_id = self.last_replayed_block_id.saturating_add(1); + if start_block_id > target_block_id { + return Ok(()); + } + + for block_id in start_block_id..=target_block_id { + let context = sequencer.block_context(block_id)?; + apply_clock_account_update(state, context)?; + } + + Ok(()) + } + + fn advance_rejected_clock_state(&mut self, rejection: &RejectionContext) { + let Some(observed_block_id) = rejection.observed_block_id else { + return; + }; + let Some(sequencer) = &self.sequencer else { + panic!("local sequencer should be initialized before advancing rejected clocks"); + }; + + let mut updated = self.inner.clone(); + self.replay_clock_transactions_through(&mut updated, sequencer, observed_block_id) + .unwrap_or_else(|err| { + panic!("failed to advance local clock state after rejected transaction: {err}") + }); + self.inner = updated; + self.last_replayed_block_id = observed_block_id; + } + + fn assert_affected_accounts_match(&self, tx: &RpcTransaction) { + let Some(sequencer) = &self.sequencer else { + return; + }; + + for account_id in tx.affected_public_account_ids() { + if is_clock_account(account_id) { + continue; + } + let account = self.inner.get_account_by_id(account_id); + let sequencer_account = sequencer + .get_account_by_id(account_id) + .unwrap_or_else(|err| panic!("local sequencer getAccount failed: {err}")); + assert_eq!( + sequencer_account, account, + "local sequencer account state diverged for {account_id}" + ); + } + } +} + +enum MirrorOutcome { + Committed(BlockContext), + NotCommitted(RejectionContext), +} + +enum SequencerTransactionOutcome { + Committed(BlockContext), + Rejected(RejectionContext), +} + +struct RejectionContext { + validation_context: Option, + observed_block_id: Option, +} + +impl RejectionContext { + const fn precheck() -> Self { + Self { + validation_context: None, + observed_block_id: None, + } + } +} + +fn apply_clock_transaction(state: &mut nssa::V03State, context: BlockContext) { + let clock_tx = clock_invocation(context.timestamp); + state + .transition_from_public_transaction(&clock_tx, context.block_id, context.timestamp) + .unwrap_or_else(|err| { + panic!("failed to replay sequencer clock transaction after committed user tx: {err}") + }); +} + +fn replay_clock_transaction_for_block( + state: &mut nssa::V03State, + sequencer: &LocalSequencer, + block_id: BlockId, +) -> DynResult<()> { + let context = sequencer.block_context(block_id)?; + apply_clock_transaction(state, context); + Ok(()) +} + +#[derive(BorshDeserialize, BorshSerialize)] +struct ClockAccountData { + block_id: BlockId, + timestamp: Timestamp, +} + +fn apply_clock_account_update(state: &mut nssa::V03State, context: BlockContext) -> DynResult<()> { + let previous = state.get_account_by_id(nssa::CLOCK_01_PROGRAM_ACCOUNT_ID); + let previous = ClockAccountData::try_from_slice(&previous.data.into_inner())?; + let block_id = previous + .block_id + .checked_add(1) + .ok_or_else(|| io::Error::other("clock block id overflow"))?; + let updated = borsh::to_vec(&ClockAccountData { + block_id, + timestamp: context.timestamp, + })?; + + for (account_id, divisor) in [ + (nssa::CLOCK_01_PROGRAM_ACCOUNT_ID, 1), + (nssa::CLOCK_10_PROGRAM_ACCOUNT_ID, 10), + (nssa::CLOCK_50_PROGRAM_ACCOUNT_ID, 50), + ] { + if block_id.is_multiple_of(divisor) { + let mut account = state.get_account_by_id(account_id); + account.data = updated + .clone() + .try_into() + .map_err(|_| io::Error::other("clock account data does not fit"))?; + state.force_insert_account(account_id, account); + } + } + + Ok(()) +} + +fn is_clock_account(account_id: AccountId) -> bool { + nssa::CLOCK_PROGRAM_ACCOUNT_IDS.contains(&account_id) +} + +fn clock_invocation(timestamp: Timestamp) -> PublicTransaction { + let message = public_transaction::Message::try_new( + nssa::program::Program::clock().id(), + nssa::CLOCK_PROGRAM_ACCOUNT_IDS.to_vec(), + vec![], + timestamp, + ) + .unwrap_or_else(|err| panic!("failed to build sequencer clock transaction: {err}")); + + PublicTransaction::new( + message, + public_transaction::WitnessSet::from_raw_parts(vec![]), + ) +} + +// The harness mirrors transitions in dev mode; real proving would blow the +// commit/health timeouts and surface as confusing "did not commit"/"not +// healthy" failures. Fail fast with a clear message if dev mode is explicitly +// disabled rather than silently inheriting `RISC0_DEV_MODE=0` into the child. +fn ensure_risc0_dev_mode() -> DynResult<()> { + if let Some(value) = env::var_os("RISC0_DEV_MODE") { + let value = value.to_string_lossy(); + if matches!(value.trim(), "0" | "false") { + return Err(io::Error::other(format!( + "RISC0_DEV_MODE={value} disables dev mode, but the local sequencer harness requires it; unset RISC0_DEV_MODE or set it to 1" + )) + .into()); + } + } + Ok(()) +} + +fn assert_supported_lez_checkout(lez_repo: &Path) -> DynResult<()> { + let output = Command::new("git") + .arg("-C") + .arg(lez_repo) + .args(["rev-parse", "HEAD"]) + .output()?; + if !output.status.success() { + return Err(io::Error::other(format!( + "failed to read logos-execution-zone checkout revision: {}", + String::from_utf8_lossy(&output.stderr).trim() + )) + .into()); + } + + let rev = String::from_utf8(output.stdout)?; + let rev = rev.trim(); + if rev != EXPECTED_LEZ_COMMIT { + return Err(io::Error::other(format!( + "local sequencer test schema is pinned to logos-execution-zone {EXPECTED_LEZ_COMMIT}, found {rev}" + )) + .into()); + } + + Ok(()) +} + +struct LocalSequencer { + url: String, + child: Child, + work_dir: PathBuf, + genesis_block_id: BlockId, +} + +impl LocalSequencer { + fn spawn(state: &nssa::V03State) -> DynResult { + ensure_risc0_dev_mode()?; + let lez_repo = env::var_os(LOCAL_SEQUENCER_REPO_ENV).ok_or_else(|| { + io::Error::new( + io::ErrorKind::NotFound, + format!( + "{LOCAL_SEQUENCER_REPO_ENV} is required; run `cargo run -p local-sequencer -- test`" + ), + ) + })?; + let lez_repo = PathBuf::from(lez_repo).canonicalize().map_err(|err| { + io::Error::new( + err.kind(), + format!( + "failed to resolve {LOCAL_SEQUENCER_REPO_ENV} path {}: {err}", + PathBuf::from(env::var_os(LOCAL_SEQUENCER_REPO_ENV).unwrap_or_default()) + .display() + ), + ) + })?; + assert_supported_lez_checkout(&lez_repo)?; + let sequencer_bin = lez_repo.join(LEZ_SEQUENCER_BIN_REL_PATH); + if !sequencer_bin.is_file() { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!( + "missing standalone sequencer binary at {}; run `cargo run -p local-sequencer -- setup`", + sequencer_bin.display() + ), + ) + .into()); + } + + // `unused_localhost_port` probes for a free ephemeral port and drops the + // listener; a concurrent test (Cargo runs tests in parallel by default, and + // each `TestState` spawns its own sequencer) can claim that port before the + // child binds it. When that happens the child exits early instead of becoming + // healthy. Retry with a fresh port on that specific failure; setup errors + // above this loop are fatal and never retried. + let mut last_err: Option = None; + for _ in 0..SPAWN_ATTEMPTS { + let port = unused_localhost_port()?; + let url = format!("http://127.0.0.1:{port}"); + let work_dir = local_sequencer_work_dir(port)?; + let work_dir_guard = WorkDirGuard::new(work_dir); + fs::create_dir_all(work_dir_guard.path())?; + seed_sequencer_state(work_dir_guard.path(), state)?; + let config = prepare_lez_sequencer_config(&lez_repo, work_dir_guard.path(), port)?; + + let mut command = Command::new(&sequencer_bin); + command + .arg(&config.path) + .arg("--port") + .arg(port.to_string()) + .current_dir(work_dir_guard.path()) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .env("RUST_LOG", "error"); + if env::var_os("RISC0_DEV_MODE").is_none() { + command.env("RISC0_DEV_MODE", "1"); + } + + let mut sequencer = Self { + url, + child: command.spawn()?, + work_dir: work_dir_guard.disarm(), + genesis_block_id: config.genesis_block_id, + }; + match wait_for_health(&sequencer.url, &mut sequencer.child) { + Ok(()) => { + validate_spawned_sequencer(&mut sequencer, state)?; + return Ok(sequencer); + } + Err(err) => { + // A still-running child means a genuine failure (e.g. the health + // timeout) — surface it. An already-exited child is the port race + // above, so drop this attempt (Drop removes its work dir) and retry. + if sequencer.child.try_wait().ok().flatten().is_none() { + return Err(err); + } + last_err = Some(err); + } + } + } + + Err(last_err.unwrap_or_else(|| io::Error::other("local sequencer failed to spawn").into())) + } + + fn submit_transaction(&self, tx: &RpcTransaction) -> DynResult<(String, String)> { + let encoded = BASE64_STANDARD.encode(borsh::to_vec(tx)?); + let tx_hash = hex_encode(&tx.hash())?; + let result = rpc_call(&self.url, "sendTransaction", json!([encoded.clone()]))?; + let expected = json!(tx_hash.clone()); + if result != expected { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("sendTransaction returned {result}, expected {expected}"), + ) + .into()); + } + Ok((tx_hash, encoded)) + } + + // The poll/health loops already retry transient transport errors; submit did + // not, so a flaky localhost connection (e.g. a `ConnectionReset` under + // parallel load) would panic instead of recovering. Re-submitting is safe: + // the sequencer drops a duplicate during stateful validation once its + // nonce/nullifier is consumed. Precheck rejections are `RpcError`, not + // `io::Error`, so they are not retryable and surface immediately. + fn submit_transaction_retrying( + &self, + tx: &RpcTransaction, + timeout: Duration, + ) -> DynResult { + let scan_after_block_id = self.last_block_id()?; + let deadline = Instant::now() + .checked_add(timeout) + .ok_or_else(|| io::Error::other("submit timeout overflow"))?; + + loop { + match self.submit_transaction(tx) { + Ok((tx_hash, encoded)) => { + let rejection_observation_start_block_id = self.last_block_id()?; + return Ok(SubmittedTransaction { + tx_hash, + encoded, + scan_after_block_id, + rejection_observation_start_block_id, + }); + } + Err(err) if is_retryable_transport_error(err.as_ref()) => { + if Instant::now() >= deadline { + return Err(err); + } + std::thread::sleep(POLL_INTERVAL); + } + Err(err) => return Err(err), + } + } + } + + // The "two post-submit blocks then assume rejected" bound is sound only + // because each `TestState` submits one transaction and blocks on its + // outcome (every `transition_*` takes `&mut self`), so the rejected tx is + // the sole mempool entry and is popped+skipped in the first post-submit + // block. A future change that pipelines submissions would invalidate this. + fn wait_for_rejection_or_transaction( + &self, + submitted: &SubmittedTransaction, + timeout: Duration, + ) -> DynResult { + let validation_context_block_id = submitted + .rejection_observation_start_block_id + .checked_add(1) + .ok_or_else(|| io::Error::other("validation context block id overflow"))?; + let observed_after_submit_block_id = submitted + .rejection_observation_start_block_id + .checked_add(2) + .ok_or_else(|| io::Error::other("rejection observation block id overflow"))?; + let deadline = Instant::now() + .checked_add(timeout) + .ok_or_else(|| io::Error::other("transaction rejection timeout overflow"))?; + + loop { + match self.transaction_context_if_committed(submitted) { + Ok(Some(context)) => return Ok(SequencerTransactionOutcome::Committed(context)), + Ok(None) => {} + Err(err) if is_retryable_transport_error(err.as_ref()) => { + if Instant::now() >= deadline { + return Err(err); + } + std::thread::sleep(POLL_INTERVAL); + continue; + } + Err(err) => return Err(err), + } + + let last_block_id = match self.last_block_id() { + Ok(last_block_id) => last_block_id, + Err(err) if is_retryable_transport_error(err.as_ref()) => { + if Instant::now() >= deadline { + return Err(err); + } + std::thread::sleep(POLL_INTERVAL); + continue; + } + Err(err) => return Err(err), + }; + + if last_block_id >= observed_after_submit_block_id { + match self.transaction_context_if_committed(submitted) { + Ok(Some(context)) => { + return Ok(SequencerTransactionOutcome::Committed(context)) + } + Ok(None) => { + let validation_context = self.block_context(validation_context_block_id)?; + return Ok(SequencerTransactionOutcome::Rejected(RejectionContext { + validation_context: Some(validation_context), + observed_block_id: Some(last_block_id), + })); + } + Err(err) if is_retryable_transport_error(err.as_ref()) => { + if Instant::now() >= deadline { + return Err(err); + } + std::thread::sleep(POLL_INTERVAL); + continue; + } + Err(err) => return Err(err), + } + } + + if Instant::now() >= deadline { + return Err(io::Error::new( + io::ErrorKind::TimedOut, + format!( + "local sequencer did not produce two post-submit blocks for {} within {timeout:?}", + submitted.tx_hash + ), + ) + .into()); + } + + std::thread::sleep(POLL_INTERVAL); + } + } + + fn transaction_context_if_committed( + &self, + submitted: &SubmittedTransaction, + ) -> DynResult> { + let stored_tx = rpc_call( + &self.url, + "getTransaction", + json!([submitted.tx_hash.clone()]), + )?; + if stored_tx.is_null() { + return Ok(None); + } + + let expected = json!(submitted.encoded.clone()); + if stored_tx != expected { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "getTransaction returned {stored_tx}, expected {expected} for {}", + submitted.tx_hash + ), + ) + .into()); + } + + self.committed_block_context(submitted) + } + + fn committed_block_context( + &self, + submitted: &SubmittedTransaction, + ) -> DynResult> { + let last_block_id = self.last_block_id()?; + let start_block_id = submitted + .scan_after_block_id + .checked_add(1) + .ok_or_else(|| io::Error::other("committed block scan start overflow"))?; + if start_block_id > last_block_id { + return Ok(None); + } + + for block_id in (start_block_id..=last_block_id).rev() { + let Some(block) = self.get_block(block_id)? else { + continue; + }; + let has_transaction = block + .body + .transactions + .iter() + .any(|tx| hex_encode(&tx.hash()).is_ok_and(|hash| hash == submitted.tx_hash)); + if has_transaction { + return Ok(Some(BlockContext { + block_id: block.header.block_id, + timestamp: block.header.timestamp, + })); + } + } + + Ok(None) + } + + fn last_block_id(&self) -> DynResult { + rpc_call(&self.url, "getLastBlockId", json!([]))? + .as_u64() + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid last block id")) + .map_err(Into::into) + } + + fn get_block(&self, block_id: BlockId) -> DynResult> { + let block = rpc_call(&self.url, "getBlock", json!([block_id]))?; + if block.is_null() { + return Ok(None); + } + + let encoded = block.as_str().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + "getBlock result is not a string", + ) + })?; + let bytes = BASE64_STANDARD.decode(encoded)?; + Ok(Some(borsh::from_slice(&bytes)?)) + } + + fn block_context(&self, block_id: BlockId) -> DynResult { + let block = self.get_block(block_id)?.ok_or_else(|| { + io::Error::new( + io::ErrorKind::NotFound, + format!("sequencer block {block_id} was not found"), + ) + })?; + Ok(BlockContext { + block_id: block.header.block_id, + timestamp: block.header.timestamp, + }) + } + + fn get_account_by_id(&self, account_id: AccountId) -> DynResult { + let account = rpc_call(&self.url, "getAccount", json!([account_id]))?; + Ok(serde_json::from_value(account)?) + } + + fn get_proof_for_commitment( + &self, + commitment: &Commitment, + ) -> DynResult> { + let proof = rpc_call(&self.url, "getProofForCommitment", json!([commitment]))?; + Ok(serde_json::from_value(proof)?) + } +} + +struct WorkDirGuard { + path: PathBuf, +} + +impl WorkDirGuard { + const fn new(path: PathBuf) -> Self { + Self { path } + } + + fn path(&self) -> &Path { + &self.path + } + + fn disarm(mut self) -> PathBuf { + std::mem::take(&mut self.path) + } +} + +impl Drop for WorkDirGuard { + fn drop(&mut self) { + if !self.path.as_os_str().is_empty() { + let _ = fs::remove_dir_all(&self.path); + } + } +} + +impl Drop for LocalSequencer { + fn drop(&mut self) { + if self.child.try_wait().ok().flatten().is_none() && self.child.kill().is_ok() { + let _ = self.child.wait(); + } + + let _ = fs::remove_dir_all(&self.work_dir); + } +} + +struct SubmittedTransaction { + tx_hash: String, + encoded: String, + scan_after_block_id: BlockId, + rejection_observation_start_block_id: BlockId, +} + +#[derive(Clone, Copy)] +struct BlockContext { + block_id: BlockId, + timestamp: Timestamp, +} + +// Borsh wire mirror of LEZ common::transaction::NSSATransaction at EXPECTED_LEZ_COMMIT. +// Variant order and payload types must stay byte-identical to the standalone sequencer. +#[derive(BorshDeserialize, BorshSerialize)] +enum RpcTransaction { + Public(Box), + PrivacyPreserving(Box), + ProgramDeployment(Box), +} + +impl RpcTransaction { + fn hash(&self) -> [u8; 32] { + match self { + Self::Public(tx) => tx.hash(), + Self::PrivacyPreserving(tx) => tx.hash(), + Self::ProgramDeployment(tx) => tx.hash(), + } + } + + fn affected_public_account_ids(&self) -> Vec { + match self { + Self::Public(tx) => tx.affected_public_account_ids(), + Self::PrivacyPreserving(tx) => tx.affected_public_account_ids(), + Self::ProgramDeployment(tx) => tx.affected_public_account_ids(), + } + } +} + +#[derive(BorshDeserialize)] +struct RpcBlock { + header: RpcBlockHeader, + body: RpcBlockBody, + _bedrock_status: RpcBedrockStatus, + _bedrock_parent_id: [u8; 32], +} + +#[derive(BorshDeserialize)] +struct RpcBlockHeader { + block_id: BlockId, + _prev_block_hash: [u8; 32], + _hash: [u8; 32], + timestamp: Timestamp, + _signature: nssa::Signature, +} + +#[derive(BorshDeserialize)] +struct RpcBlockBody { + transactions: Vec, +} + +#[derive(BorshDeserialize)] +enum RpcBedrockStatus { + Pending, + Safe, + Finalized, +} + +#[derive(BorshSerialize)] +struct NssaStateCellRef<'state>(&'state nssa::V03State); + +// Direct RocksDB seeding mirrors LEZ storage::sequencer schema at EXPECTED_LEZ_COMMIT. +fn seed_sequencer_state(work_dir: &Path, state: &nssa::V03State) -> DynResult<()> { + let mut cf_opts = Options::default(); + cf_opts.set_max_write_buffer_number(16); + let cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone()); + let cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone()); + let cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts); + + let mut db_opts = Options::default(); + db_opts.create_missing_column_families(true); + db_opts.create_if_missing(true); + let db = DBWithThreadMode::::open_cf_descriptors( + &db_opts, + work_dir.join("rocksdb"), + vec![cfb, cfmeta, cfstate], + )?; + let state_column = db + .cf_handle(CF_NSSA_STATE_NAME) + .ok_or_else(|| io::Error::other("state column family not created"))?; + db.put_cf( + &state_column, + borsh::to_vec(&DB_NSSA_STATE_KEY)?, + borsh::to_vec(&NssaStateCellRef(state))?, + )?; + Ok(()) +} + +struct PreparedSequencerConfig { + path: PathBuf, + genesis_block_id: BlockId, +} + +fn prepare_lez_sequencer_config( + lez_repo: &Path, + work_dir: &Path, + port: u16, +) -> DynResult { + let src_path = find_lez_sequencer_config(lez_repo)?; + let text = fs::read_to_string(&src_path)?; + let mut config: Value = serde_json::from_str(&text)?; + let genesis_block_id = config + .get("genesis_id") + .and_then(Value::as_u64) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + format!( + "sequencer config missing numeric genesis_id: {}", + src_path.display() + ), + ) + })?; + let Some(config_object) = config.as_object_mut() else { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "sequencer config is not a JSON object: {}", + src_path.display() + ), + ) + .into()); + }; + + config_object.insert("home".to_owned(), json!(".")); + config_object.insert("port".to_owned(), json!(port)); + config_object.insert("max_block_size".to_owned(), json!("8 MiB")); + config_object.insert("block_create_timeout".to_owned(), json!("100ms")); + config_object.insert("retry_pending_blocks_timeout".to_owned(), json!("100ms")); + + let config_path = work_dir.join("sequencer_config.json"); + fs::write( + &config_path, + format!("{}\n", serde_json::to_string_pretty(&config)?), + )?; + Ok(PreparedSequencerConfig { + path: config_path, + genesis_block_id, + }) +} + +fn find_lez_sequencer_config(lez_repo: &Path) -> DynResult { + for candidate in LEZ_SEQUENCER_CONFIG_CANDIDATES { + let path = lez_repo.join(candidate); + if path.is_file() { + return Ok(path); + } + } + + Err(io::Error::new( + io::ErrorKind::NotFound, + format!( + "missing sequencer debug config under {}; checked: {}", + lez_repo.display(), + LEZ_SEQUENCER_CONFIG_CANDIDATES.join(", ") + ), + ) + .into()) +} + +fn unused_localhost_port() -> DynResult { + let listener = TcpListener::bind(("127.0.0.1", 0))?; + Ok(listener.local_addr()?.port()) +} + +fn local_sequencer_work_dir(port: u16) -> DynResult { + let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos(); + Ok(Path::new(env!("CARGO_MANIFEST_DIR")).join(format!( + "../../target/local-sequencer/runs/{}-{timestamp}-{port}", + std::process::id() + ))) +} + +fn wait_for_health(sequencer_url: &str, child: &mut Child) -> DynResult<()> { + let deadline = Instant::now() + .checked_add(HEALTH_TIMEOUT) + .ok_or_else(|| io::Error::other("sequencer health timeout overflow"))?; + + loop { + if rpc_call(sequencer_url, "checkHealth", json!([])).is_ok() { + return Ok(()); + } + + if let Some(status) = child.try_wait()? { + return Err(io::Error::other(format!( + "local sequencer exited before becoming healthy: {status}" + )) + .into()); + } + + if Instant::now() >= deadline { + return Err(io::Error::new( + io::ErrorKind::TimedOut, + format!("local sequencer was not healthy within {HEALTH_TIMEOUT:?}"), + ) + .into()); + } + + std::thread::sleep(POLL_INTERVAL); + } +} + +fn validate_spawned_sequencer( + sequencer: &mut LocalSequencer, + state: &nssa::V03State, +) -> DynResult<()> { + if let Some(status) = sequencer.child.try_wait()? { + return Err(io::Error::other(format!( + "local sequencer exited immediately after health check: {status}" + )) + .into()); + } + + let last_block_id = sequencer.last_block_id()?; + if last_block_id < sequencer.genesis_block_id { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "local sequencer last block id {last_block_id} is before genesis block id {}", + sequencer.genesis_block_id + ), + ) + .into()); + } + + let mut expected = state.clone(); + let first_replay_block = sequencer.genesis_block_id.saturating_add(1); + if first_replay_block <= last_block_id { + for block_id in first_replay_block..=last_block_id { + apply_clock_account_update(&mut expected, sequencer.block_context(block_id)?)?; + } + } + + for account_id in nssa::CLOCK_PROGRAM_ACCOUNT_IDS { + let sequencer_account = sequencer.get_account_by_id(account_id)?; + let expected_account = expected.get_account_by_id(account_id); + if sequencer_account != expected_account { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("local sequencer clock account {account_id} does not match seeded state"), + ) + .into()); + } + } + + Ok(()) +} + +fn rpc_call(sequencer_url: &str, method: &str, params: Value) -> DynResult { + let request = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": method, + "params": params, + }); + let response = post_json(sequencer_url, &request.to_string())?; + let response: Value = serde_json::from_str(&response)?; + + if let Some(error) = response.get("error") { + return Err(RpcError { + method: method.to_owned(), + error: error.clone(), + } + .into()); + } + + response + .get("result") + .cloned() + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "rpc response missing result")) + .map_err(Into::into) +} + +#[derive(Debug)] +struct RpcError { + method: String, + error: Value, +} + +impl RpcError { + fn code(&self) -> Option { + self.error.get("code").and_then(Value::as_i64) + } + + fn message(&self) -> Option<&str> { + self.error.get("message").and_then(Value::as_str) + } + + fn is_transaction_precheck_rejection(&self) -> bool { + if self.method != "sendTransaction" || self.code() != Some(JSON_RPC_INVALID_PARAMS_CODE) { + return false; + } + + self.message().is_some_and(|message| { + message.starts_with("InvalidSignature") || message.starts_with("Transaction too large") + }) + } +} + +impl fmt::Display for RpcError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "rpc {} failed: {}", self.method, self.error) + } +} + +impl Error for RpcError {} + +fn is_transaction_precheck_rejection(error: &(dyn Error + 'static)) -> bool { + let mut current = Some(error); + while let Some(error) = current { + if let Some(rpc_error) = error.downcast_ref::() { + return rpc_error.is_transaction_precheck_rejection(); + } + current = error.source(); + } + false +} + +fn is_retryable_transport_error(error: &(dyn Error + 'static)) -> bool { + let mut current = Some(error); + while let Some(error) = current { + if let Some(io_error) = error.downcast_ref::() { + return matches!( + io_error.kind(), + io::ErrorKind::BrokenPipe + | io::ErrorKind::ConnectionAborted + | io::ErrorKind::ConnectionRefused + | io::ErrorKind::ConnectionReset + | io::ErrorKind::Interrupted + | io::ErrorKind::NotConnected + | io::ErrorKind::TimedOut + | io::ErrorKind::UnexpectedEof + | io::ErrorKind::WouldBlock + ); + } + current = error.source(); + } + false +} + +fn post_json(url: &str, body: &str) -> DynResult { + let endpoint = HttpEndpoint::parse(url)?; + let mut stream = TcpStream::connect((endpoint.host.as_str(), endpoint.port))?; + let deadline = Instant::now() + .checked_add(RPC_CALL_TIMEOUT) + .ok_or_else(|| io::Error::other("rpc call timeout overflow"))?; + stream.set_read_timeout(Some(RPC_CALL_TIMEOUT))?; + stream.set_write_timeout(Some(RPC_CALL_TIMEOUT))?; + + write!( + stream, + "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + endpoint.path, + endpoint.authority, + body.len(), + body + )?; + + let response = read_bounded_response(stream, deadline)?; + let (headers, body) = response + .split_once("\r\n\r\n") + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid http response"))?; + let status_line = headers + .lines() + .next() + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing http status"))?; + if !status_line.contains(" 200 ") { + // Include the body so a structured error returned with a non-200 status + // (e.g. jsonrpsee's 413 for an oversized request) is not lost. + return Err( + io::Error::other(format!("http request failed: {status_line}; body: {body}")).into(), + ); + } + + Ok(body.to_owned()) +} + +fn read_bounded_response(mut stream: TcpStream, deadline: Instant) -> DynResult { + let mut response = Vec::new(); + let mut buffer = [0_u8; 8192]; + + loop { + let now = Instant::now(); + if now >= deadline { + return Err(io::Error::new(io::ErrorKind::TimedOut, "rpc response timed out").into()); + } + stream.set_read_timeout(Some(deadline.saturating_duration_since(now)))?; + + match stream.read(&mut buffer) { + Ok(0) => break, + Ok(read) => { + if response.len().saturating_add(read) > MAX_RPC_RESPONSE_BYTES { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("rpc response exceeded {MAX_RPC_RESPONSE_BYTES} bytes"), + ) + .into()); + } + let chunk = buffer.get(..read).ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidData, "invalid rpc read length") + })?; + response.extend_from_slice(chunk); + } + Err(err) if err.kind() == io::ErrorKind::Interrupted => {} + Err(err) + if matches!( + err.kind(), + io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock + ) && Instant::now() < deadline => {} + Err(err) => return Err(err.into()), + } + } + + Ok(String::from_utf8(response)?) +} + +struct HttpEndpoint { + authority: String, + host: String, + port: u16, + path: String, +} + +impl HttpEndpoint { + fn parse(url: &str) -> DynResult { + let rest = url.strip_prefix("http://").ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "only http:// local sequencer URLs are supported", + ) + })?; + let (authority, path) = rest + .split_once('/') + .map_or((rest, String::from("/")), |(authority, path)| { + (authority, format!("/{path}")) + }); + let (host, port) = match authority.rsplit_once(':') { + Some((host, port)) => (host, port.parse::()?), + None => (authority, 80), + }; + + if host.is_empty() { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "missing http host").into()); + } + + Ok(Self { + authority: authority.to_owned(), + host: host.to_owned(), + port, + path, + }) + } +} + +fn hex_encode(bytes: &[u8; 32]) -> Result { + let mut output = String::with_capacity(64); + for byte in bytes { + write!(&mut output, "{byte:02x}")?; + } + Ok(output) +} + +#[cfg(test)] +mod tests { + use super::EXPECTED_LEZ_COMMIT; + + // EXPECTED_LEZ_COMMIT is hand-maintained alongside the `tag` pin in the + // workspace Cargo.toml; a tag bump that forgets to update it would only + // fail at sequencer-spawn time. Pin them together here so drift fails in a + // fast unit test instead. The commit is resolved by Cargo into Cargo.lock. + #[test] + fn expected_lez_commit_matches_cargo_lock_pin() { + let lock = include_str!("../../../Cargo.lock"); + let commit = lock + .lines() + .filter(|line| line.contains("logos-execution-zone.git?")) + .find_map(|line| line.rsplit_once('#')) + .map(|(_, rest)| rest.trim().trim_end_matches('"')) + .expect("could not find the logos-execution-zone pin in Cargo.lock"); + + assert_eq!( + commit, EXPECTED_LEZ_COMMIT, + "EXPECTED_LEZ_COMMIT is out of sync with the Cargo.lock logos-execution-zone pin; \ + update EXPECTED_LEZ_COMMIT (and the circuits-version table in tools/local-sequencer) \ + to match the pinned tag" + ); + } +} diff --git a/programs/integration_tests/tests/amm.rs b/programs/integration_tests/tests/amm.rs index bfb6feb..604ef5c 100644 --- a/programs/integration_tests/tests/amm.rs +++ b/programs/integration_tests/tests/amm.rs @@ -7,10 +7,11 @@ use amm_core::{ PoolDefinition, FEE_TIER_BPS_1, FEE_TIER_BPS_100, FEE_TIER_BPS_30, FEE_TIER_BPS_5, MINIMUM_LIQUIDITY, }; +use integration_tests::TestState as V03State; use nssa::{ error::NssaError, program_deployment_transaction::{self, ProgramDeploymentTransaction}, - public_transaction, PrivateKey, PublicKey, PublicTransaction, V03State, + public_transaction, PrivateKey, PublicKey, PublicTransaction, }; use nssa_core::account::{Account, AccountId, Data, Nonce}; use token_core::{TokenDefinition, TokenHolding}; diff --git a/programs/integration_tests/tests/ata.rs b/programs/integration_tests/tests/ata.rs index d88bd3b..dacf823 100644 --- a/programs/integration_tests/tests/ata.rs +++ b/programs/integration_tests/tests/ata.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use ata_core::{compute_ata_seed, get_associated_token_account_id}; +use integration_tests::TestState as V03State; use nssa::{ execute_and_prove, privacy_preserving_transaction::{ @@ -9,7 +10,7 @@ use nssa::{ program::Program, program_deployment_transaction::{self, ProgramDeploymentTransaction}, public_transaction, EphemeralPublicKey, PrivateKey, PublicKey, PublicTransaction, - SharedSecretKey, V03State, + SharedSecretKey, }; use nssa_core::{ account::{Account, AccountId, AccountWithMetadata, Data, Nonce}, diff --git a/programs/integration_tests/tests/stablecoin.rs b/programs/integration_tests/tests/stablecoin.rs index 4044d84..56b5fb5 100644 --- a/programs/integration_tests/tests/stablecoin.rs +++ b/programs/integration_tests/tests/stablecoin.rs @@ -1,6 +1,7 @@ +use integration_tests::TestState as V03State; use nssa::{ program_deployment_transaction::{self, ProgramDeploymentTransaction}, - public_transaction, PrivateKey, PublicKey, PublicTransaction, V03State, + public_transaction, PrivateKey, PublicKey, PublicTransaction, }; use nssa_core::account::{Account, AccountId, Data, Nonce}; use stablecoin_core::{compute_position_pda, compute_position_vault_pda, Position}; diff --git a/programs/integration_tests/tests/token.rs b/programs/integration_tests/tests/token.rs index 9e308a6..0746af4 100644 --- a/programs/integration_tests/tests/token.rs +++ b/programs/integration_tests/tests/token.rs @@ -1,10 +1,11 @@ +use integration_tests::TestState as V03State; use nssa::{ execute_and_prove, privacy_preserving_transaction::{Message, WitnessSet}, program::Program, program_deployment_transaction::{self, ProgramDeploymentTransaction}, public_transaction, PrivacyPreservingTransaction, PrivateKey, PublicKey, PublicTransaction, - SharedSecretKey, V03State, + SharedSecretKey, Signature, }; use nssa_core::{ account::{Account, AccountId, AccountWithMetadata, Data, Nonce}, @@ -130,6 +131,34 @@ fn state_for_token_tests_without_recipient() -> V03State { state } +#[cfg(feature = "local-sequencer-tests")] +fn clock_account_data(account: Account) -> (u64, u64) { + let data = account.data.into_inner(); + assert_eq!(data.len(), 16); + + let mut block_id = [0_u8; 8]; + let mut timestamp = [0_u8; 8]; + let (block_id_bytes, timestamp_bytes) = data + .split_at_checked(8) + .expect("clock account data length was checked"); + block_id.copy_from_slice(block_id_bytes); + timestamp.copy_from_slice(timestamp_bytes); + + (u64::from_le_bytes(block_id), u64::from_le_bytes(timestamp)) +} + +#[cfg(feature = "local-sequencer-tests")] +#[test] +fn token_first_committed_transaction_keeps_clock_aligned_with_sequencer() { + let mut state = V03State::new_with_genesis_accounts(&[], vec![], 0); + deploy_token(&mut state); + + let (block_id, timestamp) = + clock_account_data(state.get_account_by_id(nssa::CLOCK_01_PROGRAM_ACCOUNT_ID)); + assert_ne!(block_id, 0); + assert_ne!(timestamp, 0); +} + #[test] fn token_new_fungible_definition() { let mut state = V03State::new_with_genesis_accounts(&[], vec![], 0); @@ -335,6 +364,58 @@ fn token_transfer_fresh_public_recipient_requires_authorization() { ); } +#[test] +fn token_invalid_signature_rejection_keeps_local_sequencer_usable() { + let mut state = state_for_token_tests(); + + let instruction = token_core::Instruction::Transfer { + amount_to_transfer: 1, + }; + let message = public_transaction::Message::try_new( + Ids::token_program(), + vec![Ids::holder(), Ids::recipient()], + vec![Nonce(0)], + instruction, + ) + .unwrap(); + let invalid_signature = Signature { value: [0xff; 64] }; + let witness_set = public_transaction::WitnessSet::from_raw_parts(vec![( + invalid_signature, + PublicKey::new_from_private_key(&Keys::holder_key()), + )]); + let tx = PublicTransaction::new(message, witness_set); + + assert!(state.transition_from_public_transaction(&tx, 0, 0).is_err()); + + let instruction = token_core::Instruction::Transfer { + amount_to_transfer: 1, + }; + let message = public_transaction::Message::try_new( + Ids::token_program(), + vec![Ids::holder(), Ids::recipient()], + vec![Nonce(0)], + instruction, + ) + .unwrap(); + let witness_set = public_transaction::WitnessSet::for_message(&message, &[&Keys::holder_key()]); + let tx = PublicTransaction::new(message, witness_set); + + state.transition_from_public_transaction(&tx, 0, 0).unwrap(); + + assert_eq!( + state.get_account_by_id(Ids::recipient()), + Account { + program_owner: Ids::token_program(), + balance: 0_u128, + data: Data::from(&TokenHolding::Fungible { + definition_id: Ids::token_definition(), + balance: 1, + }), + nonce: Nonce(0), + } + ); +} + #[test] fn token_transfer_fresh_authorized_public_recipient() { let mut state = state_for_token_tests_without_recipient(); diff --git a/tools/local-sequencer/Cargo.toml b/tools/local-sequencer/Cargo.toml new file mode 100644 index 0000000..3931e3f --- /dev/null +++ b/tools/local-sequencer/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "local-sequencer" +version = "0.1.0" +edition = "2021" + +[lints] +workspace = true + +[[bin]] +name = "local-sequencer" +path = "src/main.rs" diff --git a/tools/local-sequencer/src/main.rs b/tools/local-sequencer/src/main.rs new file mode 100644 index 0000000..be37978 --- /dev/null +++ b/tools/local-sequencer/src/main.rs @@ -0,0 +1,596 @@ +use std::{ + env, + error::Error, + ffi::OsStr, + fs, + path::{Path, PathBuf}, + process::{self, Command}, + time::{SystemTime, UNIX_EPOCH}, +}; + +type DynResult = Result>; + +const LEZ_SOURCE_ENV: &str = "LEZ_LOCAL_SEQUENCER_SOURCE"; +const LEZ_REF_ENV: &str = "LEZ_LOCAL_SEQUENCER_REF"; +const LEZ_REPO_ENV: &str = "LEZ_LOCAL_SEQUENCER_REPO"; +const LEZ_CACHE_ROOT_ENV: &str = "LEZ_LOCAL_SEQUENCER_CACHE_ROOT"; +const LEZ_CIRCUITS_VERSION_ENV: &str = "LEZ_LOCAL_SEQUENCER_CIRCUITS_VERSION"; +const LEZ_SKIP_CIRCUITS_ENV: &str = "LEZ_LOCAL_SEQUENCER_SKIP_CIRCUITS"; +const LOGOS_BLOCKCHAIN_CIRCUITS_ENV: &str = "LOGOS_BLOCKCHAIN_CIRCUITS"; +const CIRCUITS_RELEASE_BASE_URL: &str = + "https://github.com/logos-blockchain/logos-blockchain-circuits/releases/download"; +const CIRCUITS_SENTINEL: &str = "pol/verification_key.json"; +const CIRCUITS_READY_SENTINEL: &str = ".ready"; +const SEQUENCER_BIN_REL_PATH: &str = "target/release/sequencer_service"; + +fn main() { + if let Err(err) = run() { + eprintln!("error: {err}"); + process::exit(1); + } +} + +fn run() -> DynResult<()> { + let command = env::args().nth(1).unwrap_or_else(|| "setup".to_owned()); + match command.as_str() { + "setup" => { + let checkout = setup()?; + print_ready(&checkout); + Ok(()) + } + "test" => { + let checkout = setup()?; + run_integration_tests(&checkout) + } + "-h" | "--help" | "help" => { + print_usage(); + Ok(()) + } + other => Err(format!("unknown command `{other}`; expected `setup` or `test`").into()), + } +} + +fn print_usage() { + println!( + "Usage: cargo run -p local-sequencer -- [setup|test]\n\ +\n\ +Commands:\n\ + setup Clone/update pinned LEZ and build standalone sequencer_service\n\ + test Run setup, then run integration tests through local sequencer\n\ +\n\ +Environment:\n\ + {LEZ_REPO_ENV} Existing pinned LEZ checkout to validate/build; not reset\n\ + {LEZ_CACHE_ROOT_ENV} Cache root for default LEZ clone and circuits\n\ + {LEZ_SOURCE_ENV} LEZ git source URL for managed cache clone/update\n\ + {LEZ_REF_ENV} LEZ git ref/tag; set when manifest parsing cannot infer it\n\ + {LEZ_CIRCUITS_VERSION_ENV} Circuits release version without leading v\n\ + {LEZ_SKIP_CIRCUITS_ENV} Set to 1 to skip circuits cache setup" + ); +} + +fn setup() -> DynResult { + require_cmd("git")?; + require_cmd("cargo")?; + + let repo_root = repo_root(); + let manifest = repo_root.join("Cargo.toml"); + let lockfile = repo_root.join("Cargo.lock"); + let lez_ref = match env::var(LEZ_REF_ENV) { + Ok(reference) => reference, + Err(_) => parse_lez_ref(&manifest)?, + }; + let lez_commit = parse_lez_commit(&lockfile)?; + let cache_root = local_sequencer_cache_root(); + let maybe_checkout = env::var_os(LEZ_REPO_ENV); + let checkout = maybe_checkout.as_ref().map_or_else( + || { + cache_root + .join("repos") + .join("logos-execution-zone") + .join(cache_safe_component(&lez_ref)) + }, + PathBuf::from, + ); + + if maybe_checkout.is_some() { + validate_existing_lez_checkout(&checkout, &lez_commit)?; + } else { + // `unwrap_or` evaluates eagerly, so keep manifest parsing behind lazy match arms. + let lez_source = match env::var(LEZ_SOURCE_ENV) { + Ok(source) => source, + Err(_) => parse_lez_source(&manifest)?, + }; + clone_or_update_lez(&checkout, &lez_source, &lez_ref, &lez_commit)?; + } + ensure_circuits(&cache_root, &lez_ref)?; + build_sequencer(&checkout, &lez_ref, &lez_commit)?; + + let sequencer_bin = checkout.join(SEQUENCER_BIN_REL_PATH); + if !sequencer_bin.is_file() { + return Err(format!("expected binary not found: {}", sequencer_bin.display()).into()); + } + + Ok(checkout) +} + +fn repo_root() -> PathBuf { + Path::new(env!("CARGO_MANIFEST_DIR")) + .parent() + .and_then(Path::parent) + .expect("tool lives under tools/local-sequencer") + .to_path_buf() +} + +fn local_sequencer_cache_root() -> PathBuf { + if let Some(path) = env::var_os(LEZ_CACHE_ROOT_ENV) { + return PathBuf::from(path); + } + + if let Some(path) = env::var_os("XDG_CACHE_HOME") { + return PathBuf::from(path).join("lez-programs/local-sequencer"); + } + + if let Some(path) = env::var_os("HOME") { + return PathBuf::from(path).join(".cache/lez-programs/local-sequencer"); + } + + env::temp_dir().join("lez-programs/local-sequencer") +} + +fn cache_safe_component(value: &str) -> String { + value + .chars() + .map(|ch| { + if ch.is_ascii_alphanumeric() || matches!(ch, '.' | '-' | '_') { + ch + } else { + '_' + } + }) + .collect() +} + +fn parse_lez_source(manifest: &Path) -> DynResult { + parse_quoted_field(manifest, "git").ok_or_else(|| { + format!( + "could not parse logos-execution-zone git source from {}", + manifest.display() + ) + .into() + }) +} + +fn parse_lez_ref(manifest: &Path) -> DynResult { + parse_quoted_field(manifest, "tag").ok_or_else(|| { + format!( + "could not parse logos-execution-zone tag from {}", + manifest.display() + ) + .into() + }) +} + +fn parse_lez_commit(lockfile: &Path) -> DynResult { + let content = fs::read_to_string(lockfile)?; + content + .lines() + .filter(|line| line.contains("logos-execution-zone.git?")) + .find_map(|line| line.rsplit_once('#')) + .map(|(_, rest)| rest.trim().trim_end_matches('"').to_owned()) + .filter(|commit| !commit.is_empty()) + .ok_or_else(|| { + format!( + "could not parse logos-execution-zone commit from {}", + lockfile.display() + ) + .into() + }) +} + +// Scans every line that mentions the dependency, not just the first, so a +// preceding comment or a reordered entry can't shadow the real source/tag. It +// still assumes the field lives on the same line as the dependency name, which +// holds for the inline-table form used in the workspace Cargo.toml; a multi-line +// reformat would yield None here and fail loudly (and is recoverable via the +// LEZ_LOCAL_SEQUENCER_REF / LEZ_LOCAL_SEQUENCER_SOURCE overrides). +fn parse_quoted_field(manifest: &Path, field: &str) -> Option { + let content = fs::read_to_string(manifest).ok()?; + let marker = format!("{field} = \""); + content + .lines() + .filter(|line| line.contains("logos-execution-zone")) + .find_map(|line| { + let (_, value) = line.split_once(&marker)?; + let value_end = value.find('"')?; + Some(value[..value_end].to_owned()) + }) +} + +fn clone_or_update_lez( + checkout: &Path, + source: &str, + reference: &str, + commit: &str, +) -> DynResult<()> { + if let Some(parent) = checkout.parent() { + fs::create_dir_all(parent)?; + } + + if checkout.join(".git").is_dir() { + let current_source = command_output( + Command::new("git") + .arg("-C") + .arg(checkout) + .args(["remote", "get-url", "origin"]), + )?; + if current_source.trim() != source { + return Err(format!( + "{} already exists with origin {}, expected {source}", + checkout.display(), + current_source.trim() + ) + .into()); + } + run_checked( + Command::new("git") + .arg("-C") + .arg(checkout) + .args(["fetch", "--tags", "--force", "origin"]), + )?; + } else if checkout.exists() { + return Err(format!("{} exists but is not a git checkout", checkout.display()).into()); + } else { + run_checked( + Command::new("git") + .args(["clone", "--filter=blob:none", source]) + .arg(checkout), + )?; + } + + let _ = Command::new("git") + .arg("-C") + .arg(checkout) + .args(["fetch", "--tags", "--force", "origin", reference]) + .status(); + let _ = Command::new("git") + .arg("-C") + .arg(checkout) + .args(["fetch", "--force", "origin", commit]) + .status(); + run_checked( + Command::new("git") + .arg("-C") + .arg(checkout) + .args(["checkout", "--force", commit]), + ) +} + +fn validate_existing_lez_checkout(checkout: &Path, commit: &str) -> DynResult<()> { + if !checkout.exists() { + return Err(format!( + "{LEZ_REPO_ENV} must point at an existing LEZ checkout; unset it to use the managed cache" + ) + .into()); + } + + let is_git_checkout = command_output( + Command::new("git") + .arg("-C") + .arg(checkout) + .args(["rev-parse", "--is-inside-work-tree"]), + )?; + if is_git_checkout.trim() != "true" { + return Err(format!("{} is not a git checkout", checkout.display()).into()); + } + + let dirty = command_output(Command::new("git").arg("-C").arg(checkout).args([ + "status", + "--porcelain", + "--untracked-files=no", + ]))?; + if !dirty.trim().is_empty() { + return Err(format!( + "{} has uncommitted tracked changes; stash/commit them or unset {LEZ_REPO_ENV} to use the managed cache", + checkout.display() + ) + .into()); + } + + let head = command_output( + Command::new("git") + .arg("-C") + .arg(checkout) + .args(["rev-parse", "HEAD"]), + )?; + if head.trim() != commit { + return Err(format!( + "{} is at {}, but Cargo.lock pins {commit}; check out the pinned commit yourself or unset {LEZ_REPO_ENV} to use the managed cache", + checkout.display(), + head.trim() + ) + .into()); + } + + Ok(()) +} + +fn ensure_circuits(cache_root: &Path, lez_ref: &str) -> DynResult<()> { + if env::var_os(LEZ_SKIP_CIRCUITS_ENV).as_deref() == Some(OsStr::new("1")) { + return Ok(()); + } + + if let Some(path) = env::var_os(LOGOS_BLOCKCHAIN_CIRCUITS_ENV) { + let path = PathBuf::from(path); + if path.join(CIRCUITS_SENTINEL).is_file() { + println!("using {LOGOS_BLOCKCHAIN_CIRCUITS_ENV}={}", path.display()); + return Ok(()); + } + return Err(format!( + "{LOGOS_BLOCKCHAIN_CIRCUITS_ENV} is set but does not look populated: {}", + path.display() + ) + .into()); + } + + let version = env::var(LEZ_CIRCUITS_VERSION_ENV).unwrap_or_else(|_| { + infer_circuits_version(lez_ref) + .unwrap_or_default() + .to_owned() + }); + let version = version.trim_start_matches('v'); + if version.is_empty() { + return Err(format!( + "could not infer circuits version for {lez_ref}; set {LEZ_CIRCUITS_VERSION_ENV} or {LOGOS_BLOCKCHAIN_CIRCUITS_ENV}" + ) + .into()); + } + + let triple = release_triple()?; + let cache_dir = cache_root + .join("circuits") + .join(format!("v{version}-{triple}")); + if cache_dir.join(CIRCUITS_SENTINEL).is_file() + && cache_dir.join(CIRCUITS_READY_SENTINEL).is_file() + { + env::set_var(LOGOS_BLOCKCHAIN_CIRCUITS_ENV, &cache_dir); + println!( + "using cached {LOGOS_BLOCKCHAIN_CIRCUITS_ENV}={}", + cache_dir.display() + ); + return Ok(()); + } + + require_cmd("curl")?; + require_cmd("tar")?; + let circuits_parent = cache_dir + .parent() + .ok_or_else(|| format!("invalid circuits cache path: {}", cache_dir.display()))?; + fs::create_dir_all(circuits_parent)?; + let temp_cache_dir = circuits_parent.join(format!( + ".tmp-v{version}-{triple}-{}-{}", + process::id(), + unix_timestamp()? + )); + if temp_cache_dir.exists() { + fs::remove_dir_all(&temp_cache_dir)?; + } + fs::create_dir_all(&temp_cache_dir)?; + + let tarball = env::temp_dir().join(format!( + "lez-circuits-{}-{}.tar.gz", + process::id(), + unix_timestamp()? + )); + let url = format!( + "{CIRCUITS_RELEASE_BASE_URL}/v{version}/logos-blockchain-circuits-v{version}-{triple}.tar.gz" + ); + println!("downloading {url}"); + if let Err(err) = run_checked( + Command::new("curl") + .args([ + "--fail", + "--location", + "--show-error", + "--retry", + "3", + "--retry-delay", + "2", + "--output", + ]) + .arg(&tarball) + .arg(&url), + ) { + let _ = fs::remove_file(&tarball); + let _ = fs::remove_dir_all(&temp_cache_dir); + return Err(err); + } + if let Err(err) = run_checked( + Command::new("tar") + .arg("-xzf") + .arg(&tarball) + .arg("-C") + .arg(&temp_cache_dir) + .arg("--strip-components=1"), + ) { + let _ = fs::remove_file(&tarball); + let _ = fs::remove_dir_all(&temp_cache_dir); + return Err(err); + } + let _ = fs::remove_file(&tarball); + + if !temp_cache_dir.join(CIRCUITS_SENTINEL).is_file() { + let missing = temp_cache_dir.join(CIRCUITS_SENTINEL); + let _ = fs::remove_dir_all(&temp_cache_dir); + return Err(format!( + "circuits release extracted but {} is missing", + missing.display() + ) + .into()); + } + fs::write(temp_cache_dir.join(CIRCUITS_READY_SENTINEL), b"ok\n")?; + if cache_dir.exists() { + fs::remove_dir_all(&cache_dir)?; + } + fs::rename(&temp_cache_dir, &cache_dir)?; + + env::set_var(LOGOS_BLOCKCHAIN_CIRCUITS_ENV, &cache_dir); + println!("{LOGOS_BLOCKCHAIN_CIRCUITS_ENV}={}", cache_dir.display()); + Ok(()) +} + +fn infer_circuits_version(lez_ref: &str) -> Option<&'static str> { + // Keep this table in sync with the logos-execution-zone pin and the + // integration test harness schema guard for that checkout. + match lez_ref { + "v0.2.0-rc3" => Some("0.4.2"), + "v0.2.0-rc1" => Some("0.4.1"), + _ => None, + } +} + +fn release_triple() -> DynResult<&'static str> { + match (env::consts::OS, env::consts::ARCH) { + ("linux", "x86_64") => Ok("linux-x86_64"), + ("linux", "aarch64") => Ok("linux-aarch64"), + ("macos", "aarch64") => Ok("macos-aarch64"), + (os, arch) => Err(format!( + "unsupported circuits release platform: {os}/{arch}; set {LOGOS_BLOCKCHAIN_CIRCUITS_ENV} or {LEZ_SKIP_CIRCUITS_ENV}=1" + ) + .into()), + } +} + +fn build_sequencer(checkout: &Path, lez_ref: &str, commit: &str) -> DynResult<()> { + let head = command_output( + Command::new("git") + .arg("-C") + .arg(checkout) + .args(["rev-parse", "HEAD"]), + )?; + if head.trim() != commit { + return Err(format!( + "{} is at {}, but Cargo.lock pins {commit}; refusing to build mutable ref {lez_ref}", + checkout.display(), + head.trim() + ) + .into()); + } + + println!("building standalone sequencer_service from {lez_ref} ({commit})"); + run_checked( + Command::new("cargo") + .arg("build") + .arg("--manifest-path") + .arg(checkout.join("Cargo.toml")) + .arg("--release") + .arg("--locked") + .arg("--features") + .arg("standalone") + .arg("-p") + .arg("sequencer_service"), + ) +} + +fn run_integration_tests(checkout: &Path) -> DynResult<()> { + let repo_root = repo_root(); + let mut command = Command::new("cargo"); + command + .current_dir(&repo_root) + .env(LEZ_REPO_ENV, checkout) + .env( + "RISC0_DEV_MODE", + env::var("RISC0_DEV_MODE").unwrap_or_else(|_| "1".to_owned()), + ) + // Each TestState spawns its own sequencer process, so Cargo's default + // test parallelism multiplies sequencer/RISC0 load and RPC contention. + // Default to a single test thread for parity with CI; the caller can + // override by exporting RUST_TEST_THREADS. + .env( + "RUST_TEST_THREADS", + env::var("RUST_TEST_THREADS").unwrap_or_else(|_| "1".to_owned()), + ) + .args([ + "test", + "-p", + "integration_tests", + "--features", + "local-sequencer-tests", + "--", + "--nocapture", + ]); + run_checked(&mut command) +} + +fn print_ready(checkout: &Path) { + println!( + "local sequencer binary ready:\n {}\n\nRun integration tests:\n {LEZ_REPO_ENV}={} RISC0_DEV_MODE=1 RUST_TEST_THREADS=1 cargo test -p integration_tests --features local-sequencer-tests -- --nocapture", + checkout.join(SEQUENCER_BIN_REL_PATH).display(), + checkout.display() + ); +} + +fn require_cmd(cmd: &str) -> DynResult<()> { + let status = Command::new(cmd).arg("--version").status(); + match status { + Ok(_) => Ok(()), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + Err(format!("missing required command: {cmd}").into()) + } + Err(err) => Err(format!("failed to probe command `{cmd}`: {err}").into()), + } +} + +fn command_output(command: &mut Command) -> DynResult { + let output = command.output()?; + if !output.status.success() { + return Err(format!( + "command failed with {}: {}", + output.status, + render_command(command) + ) + .into()); + } + Ok(String::from_utf8(output.stdout)?) +} + +fn run_checked(command: &mut Command) -> DynResult<()> { + println!("$ {}", render_command(command)); + let status = command.status()?; + if !status.success() { + return Err(format!("command failed with {status}: {}", render_command(command)).into()); + } + Ok(()) +} + +fn render_command(command: &Command) -> String { + let mut output = command.get_program().to_string_lossy().to_string(); + for arg in command.get_args() { + output.push(' '); + output.push_str(&redact_command_arg(&arg.to_string_lossy())); + } + output +} + +fn redact_command_arg(arg: &str) -> String { + for scheme in ["https://", "http://"] { + let Some(rest) = arg.strip_prefix(scheme) else { + continue; + }; + let Some(at_position) = rest.find('@') else { + continue; + }; + if let Some(after_at) = rest + .get(at_position..) + .and_then(|suffix| suffix.strip_prefix('@')) + { + if !after_at.is_empty() { + return format!("{scheme}@{after_at}"); + } + } + } + arg.to_owned() +} + +fn unix_timestamp() -> DynResult { + Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()) +} From 5b5102e4da93c3cc57c27cf2c4046287d4a0ca94 Mon Sep 17 00:00:00 2001 From: Ricardo Guilherme Schmidt <3esmit@gmail.com> Date: Fri, 5 Jun 2026 15:25:26 -0300 Subject: [PATCH 2/2] fix(local-sequencer): avoid unsafe parsing and cache replacement --- .../integration_tests/src/local_sequencer.rs | 2 +- tools/local-sequencer/src/main.rs | 23 ++++++++++++++++--- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/programs/integration_tests/src/local_sequencer.rs b/programs/integration_tests/src/local_sequencer.rs index f8db4e3..ed08266 100644 --- a/programs/integration_tests/src/local_sequencer.rs +++ b/programs/integration_tests/src/local_sequencer.rs @@ -47,7 +47,7 @@ const COMMIT_TIMEOUT: Duration = Duration::from_secs(20); const SUBMIT_TIMEOUT: Duration = Duration::from_secs(5); const HEALTH_TIMEOUT: Duration = Duration::from_secs(30); const RPC_CALL_TIMEOUT: Duration = Duration::from_secs(10); -const MAX_RPC_RESPONSE_BYTES: usize = 16 * 1024 * 1024; +const MAX_RPC_RESPONSE_BYTES: usize = 16_777_216; // 16 MiB pub struct TestState { inner: nssa::V03State, diff --git a/tools/local-sequencer/src/main.rs b/tools/local-sequencer/src/main.rs index be37978..15d7ca5 100644 --- a/tools/local-sequencer/src/main.rs +++ b/tools/local-sequencer/src/main.rs @@ -202,7 +202,7 @@ fn parse_quoted_field(manifest: &Path, field: &str) -> Option { .find_map(|line| { let (_, value) = line.split_once(&marker)?; let value_end = value.find('"')?; - Some(value[..value_end].to_owned()) + Some(value.get(..value_end)?.to_owned()) }) } @@ -428,10 +428,27 @@ fn ensure_circuits(cache_root: &Path, lez_ref: &str) -> DynResult<()> { .into()); } fs::write(temp_cache_dir.join(CIRCUITS_READY_SENTINEL), b"ok\n")?; + let stale_cache_dir = circuits_parent.join(format!( + ".stale-v{version}-{triple}-{}-{}", + process::id(), + unix_timestamp()? + )); + if stale_cache_dir.exists() { + fs::remove_dir_all(&stale_cache_dir)?; + } if cache_dir.exists() { - fs::remove_dir_all(&cache_dir)?; + fs::rename(&cache_dir, &stale_cache_dir)?; + } + if let Err(err) = fs::rename(&temp_cache_dir, &cache_dir) { + if stale_cache_dir.exists() && !cache_dir.exists() { + let _ = fs::rename(&stale_cache_dir, &cache_dir); + } + let _ = fs::remove_dir_all(&temp_cache_dir); + return Err(err.into()); + } + if stale_cache_dir.exists() { + let _ = fs::remove_dir_all(&stale_cache_dir); } - fs::rename(&temp_cache_dir, &cache_dir)?; env::set_var(LOGOS_BLOCKCHAIN_CIRCUITS_ENV, &cache_dir); println!("{LOGOS_BLOCKCHAIN_CIRCUITS_ENV}={}", cache_dir.display());