diff --git a/Cargo.toml b/Cargo.toml index feb44c8b..d418fdf6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["crates/*"] resolver = "2" [workspace.package] -version = "0.16.2" +version = "0.16.3" edition = "2021" rust-version = "1.87" authors = ["init4"] @@ -34,19 +34,19 @@ debug = false incremental = false [workspace.dependencies] -signet-bundle = { version = "0.16.2", path = "crates/bundle" } -signet-constants = { version = "0.16.2", path = "crates/constants" } -signet-evm = { version = "0.16.2", path = "crates/evm" } -signet-extract = { version = "0.16.2", path = "crates/extract" } -signet-journal = { version = "0.16.2", path = "crates/journal" } -signet-node = { version = "0.16.2", path = "crates/node" } -signet-orders = { version = "0.16.2", path = "crates/orders" } -signet-sim = { version = "0.16.2", path = "crates/sim" } -signet-types = { version = "0.16.2", path = "crates/types" } -signet-tx-cache = { version = "0.16.2", path = "crates/tx-cache" } -signet-zenith = { version = "0.16.2", path = "crates/zenith" } +signet-bundle = { version = "0.16.3", path = "crates/bundle" } +signet-constants = { version = "0.16.3", path = "crates/constants" } +signet-evm = { version = "0.16.3", path = "crates/evm" } +signet-extract = { version = "0.16.3", path = "crates/extract" } +signet-journal = { version = "0.16.3", path = "crates/journal" } +signet-node = { version = "0.16.3", path = "crates/node" } +signet-orders = { version = "0.16.3", path = "crates/orders" } +signet-sim = { version = "0.16.3", path = "crates/sim" } +signet-types = { version = "0.16.3", path = "crates/types" } +signet-tx-cache = { version = "0.16.3", path = "crates/tx-cache" } +signet-zenith = { version = "0.16.3", path = "crates/zenith" } -signet-test-utils = { version = "0.16.2", path = "crates/test-utils" } +signet-test-utils = { version = "0.16.3", path = "crates/test-utils" } # trevm trevm = { version = "0.34.2", features = ["full_env_cfg", "asyncdb"] } diff --git a/crates/orders/Cargo.toml b/crates/orders/Cargo.toml index cecd0eb3..f8789ea1 100644 --- a/crates/orders/Cargo.toml +++ b/crates/orders/Cargo.toml @@ -22,3 +22,6 @@ chrono.workspace = true futures-util.workspace = true thiserror.workspace = true tracing.workspace = true + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt"] } diff --git a/crates/orders/src/lib.rs b/crates/orders/src/lib.rs index 9a28e9dd..04081a48 100644 --- a/crates/orders/src/lib.rs +++ b/crates/orders/src/lib.rs @@ -30,5 +30,8 @@ pub use order_sender::{OrderSender, OrderSenderError}; mod preflight; pub use preflight::{Permit2Ext, PreflightError}; +pub mod stream; +pub use stream::OrderStreamExt; + mod traits; pub use traits::{BundleSubmitter, FillSubmitter, OrderSource, OrderSubmitter, TxBuilder}; diff --git a/crates/orders/src/stream/mod.rs b/crates/orders/src/stream/mod.rs new file mode 100644 index 00000000..2bd4710e --- /dev/null +++ b/crates/orders/src/stream/mod.rs @@ -0,0 +1,265 @@ +//! Stream combinators for filtering [`SignedOrder`] streams. +//! +//! [`OrderStreamExt`] adds `filter_orders` to any `Stream>`. +//! Errors flow through unchanged; only `Ok` items are tested against the predicate. + +use futures_util::{future, Stream, StreamExt}; +use signet_types::SignedOrder; + +pub mod predicates; + +/// Stream extension that filters [`SignedOrder`] items by predicate. +/// +/// `Err` items pass through unchanged; only `Ok(SignedOrder)` items are tested against the +/// predicate. Items where the predicate returns `false` are discarded. +pub trait OrderStreamExt: Sized { + /// The error type carried by the underlying stream's `Result` items. + type Error; + + /// Filter the stream by `predicate`. + /// + /// `predicate` is `FnMut`, so it can carry state across calls (e.g. for deduplication). Be + /// aware that any captured state lives for the lifetime of the returned stream - an unbounded + /// `HashSet` over a long-running stream will grow without bound, so prefer a bounded structure + /// (e.g. an LRU) for production use. + #[must_use = "filter_orders returns a new stream and does nothing unless polled"] + fn filter_orders(self, predicate: F) -> impl Stream> + where + F: FnMut(&SignedOrder) -> bool; +} + +impl OrderStreamExt for S +where + S: Stream>, +{ + type Error = E; + + fn filter_orders(self, mut predicate: F) -> impl Stream> + where + F: FnMut(&SignedOrder) -> bool, + { + self.filter(move |item| { + future::ready(match item { + Ok(order) => predicate(order), + Err(_) => true, + }) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::primitives::{Address, Signature, U256}; + use core::convert::Infallible; + use futures_util::stream; + use signet_zenith::RollupOrders::{ + Output, Permit2Batch, PermitBatchTransferFrom, TokenPermissions, + }; + use std::collections::HashSet; + + pub(super) fn order_with( + owner: Address, + deadline: u64, + permitted: Vec, + outputs: Vec, + ) -> SignedOrder { + SignedOrder::new( + Permit2Batch { + permit: PermitBatchTransferFrom { + permitted, + nonce: U256::ZERO, + deadline: U256::from(deadline), + }, + owner, + signature: Signature::test_signature().as_bytes().into(), + }, + outputs, + ) + } + + #[tokio::test] + async fn filter_orders_drops_failing_predicate_and_passes_errors() { + let by_deadline = |deadline| order_with(Address::ZERO, deadline, vec![], vec![]); + let items: Vec> = vec![ + Ok(by_deadline(1)), + Err("boom"), + Ok(by_deadline(10)), + Err("bang"), + Ok(by_deadline(100)), + ]; + + let collected: Vec<_> = stream::iter(items) + .filter_orders(|order| order.permit().permit.deadline > U256::from(5)) + .collect() + .await; + + assert_eq!(collected.len(), 4); + assert_eq!(collected[0].as_ref().unwrap_err(), &"boom"); + assert_eq!(collected[1].as_ref().unwrap().permit().permit.deadline, U256::from(10)); + assert_eq!(collected[2].as_ref().unwrap_err(), &"bang"); + assert_eq!(collected[3].as_ref().unwrap().permit().permit.deadline, U256::from(100)); + } + + #[tokio::test] + async fn filter_orders_can_dedupe_with_stateful_predicate() { + let owner_a = Address::from([0xa1; 20]); + let owner_b = Address::from([0xb2; 20]); + let make = |owner| order_with(owner, 1, vec![], vec![]); + let items = [make(owner_a), make(owner_b), make(owner_a), make(owner_b)] + .into_iter() + .map(Ok::<_, Infallible>); + + let mut seen = HashSet::new(); + let collected: Vec<_> = stream::iter(items) + .filter_orders(move |order| seen.insert(*order.order_hash())) + .collect() + .await; + + assert_eq!(collected.len(), 2); + assert_eq!(collected[0].as_ref().unwrap(), &make(owner_a)); + assert_eq!(collected[1].as_ref().unwrap(), &make(owner_b)); + } + + #[tokio::test] + async fn filter_orders_works_with_predicate_helpers() { + let target = Address::from([0x42; 20]); + let other = Address::from([0x01; 20]); + + let matching = order_with( + other, + 1, + vec![TokenPermissions { token: target, amount: U256::ZERO }], + vec![], + ); + let non_matching = order_with( + other, + 1, + vec![TokenPermissions { token: other, amount: U256::ZERO }], + vec![], + ); + + let items: Vec> = vec![ + Ok(matching.clone()), + Err("error 1"), + Ok(non_matching), + Ok(matching.clone()), + Err("error 2"), + ]; + + let collected: Vec<_> = + stream::iter(items).filter_orders(predicates::has_input_token(target)).collect().await; + + assert_eq!(collected.len(), 4); + assert_eq!(collected[0].as_ref().unwrap(), &matching); + assert_eq!(collected[1].as_ref().unwrap_err(), &"error 1"); + assert_eq!(collected[2].as_ref().unwrap(), &matching); + assert_eq!(collected[3].as_ref().unwrap_err(), &"error 2"); + } + + #[tokio::test] + async fn filter_orders_composes_predicates() { + let chain_id = 17001u32; + let token = Address::from([0x42; 20]); + let other_token = Address::from([0x01; 20]); + + let on_chain = + |chain| Output { token, amount: U256::ZERO, recipient: Address::ZERO, chainId: chain }; + let with_input = |input| vec![TokenPermissions { token: input, amount: U256::ZERO }]; + + let matches_all = + order_with(Address::ZERO, 100, with_input(token), vec![on_chain(chain_id)]); + let wrong_input = + order_with(Address::ZERO, 100, with_input(other_token), vec![on_chain(chain_id)]); + let wrong_chain = order_with(Address::ZERO, 100, with_input(token), vec![on_chain(1)]); + let expired = order_with(Address::ZERO, 50, with_input(token), vec![on_chain(chain_id)]); + + let items = [matches_all.clone(), wrong_input, wrong_chain, expired] + .into_iter() + .map(Ok::<_, Infallible>); + + let alive = predicates::not_expired_at(|| 100); + let from_input = predicates::has_input_token(token); + let to_chain = predicates::has_output_chain(chain_id); + + let collected: Vec<_> = stream::iter(items) + .filter_orders(move |order| alive(order) && from_input(order) && to_chain(order)) + .collect() + .await; + + assert_eq!(collected.len(), 1); + assert_eq!(collected[0].as_ref().unwrap(), &matches_all); + } + + #[tokio::test] + async fn filter_orders_does_not_invoke_predicate_on_errors() { + let order = order_with(Address::ZERO, 1, vec![], vec![]); + let items: Vec> = + vec![Err("a"), Ok(order.clone()), Err("b"), Ok(order), Err("c")]; + + let mut calls = 0u32; + let collected: Vec<_> = stream::iter(items) + .filter_orders(|_| { + calls += 1; + true + }) + .collect() + .await; + + assert_eq!(calls, 2, "predicate should only run on Ok items"); + assert_eq!(collected.len(), 5, "all items should pass through when predicate returns true"); + assert_eq!(collected[0].as_ref().unwrap_err(), &"a"); + collected[1].as_ref().unwrap(); + assert_eq!(collected[2].as_ref().unwrap_err(), &"b"); + collected[3].as_ref().unwrap(); + assert_eq!(collected[4].as_ref().unwrap_err(), &"c"); + } + + #[tokio::test] + async fn filter_orders_handles_empty_stream() { + let items: Vec> = vec![]; + let collected: Vec<_> = stream::iter(items).filter_orders(|_| true).collect().await; + assert!(collected.is_empty()); + } + + #[tokio::test] + async fn filter_orders_handles_all_rejected() { + let items = (0..3u64).map(|deadline| { + Ok::<_, Infallible>(order_with(Address::ZERO, deadline, vec![], vec![])) + }); + let collected: Vec<_> = stream::iter(items).filter_orders(|_| false).collect().await; + assert!(collected.is_empty()); + } + + #[tokio::test] + async fn filter_orders_can_be_chained() { + let token_a = Address::from([0xaa; 20]); + let token_b = Address::from([0xbb; 20]); + let with_inputs = |tokens: Vec
| { + order_with( + Address::ZERO, + 1, + tokens + .into_iter() + .map(|t| TokenPermissions { token: t, amount: U256::ZERO }) + .collect(), + vec![], + ) + }; + + let only_a = with_inputs(vec![token_a]); + let both = with_inputs(vec![token_a, token_b]); + let only_b = with_inputs(vec![token_b]); + + let items = [only_a, both.clone(), only_b].into_iter().map(Ok::<_, Infallible>); + + let collected: Vec<_> = stream::iter(items) + .filter_orders(predicates::has_input_token(token_a)) + .filter_orders(predicates::has_input_token(token_b)) + .collect() + .await; + + assert_eq!(collected.len(), 1); + assert_eq!(collected[0].as_ref().unwrap(), &both); + } +} diff --git a/crates/orders/src/stream/predicates.rs b/crates/orders/src/stream/predicates.rs new file mode 100644 index 00000000..0feab311 --- /dev/null +++ b/crates/orders/src/stream/predicates.rs @@ -0,0 +1,220 @@ +//! Ready-made predicates for [`OrderStreamExt::filter_orders`]. +//! +//! Each function returns an `impl Fn(&SignedOrder) -> bool` that can be passed directly to +//! `filter_orders`; see that method for guidance on captured state in predicates. +//! +//! Predicates that scan a multi-element field (`has_output_*`, `has_input_token`) match if +//! *any* element satisfies the condition. An order whose outputs span multiple chains is +//! therefore retained when filtering by any one of those chain IDs, and likewise for +//! recipients. They are "has any matching element" filters, not "every element matches" +//! filters. `has_output_token` additionally requires the chain ID to match on the same +//! output, since an `Address` can refer to entirely different ERC20 contracts across chains. +//! +//! Compose them by capturing the predicates once and combining the results in a closure - e.g. +//! +//! ``` +//! # use alloy::primitives::Address; +//! # use futures_util::stream; +//! # use signet_orders::OrderStreamExt; +//! # use signet_orders::stream::predicates::{not_expired_at, has_input_token, has_output_token}; +//! # use signet_types::SignedOrder; +//! # let host_chain = 1u32; +//! # let usdc = Address::repeat_byte(0xaa); +//! # let weth = Address::repeat_byte(0xbb); +//! # let now = 1_700_000_000; +//! # let stream = stream::empty::>(); +//! let alive = not_expired_at(move || now); +//! let from_usdc = has_input_token(usdc); +//! let to_weth = has_output_token(host_chain, weth); +//! let _filtered = stream.filter_orders(move |order| { +//! alive(order) && from_usdc(order) && to_weth(order) +//! }); +//! ``` + +#[cfg(doc)] +use super::OrderStreamExt; +use alloy::primitives::Address; +use signet_types::SignedOrder; +#[cfg(doc)] +use signet_zenith::RollupOrders::Output; + +/// Match orders whose permit deadline is at or after `cutoff()` (i.e. not yet expired). +/// +/// `cutoff` is invoked once per `Ok` stream item (the predicate is not called on `Err`s), so it +/// can advance as the stream is consumed. The returned value is seconds since the unix epoch, +/// the same units as the permit `deadline`. Typically pass the earliest fill timestamp (e.g. +/// current time plus a block-lead allowance) so any order that cannot land before its deadline +/// is dropped. Boundary semantics are defined by [`SignedOrder::is_expired_at`]. +/// +/// For a snapshot/single-request flow with a static cutoff, pass `|| cutoff`. To advance a +/// counter across calls, use an interior-mutability source (e.g. `AtomicU64`). +pub fn not_expired_at(cutoff: impl Fn() -> u64) -> impl Fn(&SignedOrder) -> bool { + move |order| !order.is_expired_at(cutoff()) +} + +/// Match orders that have at least one [`Output`] targeting `chain_id`. +pub fn has_output_chain(chain_id: u32) -> impl Fn(&SignedOrder) -> bool { + move |order| order.outputs().iter().any(|output| output.chainId == chain_id) +} + +/// Match orders that have at least one [`Output`] paying `token` on `chain_id`. +/// +/// The chain ID is part of the match because the same `Address` can refer to entirely +/// different ERC20 contracts on host and rollup. +pub fn has_output_token(chain_id: u32, token: Address) -> impl Fn(&SignedOrder) -> bool { + move |order| { + order.outputs().iter().any(|output| output.chainId == chain_id && output.token == token) + } +} + +/// Match orders that have at least one [`Output`] going to `recipient`. +pub fn has_output_recipient(recipient: Address) -> impl Fn(&SignedOrder) -> bool { + move |order| order.outputs().iter().any(|output| output.recipient == recipient) +} + +/// Match orders whose permit2 batch permits `token` as an input. +pub fn has_input_token(token: Address) -> impl Fn(&SignedOrder) -> bool { + move |order| order.permit().permit.permitted.iter().any(|input| input.token == token) +} + +/// Match orders whose permit2 batch `owner` field equals `owner`. +pub fn with_owner(owner: Address) -> impl Fn(&SignedOrder) -> bool { + move |order| order.permit().owner == owner +} + +#[cfg(test)] +mod tests { + use super::{super::tests::order_with, *}; + use alloy::primitives::{Signature, U256}; + use core::sync::atomic::{AtomicU64, Ordering}; + use signet_zenith::RollupOrders::{ + Output, Permit2Batch, PermitBatchTransferFrom, TokenPermissions, + }; + use std::sync::Arc; + + #[test] + fn not_expired_at_matches_validate_boundary() { + let order = order_with(Address::ZERO, 100, vec![], vec![]); + assert!(not_expired_at(|| 99)(&order)); + assert!(not_expired_at(|| 100)(&order), "deadline equal to cutoff is still valid"); + assert!(!not_expired_at(|| 101)(&order)); + + // Cross-check against `SignedOrder::validate` to lock in the matching boundary. + order.validate(99).unwrap(); + order.validate(100).unwrap(); + order.validate(101).unwrap_err(); + } + + #[test] + fn not_expired_at_re_evaluates_each_call() { + let order = order_with(Address::ZERO, 100, vec![], vec![]); + let now = Arc::new(AtomicU64::new(99)); + let predicate = { + let now = Arc::clone(&now); + not_expired_at(move || now.load(Ordering::Relaxed)) + }; + + // Cutoff 99: 100 >= 99 -> alive. + assert!(predicate(&order)); + + // Advance the clock past the deadline; the same predicate must observe the new value. + now.store(101, Ordering::Relaxed); + assert!(!predicate(&order)); + } + + #[test] + fn not_expired_at_saturates_u256_deadline_above_u64_max() { + // A deadline that overflows u64 must saturate to u64::MAX, so the order is always + // considered alive against any u64 cutoff. This mirrors `SignedOrder::validate`, which + // uses `saturating_to::()`. + let order = SignedOrder::new( + Permit2Batch { + permit: PermitBatchTransferFrom { + permitted: vec![], + nonce: U256::ZERO, + deadline: U256::MAX, + }, + owner: Address::ZERO, + signature: Signature::test_signature().as_bytes().into(), + }, + vec![], + ); + + assert!(not_expired_at(|| 0)(&order)); + assert!(not_expired_at(|| u64::MAX)(&order)); + order.validate(0).unwrap(); + order.validate(u64::MAX).unwrap(); + } + + #[test] + fn output_predicates_match_any_output() { + let token_a = Address::from([0xaa; 20]); + let token_b = Address::from([0xbb; 20]); + let recipient = Address::from([0xcc; 20]); + let order = order_with( + Address::ZERO, + 1, + vec![], + vec![ + Output { token: token_a, amount: U256::ZERO, recipient, chainId: 17001 }, + Output { token: token_b, amount: U256::ZERO, recipient: Address::ZERO, chainId: 1 }, + ], + ); + + assert!(has_output_chain(17001)(&order)); + assert!(has_output_chain(1)(&order)); + assert!(!has_output_chain(2)(&order)); + + assert!(has_output_token(17001, token_a)(&order)); + assert!(has_output_token(1, token_b)(&order)); + assert!(!has_output_token(1, token_a)(&order), "right token, wrong chain"); + assert!(!has_output_token(17001, token_b)(&order), "right token, wrong chain"); + assert!(!has_output_token(17001, Address::ZERO)(&order)); + + assert!(has_output_recipient(recipient)(&order)); + assert!(has_output_recipient(Address::ZERO)(&order)); + assert!(!has_output_recipient(Address::from([0xde; 20]))(&order)); + } + + #[test] + fn input_token_matches_any_permitted_input() { + let token_a = Address::from([0xaa; 20]); + let token_b = Address::from([0xbb; 20]); + let order = order_with( + Address::ZERO, + 1, + vec![ + TokenPermissions { token: token_a, amount: U256::ZERO }, + TokenPermissions { token: token_b, amount: U256::ZERO }, + ], + vec![], + ); + + assert!(has_input_token(token_a)(&order)); + assert!(has_input_token(token_b)(&order)); + assert!(!has_input_token(Address::ZERO)(&order)); + } + + #[test] + fn with_owner_matches_permit_owner() { + let owner = Address::from([0x11; 20]); + let order = order_with(owner, 1, vec![], vec![]); + assert!(with_owner(owner)(&order)); + assert!(!with_owner(Address::ZERO)(&order)); + } + + #[test] + fn output_predicates_reject_order_with_no_outputs() { + let order = order_with(Address::ZERO, 1, vec![], vec![]); + assert!(!has_output_chain(0)(&order)); + assert!(!has_output_chain(1)(&order)); + assert!(!has_output_token(0, Address::ZERO)(&order)); + assert!(!has_output_recipient(Address::ZERO)(&order)); + } + + #[test] + fn input_token_rejects_order_with_no_permitted_inputs() { + let order = order_with(Address::ZERO, 1, vec![], vec![]); + assert!(!has_input_token(Address::ZERO)(&order)); + } +} diff --git a/crates/types/src/signing/order.rs b/crates/types/src/signing/order.rs index 417bc7ee..9b8699ef 100644 --- a/crates/types/src/signing/order.rs +++ b/crates/types/src/signing/order.rs @@ -62,13 +62,22 @@ impl SignedOrder { (self.permit, self.outputs) } + /// Returns whether the order is expired as of `timestamp`. + /// + /// An order is expired when `timestamp` is strictly greater than the permit deadline; a + /// `timestamp` equal to the deadline is **not** expired. A `U256` deadline above `u64::MAX` + /// saturates to `u64::MAX`, so such an order is never expired against any `u64` timestamp. + pub fn is_expired_at(&self, timestamp: u64) -> bool { + timestamp > self.permit.permit.deadline.saturating_to::() + } + /// Check that this can be syntactically used to initiate an order. /// /// For it to be valid: /// - Deadline must be in the future. pub fn validate(&self, timestamp: u64) -> Result<(), SignedPermitError> { - let deadline = self.permit.permit.deadline.saturating_to::(); - if timestamp > deadline { + if self.is_expired_at(timestamp) { + let deadline = self.permit.permit.deadline.saturating_to::(); return Err(SignedPermitError::DeadlinePassed { current: timestamp, deadline }); }