diff --git a/Cargo.lock b/Cargo.lock index 3c961e3..5862290 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3154,6 +3154,7 @@ dependencies = [ "cucumber", "indexmap 1.9.3", "log", + "minotari", "minotari_app_grpc 5.3.0-pre.3 (git+https://github.com/tari-project/tari/)", "minotari_app_utilities", "minotari_node", diff --git a/integration-tests/Cargo.toml b/integration-tests/Cargo.toml index 5e4d554..f1a9802 100644 --- a/integration-tests/Cargo.toml +++ b/integration-tests/Cargo.toml @@ -35,6 +35,10 @@ tari_comms_dht = { git = "https://github.com/tari-project/tari/"} tari_p2p = { git = "https://github.com/tari-project/tari/"} tari_shutdown = { git = "https://github.com/tari-project/tari/" } +# Minotari wallet library (for fast sync tests) +minotari = { path = "../minotari" } + + # Other dependencies rand = "0.8" tonic = "0.13" @@ -52,4 +56,7 @@ serde_json = "1.0" reqwest = { version = "0.12", features = ["json"] } log = "0.4" chrono = "0.4" -rusqlite = { version = "0.37", features = ["bundled"] } \ No newline at end of file +rusqlite = { version = "0.37", features = ["bundled"] } + +[package.metadata.cargo-machete] +ignored = ["minotari"] \ No newline at end of file diff --git a/integration-tests/features/fast_sync.feature b/integration-tests/features/fast_sync.feature new file mode 100644 index 0000000..a372e2c --- /dev/null +++ b/integration-tests/features/fast_sync.feature @@ -0,0 +1,93 @@ +Feature: Fast Sync Scanning + As a wallet user + I want to quickly sync my wallet using fast sync + So that I can see my balance faster than a normal full scan + + # ============================= + # Performance Comparisons + # ============================= + Scenario: Fast sync without backfill is faster than normal sync when there are spent outputs + Given I have a seed node MinerNode + And I have a test database with a full signing wallet + When I mine 20 blocks on MinerNode + And I perform a normal full scan + And I send 15 transactions + And I mine 100 blocks on MinerNode + And I reset the wallet database keeping account + And I measure the time for a normal full scan + And I reset the wallet database keeping account + And I measure the time for a fast sync without backfill + Then the fast sync should be faster than the normal scan + + Scenario: Fast sync without backfill completes in similar time to normal sync with no spent outputs + Given I have a seed node MinerNode + And I have a test database with an existing wallet + When I mine 100 blocks on MinerNode + And I measure the time for a normal full scan + And I reset the wallet database + And I measure the time for a fast sync without backfill + Then the fast sync and normal scan should complete in similar time + + Scenario: Fast sync with backfill completes within reasonable time of normal sync + Given I have a seed node MinerNode + And I have a test database with a full signing wallet + When I mine 10 blocks on MinerNode + And I perform a normal full scan + And I send 1 transactions + And I mine 100 blocks on MinerNode + And I reset the wallet database keeping account + And I measure the time for a normal full scan + And I reset the wallet database keeping account + And I measure the time for a fast sync with backfill + Then I print the fast sync benchmark results + + # ============================= + # Balance Correctness - No Transactions + # ============================= + + + Scenario: Fast sync without backfill shows correct balance with no transactions + Given I have a seed node MinerNode + And I have a test database with an existing wallet + When I mine 20 blocks on MinerNode to a different address + And I perform a fast sync without backfill + Then the fast sync should complete successfully + And the fast sync balance should be zero + + Scenario: Fast sync with backfill shows correct balance with no transactions + Given I have a seed node MinerNode + And I have a test database with an existing wallet + When I mine 20 blocks on MinerNode to a different address + And I perform a fast sync without backfill + And I perform a backfill scan + Then the fast sync should complete successfully + And the fast sync balance should be zero + + # ============================= + # Balance Correctness - With Transactions + # ============================= + + Scenario: Fast sync without backfill shows correct balance with transactions + Given I have a seed node MinerNode + And I have a test database with a full signing wallet + When I mine 10 blocks on MinerNode + And I perform a normal full scan + And I send 1 transactions + And I mine 10 blocks on MinerNode + And I reset the wallet database keeping account + And I perform a fast sync without backfill + Then the fast sync should complete successfully + And the fast sync balance should be at least 1 microTari + + Scenario: Fast sync with backfill shows correct balance with transactions + Given I have a seed node MinerNode + And I have a test database with a full signing wallet + When I mine 10 blocks on MinerNode + And I perform a normal full scan + And I send 1 transactions + And I mine 10 blocks on MinerNode + And I reset the wallet database keeping account + And I perform a fast sync without backfill + And I perform a backfill scan + Then the fast sync should complete successfully + And the fast sync balance should be at least 1 microTari diff --git a/integration-tests/steps/fast_sync.rs b/integration-tests/steps/fast_sync.rs new file mode 100644 index 0000000..c3d1be5 --- /dev/null +++ b/integration-tests/steps/fast_sync.rs @@ -0,0 +1,398 @@ +// Fast Sync Step Definitions +// +// Step definitions for testing fast sync scanning functionality, +// including performance comparisons and balance correctness. + +use cucumber::{then, when}; +use minotari::db; +use minotari::scan::{ScanMode, Scanner}; +use std::time::Instant; +use tari_common::configuration::Network::LocalNet; +use tari_common_types::tari_address::{TariAddress, TariAddressFeatures}; +use tari_transaction_components::key_manager::wallet_types::WalletType; + +use super::common::MinotariWorld; + +/// Safety buffer used for fast sync tests. Small enough to exercise the +/// fast sync phases even with short test chains. +const TEST_FAST_SYNC_SAFETY_BUFFER: u64 = 5; + +// ============================= +// Scan Execution Steps +// ============================= + +#[when("I perform a fast sync without backfill")] +async fn perform_fast_sync_no_backfill(world: &mut MinotariWorld) { + let db_path = world.database_path.as_ref().expect("Database not set up"); + let base_url = get_base_url(world); + + let scanner = Scanner::new(&world.test_password, &base_url, db_path.clone(), 120, 3) + .mode(ScanMode::FastSync { + safety_buffer: TEST_FAST_SYNC_SAFETY_BUFFER, + }) + .account("default"); + + let result = scanner.run().await; + match &result { + Ok((events, _)) => { + world.last_command_exit_code = Some(0); + world.last_command_output = Some(format!("Fast sync completed with {} events", events.len())); + println!("Fast sync completed: {} events", events.len()); + }, + Err(e) => { + world.last_command_exit_code = Some(1); + world.last_command_error = Some(format!("{}", e)); + println!("Fast sync failed: {}", e); + }, + } +} + +#[when("I perform a backfill scan")] +async fn perform_backfill(world: &mut MinotariWorld) { + let db_path = world.database_path.as_ref().expect("Database not set up"); + let base_url = get_base_url(world); + + // Get the current tip to determine backfill_to_height + let tip_client = minotari::http::WalletHttpClient::new(base_url.parse().unwrap()).unwrap(); + let tip = tip_client.get_tip_info().await.expect("Failed to get tip"); + let tip_height = tip.metadata.as_ref().map(|m| m.best_block_height()).unwrap_or(0); + let backfill_to = tip_height.saturating_sub(TEST_FAST_SYNC_SAFETY_BUFFER); + + let scanner = Scanner::new(&world.test_password, &base_url, db_path.clone(), 100, 3).account("default"); + + let result = scanner.run_backfill(backfill_to).await; + match &result { + Ok((events, _)) => { + world.last_command_exit_code = Some(0); + world.last_command_output = Some(format!("Backfill completed with {} events", events.len())); + println!("Backfill completed: {} events", events.len()); + }, + Err(e) => { + world.last_command_exit_code = Some(1); + world.last_command_error = Some(format!("{}", e)); + println!("Backfill failed: {}", e); + }, + } +} + +#[when("I perform a normal full scan")] +async fn perform_normal_full_scan(world: &mut MinotariWorld) { + let db_path = world.database_path.as_ref().expect("Database not set up"); + let base_url = get_base_url(world); + + let scanner = Scanner::new(&world.test_password, &base_url, db_path.clone(), 100, 3) + .mode(ScanMode::Full) + .account("default"); + + let result = scanner.run().await; + match &result { + Ok((events, _)) => { + world.last_command_exit_code = Some(0); + world.last_command_output = Some(format!("Normal scan completed with {} events", events.len())); + println!("Normal scan completed: {} events", events.len()); + }, + Err(e) => { + world.last_command_exit_code = Some(1); + world.last_command_error = Some(format!("{}", e)); + println!("Normal scan failed: {}", e); + }, + } +} + +// ============================= +// Mining to a different address (no wallet outputs) +// ============================= + +#[when(expr = "I mine {int} blocks on {word} to a different address")] +async fn mine_blocks_to_different_address(world: &mut MinotariWorld, num_blocks: u64, node_name: String) { + let node = world + .base_nodes + .get(&node_name) + .unwrap_or_else(|| panic!("Node {} not found", node_name)); + + // Create a completely separate wallet address for mining rewards + // so the test wallet receives no funds + let other_wallet = WalletType::new_random().expect("Failed to create random wallet"); + let other_address = TariAddress::new_dual_address( + other_wallet.get_public_view_key(), + other_wallet.get_public_spend_key(), + LocalNet, + TariAddressFeatures::create_one_sided_only(), + None, + ) + .unwrap(); + + node.mine_blocks(num_blocks, &other_address) + .await + .expect("Failed to mine blocks"); + + let height = node.get_tip_height().await.expect("Failed to get tip height"); + println!( + "Mined {} blocks on {} to different address, height: {}", + num_blocks, node_name, height + ); +} + +// ============================= +// Performance Measurement Steps +// ============================= + +#[when("I measure the time for a normal full scan")] +async fn measure_normal_scan(world: &mut MinotariWorld) { + let start = Instant::now(); + perform_normal_full_scan(world).await; + let duration = start.elapsed(); + + println!("Normal full scan completed in {:?}", duration); + world.benchmark_timings.insert("normal_scan".to_string(), duration); +} + +#[when("I measure the time for a fast sync without backfill")] +async fn measure_fast_sync_no_backfill(world: &mut MinotariWorld) { + let start = Instant::now(); + perform_fast_sync_no_backfill(world).await; + let duration = start.elapsed(); + + println!("Fast sync (no backfill) completed in {:?}", duration); + world.benchmark_timings.insert("fast_sync".to_string(), duration); +} + +#[when("I measure the time for a fast sync with backfill")] +async fn measure_fast_sync_with_backfill(world: &mut MinotariWorld) { + let start = Instant::now(); + perform_fast_sync_no_backfill(world).await; + perform_backfill(world).await; + let duration = start.elapsed(); + + println!("Fast sync + backfill completed in {:?}", duration); + world + .benchmark_timings + .insert("fast_sync_with_backfill".to_string(), duration); +} + +// ============================= +// Database Reset Steps +// ============================= + +#[when("I reset the wallet database")] +async fn reset_wallet_database(world: &mut MinotariWorld) { + let db_path = world.database_path.as_ref().expect("Database not set up").clone(); + + // Delete and recreate the database, then re-import the wallet + std::fs::remove_file(&db_path).ok(); + + let (cmd, mut args) = world.get_minotari_command(); + args.extend_from_slice(&[ + "import-view-key".to_string(), + "--view-private-key".to_string(), + tari_utilities::hex::Hex::to_hex(world.wallet.get_view_key()), + "--spend-public-key".to_string(), + tari_utilities::hex::Hex::to_hex(&world.wallet.get_public_spend_key()), + "--password".to_string(), + world.test_password.clone(), + "--database-path".to_string(), + db_path.to_str().unwrap().to_string(), + ]); + + let output = std::process::Command::new(&cmd) + .args(&args) + .output() + .expect("Failed to re-import wallet"); + + assert!(output.status.success(), "Failed to reset wallet database"); + println!("Wallet database reset and wallet re-imported"); +} + +#[when("I reset the wallet database keeping account")] +async fn reset_wallet_database_keeping_account(world: &mut MinotariWorld) { + let db_path = world.database_path.as_ref().expect("Database not set up").clone(); + + // Delete scanning state but keep the account + let pool = db::init_db(db_path).expect("Failed to init db"); + let conn = pool.get().expect("Failed to get connection"); + + // Clear wallet state in FK-safe order: + // balance_changes references outputs and inputs + // inputs references outputs + // completed_transactions references pending_transactions + conn.execute_batch( + "DELETE FROM displayed_transactions; + DELETE FROM completed_transactions; + DELETE FROM balance_changes; + DELETE FROM events; + DELETE FROM inputs; + DELETE FROM outputs; + DELETE FROM scanned_tip_blocks; + DELETE FROM pending_transactions; + DELETE FROM burn_proofs;", + ) + .expect("Failed to clear wallet state"); + + println!("Wallet state cleared (account kept)"); +} + +// ============================= +// Assertion Steps +// ============================= + +#[then("the fast sync should complete successfully")] +async fn fast_sync_succeeds(world: &mut MinotariWorld) { + assert_eq!( + world.last_command_exit_code, + Some(0), + "Fast sync/backfill failed: {}", + world.last_command_error.as_deref().unwrap_or("unknown error") + ); +} + +#[then("the fast sync balance should be zero")] +async fn fast_sync_balance_is_zero(world: &mut MinotariWorld) { + let balance = world.fetch_balance(); + println!("Balance after fast sync: {} µT", balance); + assert_eq!(balance, 0, "Expected zero balance after fast sync, got {}", balance); +} + +#[then(regex = r"^the fast sync balance should be at least (\d+) microTari$")] +async fn fast_sync_balance_at_least(world: &mut MinotariWorld, minimum: u64) { + let balance = world.fetch_balance(); + println!( + "Balance after fast sync: {} µT (expected at least {} µT)", + balance, minimum + ); + + // Also print DB diagnostics for debugging + if let Some(db_path) = &world.database_path + && let Ok(pool) = db::init_db(db_path.clone()) + && let Ok(conn) = pool.get() + { + let accounts = db::get_accounts(&conn, Some("default")).unwrap_or_default(); + if let Some(account) = accounts.first() { + if let Ok(bal) = db::get_balance(&conn, account.id) { + println!( + " DB balance detail: total={}, available={}, locked={}, unconfirmed={}, credits={:?}, debits={:?}", + bal.total, bal.available, bal.locked, bal.unconfirmed, bal.total_credits, bal.total_debits + ); + } + + // Count outputs by status + let output_counts: String = conn + .query_row( + "SELECT GROUP_CONCAT(status || ':' || cnt) FROM (SELECT status, COUNT(*) as cnt FROM outputs WHERE account_id = ?1 AND deleted_at IS NULL GROUP BY status)", + [account.id], + |row| row.get(0), + ) + .unwrap_or_else(|_| "error".to_string()); + println!(" Output counts by status: {}", output_counts); + } + } + + assert!( + balance >= minimum, + "Expected balance at least {} microTari after fast sync, got {}", + minimum, + balance + ); +} + +#[then("the fast sync and normal scan should complete in similar time")] +async fn fast_sync_similar_time(world: &mut MinotariWorld) { + let normal = world + .benchmark_timings + .get("normal_scan") + .expect("Normal scan timing not recorded"); + let fast = world + .benchmark_timings + .get("fast_sync") + .expect("Fast sync timing not recorded"); + + println!("Normal scan: {:?}", normal); + println!("Fast sync: {:?}", fast); + + // With no spent outputs, both should take roughly the same time. + // Allow fast sync to be up to 3x slower due to phase overhead on small chains. + let ratio = fast.as_secs_f64() / normal.as_secs_f64().max(0.001); + println!("Ratio (fast/normal): {:.2}x", ratio); + assert!( + ratio < 3.0, + "Fast sync ({:?}) took more than 3x longer than normal scan ({:?}) with no spent outputs", + fast, + normal + ); +} + +#[then("the fast sync should be faster than the normal scan")] +async fn fast_sync_is_faster(world: &mut MinotariWorld) { + let normal = world + .benchmark_timings + .get("normal_scan") + .expect("Normal scan timing not recorded"); + let fast = world + .benchmark_timings + .get("fast_sync") + .expect("Fast sync timing not recorded"); + + println!("Normal scan: {:?}", normal); + println!("Fast sync: {:?}", fast); + println!( + "Speedup: {:.1}x", + normal.as_secs_f64() / fast.as_secs_f64().max(0.001) + ); + + assert!( + fast <= normal, + "Fast sync ({:?}) should be faster than or equal to normal scan ({:?})", + fast, + normal + ); +} + +#[then("I print the fast sync benchmark results")] +async fn print_fast_sync_benchmarks(world: &mut MinotariWorld) { + println!("\n========================================"); + println!(" FAST SYNC BENCHMARK RESULTS"); + println!("========================================"); + + if let Some(normal) = world.benchmark_timings.get("normal_scan") { + println!(" Normal scan: {:?}", normal); + } + if let Some(fast) = world.benchmark_timings.get("fast_sync") { + println!(" Fast sync (no backfill): {:?}", fast); + } + if let Some(fast_bf) = world.benchmark_timings.get("fast_sync_with_backfill") { + println!(" Fast sync + backfill: {:?}", fast_bf); + } + + if let (Some(normal), Some(fast)) = ( + world.benchmark_timings.get("normal_scan"), + world.benchmark_timings.get("fast_sync"), + ) { + println!( + " Speedup (no backfill): {:.1}x", + normal.as_secs_f64() / fast.as_secs_f64().max(0.001) + ); + } + + if let (Some(normal), Some(fast_bf)) = ( + world.benchmark_timings.get("normal_scan"), + world.benchmark_timings.get("fast_sync_with_backfill"), + ) { + println!( + " Speedup (with backfill): {:.1}x", + normal.as_secs_f64() / fast_bf.as_secs_f64().max(0.001) + ); + } + + println!("========================================\n"); +} + +// ============================= +// Helpers +// ============================= + +fn get_base_url(world: &MinotariWorld) -> String { + if let Some((_, node)) = world.base_nodes.iter().next() { + format!("http://127.0.0.1:{}", node.http_port) + } else { + panic!("No base node available for scanning"); + } +} diff --git a/integration-tests/steps/mod.rs b/integration-tests/steps/mod.rs index 7ba5e5a..0802650 100644 --- a/integration-tests/steps/mod.rs +++ b/integration-tests/steps/mod.rs @@ -8,6 +8,7 @@ pub mod base_node; pub mod burn; pub mod common; pub mod daemon; +pub mod fast_sync; pub mod fund_locking; pub mod load_testing; pub mod scanning; diff --git a/minotari-scanning/src/scanning/grpc/scanner.rs b/minotari-scanning/src/scanning/grpc/scanner.rs index 6966965..3b6f5cd 100644 --- a/minotari-scanning/src/scanning/grpc/scanner.rs +++ b/minotari-scanning/src/scanning/grpc/scanner.rs @@ -147,6 +147,7 @@ where batch_size: Some(100), request_timeout: self.timeout, exclude_spent: false, + exclude_inputs: false, }) } @@ -387,6 +388,7 @@ where batch_size: config.batch_size, request_timeout: config.request_timeout, exclude_spent: config.exclude_spent, + exclude_inputs: config.exclude_inputs, }; self.current_in_progress = InProgressScan::new(adjusted_config); return Ok(()); @@ -443,6 +445,7 @@ where .num_threads(self.number_processing_threads) .build() .map_err(|e| WalletError::ConfigurationError(format!("Failed to build thread pool: {e}")))?; + let config = config.clone(); tokio::spawn(async move { loop { let grpc_block_response = stream.message().await; @@ -495,12 +498,17 @@ where } }); }); - let inputs = tari_block - .body - .inputs() - .iter() - .map(tari_transaction_components::transaction_components::TransactionInput::output_hash) - .collect(); + + let inputs = if config.exclude_inputs { + Vec::new() + } else { + tari_block + .body + .inputs() + .iter() + .map(tari_transaction_components::transaction_components::TransactionInput::output_hash) + .collect() + }; let block_res = BlockScanResult { height: tari_block.header.height, diff --git a/minotari-scanning/src/scanning/http/scanner.rs b/minotari-scanning/src/scanning/http/scanner.rs index 4d6dcc9..c507ac7 100644 --- a/minotari-scanning/src/scanning/http/scanner.rs +++ b/minotari-scanning/src/scanning/http/scanner.rs @@ -135,6 +135,7 @@ where limit: u64, page: u64, exclude_spent: bool, + exclude_inputs: bool, ) -> WalletResult { let mut timeout_retries = 0; let mut error_retries = 0; @@ -142,7 +143,7 @@ where loop { match timeout( self.timeout, - self.sync_utxos_by_block_http_call(start_header_hash, limit, page, exclude_spent), + self.sync_utxos_by_block_http_call(start_header_hash, limit, page, exclude_spent, exclude_inputs), ) .await { @@ -190,6 +191,7 @@ where limit: u64, page: u64, exclude_spent: bool, + exclude_inputs: bool, ) -> WalletResult { let url = format!("{}/sync_utxos_by_block", self.base_url); let version = 1; @@ -202,6 +204,7 @@ where ("page", &page.to_string()), ("version", &version.to_string()), ("exclude_spent", &exclude_spent.to_string()), + ("exclude_inputs", &exclude_inputs.to_string()), ]) .send() .await @@ -325,6 +328,7 @@ where batch_size: Some(50), request_timeout: self.timeout, exclude_spent: false, + exclude_inputs: false, }) } @@ -351,6 +355,7 @@ where async fn fetch_block_range(&mut self) -> WalletResult<(Vec, bool)> { let start_height = self.current_in_progress.get_config().map_or(0, |c| c.start_height); let exclude_spent = self.current_in_progress.get_config().is_some_and(|c| c.exclude_spent); + let include_inputs = self.current_in_progress.get_config().is_some_and(|c| !c.exclude_inputs); // Get the starting header hash let mut more_blocks = true; @@ -379,7 +384,7 @@ where .unwrap_or(SYNC_UTXOS_BY_BLOCK_PAGE_LIMIT); let page = self.current_in_progress.page(); let sync_response = self - .sync_utxos_by_block(¤t_header_hash, limit, page, exclude_spent) + .sync_utxos_by_block(¤t_header_hash, limit, page, exclude_spent, include_inputs) .await?; if sync_response.blocks.is_empty() { debug!("No more blocks available from base node"); diff --git a/minotari-scanning/src/scanning/mod.rs b/minotari-scanning/src/scanning/mod.rs index e080047..7d9d94b 100644 --- a/minotari-scanning/src/scanning/mod.rs +++ b/minotari-scanning/src/scanning/mod.rs @@ -52,6 +52,7 @@ pub struct ScanConfig { #[serde(with = "duration_serde")] pub request_timeout: Duration, pub exclude_spent: bool, + pub exclude_inputs: bool, } impl Default for ScanConfig { @@ -62,6 +63,7 @@ impl Default for ScanConfig { batch_size: Some(100), request_timeout: Duration::from_secs(30), exclude_spent: false, + exclude_inputs: false, } } } @@ -91,6 +93,18 @@ impl ScanConfig { self.batch_size = Some(batch_size); self } + + #[must_use] + pub const fn with_exclude_spent(mut self, exclude_spent: bool) -> Self { + self.exclude_spent = exclude_spent; + self + } + + #[must_use] + pub const fn with_exclude_inputs(mut self, exclude_inputs: bool) -> Self { + self.exclude_inputs = exclude_inputs; + self + } } // Helper module for Duration serialization diff --git a/minotari/src/db/balance_changes.rs b/minotari/src/db/balance_changes.rs index 5f21858..07eec27 100644 --- a/minotari/src/db/balance_changes.rs +++ b/minotari/src/db/balance_changes.rs @@ -88,6 +88,26 @@ pub fn insert_balance_change(conn: &Connection, change: &BalanceChange) -> Walle Ok(id) } +/// Inserts a balance change only if one does not already exist for the given output or input. +/// Used during backfill to avoid duplicate balance entries (balance_changes has no unique constraint). +/// Returns `Some(id)` if inserted, `None` if a matching record already existed. +pub fn insert_balance_change_if_not_exists(conn: &Connection, change: &BalanceChange) -> WalletDbResult> { + // Check by output_id or input_id to see if this balance change was already recorded + if let Some(output_id) = change.caused_by_output_id + && get_balance_change_id_by_output(conn, output_id)?.is_some() + { + return Ok(None); + } + if let Some(input_id) = change.caused_by_input_id + && get_balance_change_id_by_input(conn, input_id)?.is_some() + { + return Ok(None); + } + + let id = insert_balance_change(conn, change)?; + Ok(Some(id)) +} + pub fn get_all_balance_changes_by_account_id(conn: &Connection, account_id: i64) -> WalletDbResult> { debug!( account_id = account_id; diff --git a/minotari/src/db/mod.rs b/minotari/src/db/mod.rs index 61a77f4..f6773d7 100644 --- a/minotari/src/db/mod.rs +++ b/minotari/src/db/mod.rs @@ -73,11 +73,13 @@ pub use scanned_tip_blocks::{ mod outputs; pub use outputs::{ - DbOutput, DbWalletOutput, fetch_outputs_by_lock_request_id, fetch_unspent_outputs, get_active_outputs_from_height, - get_output_by_id, get_output_info_by_hash, get_output_info_by_hash_for_account, get_output_totals_for_account, - get_total_unspent_balance, get_unconfirmed_outputs, insert_output, lock_output, mark_output_confirmed, - soft_delete_outputs_from_height, unlock_outputs_for_request, - unlock_outputs_for_request as unlock_outputs_for_pending_transaction, update_output_status, + DbOutput, DbWalletOutput, UnresolvedSpentOutput, fetch_outputs_by_lock_request_id, fetch_unspent_outputs, + get_active_outputs_from_height, get_output_by_id, get_output_info_by_hash, get_output_info_by_hash_for_account, + get_output_totals_for_account, get_total_unspent_balance, get_unconfirmed_outputs, + get_unresolved_spent_unconfirmed_outputs, insert_output, insert_spent_output_if_not_exists, lock_output, + mark_output_confirmed, resolve_spent_unconfirmed_with_inputs, soft_delete_outputs_from_height, + unlock_outputs_for_request, unlock_outputs_for_request as unlock_outputs_for_pending_transaction, + update_output_status, }; mod pending_transactions; @@ -106,7 +108,9 @@ pub use webhooks::{delete_webhooks_older_than, enqueue_webhook, fetch_due_webhoo mod balance_changes; pub use balance_changes::{ - get_all_active_balance_changes_by_account_id, get_all_balance_changes_by_account_id, insert_balance_change, + get_all_active_balance_changes_by_account_id, get_all_balance_changes_by_account_id, + get_balance_change_id_by_input, get_balance_change_id_by_output, insert_balance_change, + insert_balance_change_if_not_exists, }; mod inputs; diff --git a/minotari/src/db/outputs.rs b/minotari/src/db/outputs.rs index 74273e7..0575df7 100644 --- a/minotari/src/db/outputs.rs +++ b/minotari/src/db/outputs.rs @@ -106,6 +106,95 @@ pub fn insert_output( Ok(conn.last_insert_rowid()) } +/// Inserts an output with `SpentUnconfirmed` status using `INSERT OR IGNORE`. +/// Used during backfill to avoid duplicate outputs in a single atomic statement. +/// Returns `Some(id)` if inserted, `None` if the output already existed. +#[allow(clippy::too_many_arguments)] +pub fn insert_spent_output_if_not_exists( + conn: &Connection, + account_id: i64, + account_view_key: &PrivateKey, + output_hash: Vec, + output: &WalletOutput, + block_height: u64, + block_hash: &FixedHash, + mined_timestamp: u64, + memo_parsed: Option, + memo_hex: Option, + payment_reference: PaymentReference, + is_burn: bool, +) -> WalletDbResult> { + let tx_id = TxId::new_deterministic(account_view_key.as_bytes(), &output.output_hash()).as_i64_wrapped(); + let output_json = serde_json::to_string(&output)?; + + #[allow(clippy::cast_possible_wrap)] + let mined_timestamp_dt = DateTime::::from_timestamp(mined_timestamp as i64, 0) + .ok_or_else(|| WalletDbError::Decoding(format!("Invalid mined timestamp: {}", mined_timestamp)))?; + + #[allow(clippy::cast_possible_wrap)] + let block_height = block_height as i64; + #[allow(clippy::cast_possible_wrap)] + let value = output.value().as_u64() as i64; + let payment_reference_hex = hex::encode(payment_reference.as_slice()); + let status = OutputStatus::SpentUnconfirmed.to_string(); + + conn.execute( + r#" + INSERT OR IGNORE INTO outputs ( + account_id, + tx_id, + output_hash, + mined_in_block_height, + mined_in_block_hash, + value, + mined_timestamp, + wallet_output_json, + memo_parsed, + memo_hex, + payment_reference, + is_burn, + status + ) + VALUES ( + :account_id, + :tx_id, + :output_hash, + :block_height, + :block_hash, + :value, + :mined_timestamp, + :output_json, + :memo_parsed, + :memo_hex, + :payment_reference, + :is_burn, + :status + ) + "#, + named_params! { + ":account_id": account_id, + ":tx_id": tx_id, + ":output_hash": output_hash, + ":block_height": block_height, + ":block_hash": block_hash.as_slice(), + ":value": value, + ":mined_timestamp": mined_timestamp_dt, + ":output_json": output_json, + ":memo_parsed": memo_parsed, + ":memo_hex": memo_hex, + ":payment_reference": payment_reference_hex, + ":is_burn": is_burn, + ":status": status, + }, + )?; + + if conn.changes() == 0 { + Ok(None) + } else { + Ok(Some(conn.last_insert_rowid())) + } +} + pub fn get_output_info_by_hash( conn: &Connection, output_hash: &FixedHash, @@ -311,6 +400,90 @@ pub fn soft_delete_outputs_from_height(conn: &Connection, account_id: i64, heigh Ok(()) } +/// Marks `SpentUnconfirmed` outputs that have a matching input as `Spent`. +/// Returns the number of outputs marked as spent. +pub fn resolve_spent_unconfirmed_with_inputs(conn: &Connection, account_id: i64) -> WalletDbResult { + let spent_unconfirmed = OutputStatus::SpentUnconfirmed.to_string(); + let spent = OutputStatus::Spent.to_string(); + + let marked_spent = conn.execute( + r#" + UPDATE outputs + SET status = :spent_status + WHERE account_id = :account_id + AND status = :spent_unconfirmed_status + AND deleted_at IS NULL + AND id IN ( + SELECT output_id FROM inputs WHERE deleted_at IS NULL + ) + "#, + named_params! { + ":spent_status": spent, + ":account_id": account_id, + ":spent_unconfirmed_status": spent_unconfirmed, + }, + )? as u64; + + info!( + target: "audit", + account_id = account_id, + marked_spent = marked_spent; + "Resolved SpentUnconfirmed outputs with matching inputs" + ); + + Ok(marked_spent) +} + +/// Returned data for an unresolved SpentUnconfirmed output. +pub struct UnresolvedSpentOutput { + pub output_id: i64, + pub output_hash: Vec, + pub mined_in_block_height: u64, + pub value: u64, +} + +/// Returns info for remaining `SpentUnconfirmed` outputs that have no matching input. +/// These need to be verified against the base node. +pub fn get_unresolved_spent_unconfirmed_outputs( + conn: &Connection, + account_id: i64, +) -> WalletDbResult> { + let spent_unconfirmed = OutputStatus::SpentUnconfirmed.to_string(); + + let mut stmt = conn.prepare_cached( + r#" + SELECT id, output_hash, mined_in_block_height, value + FROM outputs + WHERE account_id = :account_id + AND status = :status + AND deleted_at IS NULL + ORDER BY mined_in_block_height + "#, + )?; + + let rows = stmt.query_map( + named_params! { + ":account_id": account_id, + ":status": spent_unconfirmed, + }, + |row| { + Ok(UnresolvedSpentOutput { + output_id: row.get(0)?, + output_hash: row.get(1)?, + mined_in_block_height: row.get::<_, i64>(2).map(|h| h as u64)?, + value: row.get::<_, i64>(3).map(|v| v as u64)?, + }) + }, + )?; + + let mut results = Vec::new(); + for row in rows { + results.push(row?); + } + + Ok(results) +} + pub fn update_output_status(conn: &Connection, output_id: i64, status: OutputStatus) -> WalletDbResult<()> { debug!( output_id = output_id, diff --git a/minotari/src/http/wallet_http_client.rs b/minotari/src/http/wallet_http_client.rs index 03f8fba..c923075 100644 --- a/minotari/src/http/wallet_http_client.rs +++ b/minotari/src/http/wallet_http_client.rs @@ -36,7 +36,8 @@ use log::{debug, info, warn}; use reqwest::Method; use tari_transaction_components::MicroMinotari; use tari_transaction_components::rpc::models::{ - FeePerGramStat, TipInfoResponse, TxQueryResponse, TxSubmissionResponse, + FeePerGramStat, GetUtxosDeletedInfoRequest, GetUtxosDeletedInfoResponse, TipInfoResponse, TxQueryResponse, + TxSubmissionResponse, }; use tari_transaction_components::transaction_components::Transaction; use tari_utilities::hex::to_hex; @@ -512,4 +513,32 @@ impl WalletHttpClient { Ok(stats) } + + /// Queries the base node for the spent/unspent status of specific UTXOs. + /// + /// For each output hash, the response indicates whether the output was found on chain + /// and whether it has been spent. Used during fast sync to verify `SpentUnconfirmed` outputs. + pub async fn get_utxos_deleted_info( + &self, + output_hashes: Vec>, + ) -> Result { + debug!( + count = output_hashes.len(); + "HTTP: Querying deleted info for UTXOs" + ); + + let request = GetUtxosDeletedInfoRequest { + hashes: output_hashes, + must_include_header: Vec::new(), + }; + + let body = serde_json::to_value(request)?; + + let response = self + .http_client + .send_request(Method::POST, "/get_utxos_deleted_info", Some(body)) + .await?; + + Ok(response) + } } diff --git a/minotari/src/models/output_status.rs b/minotari/src/models/output_status.rs index 921fbc0..fb7572b 100644 --- a/minotari/src/models/output_status.rs +++ b/minotari/src/models/output_status.rs @@ -7,6 +7,10 @@ pub enum OutputStatus { Unspent, Locked, Spent, + /// Output discovered during fast sync backfill that is known to be spent + /// but whose spending input has not yet been processed. Once the backfill + /// processes the corresponding input, the status transitions to `Spent`. + SpentUnconfirmed, } impl std::fmt::Display for OutputStatus { @@ -15,6 +19,7 @@ impl std::fmt::Display for OutputStatus { OutputStatus::Unspent => write!(f, "UNSPENT"), OutputStatus::Locked => write!(f, "LOCKED"), OutputStatus::Spent => write!(f, "SPENT"), + OutputStatus::SpentUnconfirmed => write!(f, "SPENT_UNCONFIRMED"), } } } @@ -27,6 +32,7 @@ impl FromStr for OutputStatus { "UNSPENT" => Ok(OutputStatus::Unspent), "LOCKED" => Ok(OutputStatus::Locked), "SPENT" => Ok(OutputStatus::Spent), + "SPENT_UNCONFIRMED" => Ok(OutputStatus::SpentUnconfirmed), _ => Err(format!("Invalid OutputStatus: {}", s)), } } diff --git a/minotari/src/scan/block_processor.rs b/minotari/src/scan/block_processor.rs index 2a73216..3fdb409 100644 --- a/minotari/src/scan/block_processor.rs +++ b/minotari/src/scan/block_processor.rs @@ -94,8 +94,11 @@ pub struct BlockProcessor { has_pending_outbound: bool, /// Required confirmations required_confirmations: u64, - /// Webhook Configuration + /// Webhook Configuration webhook_config: Option, + /// When true, uses OR IGNORE inserts for outputs/inputs/balance_changes + /// to skip already-existing records during history backfill (Phase 3 of FastSync). + backfill_mode: bool, } impl BlockProcessor { @@ -120,6 +123,7 @@ impl BlockProcessor { has_pending_outbound: false, required_confirmations, webhook_config: None, + backfill_mode: false, } } } @@ -153,6 +157,7 @@ impl BlockProcessor { has_pending_outbound, required_confirmations, webhook_config: None, + backfill_mode: false, } } @@ -181,6 +186,12 @@ impl BlockProcessor { self.webhook_config = config; } + /// Enables or disables backfill mode for history backfill (Phase 3 of FastSync). + /// In backfill mode, outputs and inputs use OR IGNORE inserts to skip duplicates. + pub fn set_backfill_mode(&mut self, value: bool) { + self.backfill_mode = value; + } + /// Processes a single scanned block. /// /// This is the main entry point for block processing. It performs all @@ -327,6 +338,46 @@ impl BlockProcessor { let memo = MemoInfo::from_output(output); + // Compute the payment reference from block hash and output hash + let payment_reference = generate_payment_reference(&block.block_hash, hash); + + let is_burn = output.is_burned(); + + let output_id = if self.backfill_mode { + match db::insert_spent_output_if_not_exists( + tx, + account_id, + account_view_key, + hash.to_vec(), + output, + block.height, + &block.block_hash, + block.mined_timestamp, + memo.parsed.clone(), + memo.hex.clone(), + payment_reference, + is_burn, + )? { + Some(id) => id, + None => continue, // Output already exists, skip events and balance changes + } + } else { + db::insert_output( + tx, + account_id, + account_view_key, + hash.to_vec(), + output, + block.height, + &block.block_hash, + block.mined_timestamp, + memo.parsed.clone(), + memo.hex.clone(), + payment_reference, + is_burn, + )? + }; + info!( target: "audit", account_id = account_id, @@ -339,26 +390,6 @@ impl BlockProcessor { let event = self.make_output_detected_event(account_id, *hash, block, &memo); self.wallet_events.push(event.clone()); - // Compute the payment reference from block hash and output hash - let payment_reference = generate_payment_reference(&block.block_hash, hash); - - let is_burn = output.is_burned(); - - let output_id = db::insert_output( - tx, - account_id, - account_view_key, - hash.to_vec(), - output, - block.height, - &block.block_hash, - block.mined_timestamp, - memo.parsed.clone(), - memo.hex.clone(), - payment_reference, - is_burn, - )?; - let event_id = db::insert_wallet_event(tx, account_id, &event)?; generated_events.push((event_id, event)); @@ -418,7 +449,11 @@ impl BlockProcessor { ) -> Result { let change = make_balance_change_for_output(account_id, output_id, block.mined_timestamp, block.height, output); - db::insert_balance_change(tx, &change)?; + if self.backfill_mode { + db::insert_balance_change_if_not_exists(tx, &change)?; + } else { + db::insert_balance_change(tx, &change)?; + } Ok(change) } @@ -513,7 +548,11 @@ impl BlockProcessor { is_reversed: false, }; - db::insert_balance_change(tx, &change)?; + if self.backfill_mode { + db::insert_balance_change_if_not_exists(tx, &change)?; + } else { + db::insert_balance_change(tx, &change)?; + } Ok(change) } diff --git a/minotari/src/scan/builder.rs b/minotari/src/scan/builder.rs index 7e9db49..654619b 100644 --- a/minotari/src/scan/builder.rs +++ b/minotari/src/scan/builder.rs @@ -348,4 +348,64 @@ impl Scanner { let future = self.run_internal(event_sender); (rx, future) } + + /// Runs the history backfill phase independently. + /// + /// This scans from each account's birthday to `backfill_to_height` using full + /// block processing with idempotent inserts. Newly discovered outputs are marked + /// `SpentUnconfirmed` and then verified against the base node. + /// + /// Call this after a fast sync has completed and the wallet is already usable. + /// The backfill fills in complete transaction history (spent outputs, inputs, + /// balance changes) for the period that was fast-synced. + pub async fn run_backfill(self, backfill_to_height: u64) -> Result<(Vec, bool), ScanError> { + self.run_backfill_internal(backfill_to_height, NoopEventSender).await + } + + /// Runs the history backfill with real-time event streaming. + #[allow(clippy::type_complexity)] + pub fn run_backfill_with_events( + self, + backfill_to_height: u64, + ) -> ( + mpsc::UnboundedReceiver, + impl std::future::Future, bool), ScanError>>, + ) { + let (tx, rx) = mpsc::unbounded_channel(); + let event_sender = ChannelEventSender::new(tx); + let future = self.run_backfill_internal(backfill_to_height, event_sender); + (rx, future) + } + + async fn run_backfill_internal( + self, + backfill_to_height: u64, + event_sender: E, + ) -> Result<(Vec, bool), ScanError> { + let pool = db::init_db(self.database_file.clone())?; + let conn = pool.get().map_err(|e| ScanError::DbError(e.into()))?; + + let accounts = db::get_accounts(&conn, self.account_name.as_deref())?; + let coordinator = ScanCoordinator::new( + pool, + self.base_url, + event_sender, + self.retry_config, + self.required_confirmations, + self.webhook_config, + self.processing_threads, + self.reorg_check_interval, + self.batch_size, + )?; + + coordinator + .run_backfill( + accounts, + &self.password, + backfill_to_height, + self.scanning_offset, + self.cancel_token, + ) + .await + } } diff --git a/minotari/src/scan/config.rs b/minotari/src/scan/config.rs index 4810191..214a218 100644 --- a/minotari/src/scan/config.rs +++ b/minotari/src/scan/config.rs @@ -17,6 +17,10 @@ pub const DEFAULT_SCANNING_OFFSET_DAYS: u64 = 2; pub const OPTIMAL_SCANNING_THREADS: usize = 0; // Based on num_cpus +/// Default safety buffer for fast sync mode (approximately 24 hours of blocks). +/// Blocks within this distance from the tip are scanned fully rather than fast-synced. +pub const DEFAULT_FAST_SYNC_SAFETY_BUFFER: u64 = 720; + /// Configuration for scan operation timeouts. /// /// This is a simplified configuration struct for controlling timeout behavior. @@ -98,6 +102,24 @@ pub enum ScanMode { /// Duration to wait between scan cycles after reaching chain tip. poll_interval: Duration, }, + + /// Three-phase fast synchronization mode. + /// + /// Optimized for wallets that are far behind the chain tip: + /// + /// 1. **Fast UTXO scan**: Scans from birthday to `tip - safety_buffer` asking + /// the base node for unspent UTXOs only (`exclude_spent=true`). This skips + /// already-spent outputs for much faster initial sync. + /// 2. **Recent full scan**: Full scan from `tip - safety_buffer` to tip with + /// complete output and input processing. + /// 3. **History backfill**: Full scan from birthday to `tip - safety_buffer` + /// to fill in complete transaction history (spent outputs, inputs, balance + /// changes). Uses idempotent inserts to skip already-recorded data. + FastSync { + /// Number of blocks from chain tip that define the boundary between + /// fast scanning and full scanning. Defaults to [`DEFAULT_FAST_SYNC_SAFETY_BUFFER`] (720). + safety_buffer: u64, + }, } /// Comprehensive configuration for scan retry behavior. diff --git a/minotari/src/scan/coordinator.rs b/minotari/src/scan/coordinator.rs index 6d08678..655ddf4 100644 --- a/minotari/src/scan/coordinator.rs +++ b/minotari/src/scan/coordinator.rs @@ -1,4 +1,4 @@ -use log::info; +use log::{debug, info}; use minotari_scanning::{HttpBlockchainScanner, scanning::BlockchainScanner}; use std::{collections::VecDeque, sync::Arc}; use tari_common_types::{seeds::cipher_seed::BIRTHDAY_GENESIS_FROM_UNIX_EPOCH, types::PrivateKey}; @@ -9,12 +9,12 @@ use crate::{ PauseReason, ScanStatusEvent, db::{AccountRow, SqlitePool}, http::WalletHttpClient, - models::WalletEvent, + models::{OutputStatus, WalletEvent}, scan::{ DisplayedTransactionsEvent, ReorgDetectedEvent, ScanError, ScanMode, ScanRetryConfig, TransactionsUpdatedEvent, block_processor::BlockProcessor, config::OPTIMAL_SCANNING_THREADS, - events::{EventSender, ProcessingEvent}, + events::{EventSender, FastSyncPhase, ProcessingEvent}, reorg, scan_db_handler::ScanDbHandler, scanner_state_manager::ScannerStateManager, @@ -23,6 +23,7 @@ use crate::{ webhooks::WebhookTriggerConfig, }; use tari_transaction_components::key_manager::TransactionKeyManagerInterface; +use tari_utilities::hex::Hex; const MAX_CONTINUOUS_BUFFERED_EVENTS: usize = 10_000; @@ -119,7 +120,10 @@ impl ScanCoordinator { sync_targets.push(target); } - self.unified_scan_loop(sync_targets, mode, cancel_token).await + match mode { + ScanMode::FastSync { safety_buffer } => self.run_fast_sync(sync_targets, safety_buffer, cancel_token).await, + _ => self.unified_scan_loop(sync_targets, mode, cancel_token).await, + } } /// Prepares a scan context for an account. @@ -249,7 +253,6 @@ impl ScanCoordinator { let mut max_new_height_in_batch = 0; while let Some(response) = utxo_stream.recv().await { let scanned_blocks = response.map_err(|e| ScanError::Intermittent(e.to_string()))?; - //let (scanned_blocks, mut more_blocks) = self.scan_blocks_with_timeout(scanner, &scanner_config).await?; let new_blocks_count = scanned_blocks.len() as u64; is_batch_empty = scanned_blocks.is_empty(); // Go on, if we stopped on artificial horizon @@ -328,6 +331,12 @@ impl ScanCoordinator { self.check_global_reorgs(&mut targets, &db_handler, scanner).await?; blocks_since_reorg_check = 0; } + + if let Some(token) = &cancel_token + && token.is_cancelled() + { + return self.handle_pause(&targets, PauseReason::Cancelled, all_events); + } } if let ScanMode::Partial { max_blocks } = mode && total_scanned_globally >= max_blocks @@ -480,6 +489,583 @@ impl ScanCoordinator { })); } + /// Runs the three-phase fast sync strategy. + #[allow(clippy::too_many_lines)] + async fn run_fast_sync( + &self, + mut targets: Vec, + safety_buffer: u64, + cancel_token: Option, + ) -> Result<(Vec, bool), ScanError> { + let mut all_events = Vec::new(); + + // Get chain tip to compute fast_sync_target + let tip_response = self.client.get_tip_info().await.map_err(ScanError::Fatal)?; + let tip_height = tip_response + .metadata + .as_ref() + .map(|m| m.best_block_height()) + .unwrap_or(0); + let fast_sync_target = tip_height.saturating_sub(safety_buffer); + + info!( + tip = tip_height, + fast_sync_target = fast_sync_target, + safety_buffer = safety_buffer; + "Starting fast sync" + ); + + // --- Phase 1: Fast UTXO scan (birthday -> fast_sync_target, exclude_spent=true) --- + for target in &targets { + self.event_sender + .send(ProcessingEvent::ScanStatus(ScanStatusEvent::FastSyncPhaseStarted { + account_id: target.account.id, + phase: FastSyncPhase::FastUtxoScan, + from_height: target.next_block_to_scan, + to_height: Some(fast_sync_target), + })); + } + + let (events, _) = self + .fast_sync_loop(&mut targets, fast_sync_target, &cancel_token) + .await?; + all_events.extend(events); + + for target in &targets { + self.event_sender + .send(ProcessingEvent::ScanStatus(ScanStatusEvent::FastSyncPhaseCompleted { + account_id: target.account.id, + phase: FastSyncPhase::FastUtxoScan, + })); + } + + // Check cancellation between phases + if let Some(token) = &cancel_token + && token.is_cancelled() + { + return Ok((all_events, true)); + } + + // --- Phase 2: Recent full scan (fast_sync_target -> tip) --- + // Targets already have next_block_to_scan set to fast_sync_target + 1 + // from Phase 1, so no reset needed. + for target in &targets { + self.event_sender + .send(ProcessingEvent::ScanStatus(ScanStatusEvent::FastSyncPhaseStarted { + account_id: target.account.id, + phase: FastSyncPhase::RecentFullScan, + from_height: fast_sync_target, + to_height: None, + })); + } + + // Phase 2 uses the same simple loop pattern as fast_sync_loop but without + // exclude_spent, effectively doing a full scan of recent blocks. + let (events, _) = self.recent_full_scan_loop(&mut targets, &cancel_token).await?; + all_events.extend(events); + + for target in &targets { + self.event_sender + .send(ProcessingEvent::ScanStatus(ScanStatusEvent::FastSyncPhaseCompleted { + account_id: target.account.id, + phase: FastSyncPhase::RecentFullScan, + })); + } + + info!("Fast sync completed (wallet is now usable; run backfill separately for full history)"); + Ok((all_events, false)) + } + + /// Runs the history backfill phase separately. + /// + /// Scans from each account's birthday to `backfill_to_height` with full block + /// processing (outputs + inputs) using backfill mode (OR IGNORE for duplicates, + /// SpentUnconfirmed status for new outputs). After scanning, resolves + /// SpentUnconfirmed outputs by checking inputs and verifying against the base node. + pub async fn run_backfill( + &self, + accounts: Vec, + password: &str, + backfill_to_height: u64, + scanning_offset: u64, + cancel_token: Option, + ) -> Result<(Vec, bool), ScanError> { + if accounts.is_empty() { + return Ok((Vec::new(), false)); + } + + let mut conn = self.pool.get().map_err(|e| ScanError::DbError(e.into()))?; + + let first_km = accounts + .first() + .expect("is already checked") + .get_key_manager(password)?; + let mut shared_reorg_scanner = self.create_reorg_scanner(first_km).await?; + + let mut targets = Vec::with_capacity(accounts.len()); + for account in accounts { + let target = self + .prepare_target( + account, + password, + &self.client, + scanning_offset, + &mut conn, + &mut shared_reorg_scanner, + ) + .await?; + targets.push(target); + } + + let mut all_events = Vec::new(); + + for target in &targets { + self.event_sender + .send(ProcessingEvent::ScanStatus(ScanStatusEvent::FastSyncPhaseStarted { + account_id: target.account.id, + phase: FastSyncPhase::HistoryBackfill, + from_height: target.next_block_to_scan, + to_height: Some(backfill_to_height), + })); + } + + let (events, _) = self + .backfill_loop(&mut targets, backfill_to_height, &cancel_token) + .await?; + all_events.extend(events); + + // Resolve SpentUnconfirmed outputs: + // 1. Mark outputs with matching inputs as Spent + // 2. Verify remaining outputs against the base node + let conn = self.pool.get().map_err(|e| ScanError::DbError(e.into()))?; + for target in &targets { + let marked_spent = crate::db::resolve_spent_unconfirmed_with_inputs(&conn, target.account.id) + .map_err(ScanError::DbError)?; + info!( + account_id = target.account.id, + marked_spent = marked_spent; + "Resolved SpentUnconfirmed outputs with matching inputs" + ); + + let unresolved = crate::db::get_unresolved_spent_unconfirmed_outputs(&conn, target.account.id) + .map_err(ScanError::DbError)?; + + if !unresolved.is_empty() { + info!( + account_id = target.account.id, + count = unresolved.len(); + "Verifying remaining SpentUnconfirmed outputs against base node" + ); + self.verify_spent_unconfirmed_outputs(&conn, target, &unresolved) + .await?; + } + + self.event_sender + .send(ProcessingEvent::ScanStatus(ScanStatusEvent::FastSyncPhaseCompleted { + account_id: target.account.id, + phase: FastSyncPhase::HistoryBackfill, + })); + } + + info!("History backfill completed"); + Ok((all_events, false)) + } + + /// Phase 1: Fast sync loop scanning only unspent outputs. + async fn fast_sync_loop( + &self, + targets: &mut [AccountSyncTarget], + end_height: u64, + cancel_token: &Option, + ) -> Result<(Vec, bool), ScanError> { + let mut all_events = Vec::new(); + let mut state_manager = ScannerStateManager::new(); + let mut total_scanned = 0u64; + + let all_accounts: Vec<(i64, PrivateKey)> = targets + .iter() + .map(|target| (target.account.id, target.view_key.clone())) + .collect(); + let block_processor = BlockProcessor::with_event_sender( + all_accounts, + self.event_sender.clone(), + false, + self.required_confirmations, + ); + let mut db_handler = ScanDbHandler::new(self.pool.clone(), block_processor); + + let global_next_block = targets.iter().map(|t| t.next_block_to_scan).min().unwrap_or(0); + let active_account_ids: Vec = targets + .iter() + .filter(|t| t.next_block_to_scan <= end_height) + .map(|t| t.account.id) + .collect(); + + if active_account_ids.is_empty() || global_next_block > end_height { + return Ok((all_events, false)); + } + + let (scanner, mut scanner_config) = state_manager + .get_scanner_and_config( + &active_account_ids, + global_next_block, + Some(end_height), + self.batch_size, + targets, + &self.base_url, + self.processing_threads, + &self.retry_config, + ) + .await?; + + // Set exclude_spent and exclude_inputs for fast sync + scanner_config.exclude_spent = true; + scanner_config.exclude_inputs = true; + + let mut utxo_stream = scanner + .scan_blocks(&scanner_config) + .await + .map_err(|e| ScanError::Intermittent(e.to_string()))?; + + while let Some(response) = utxo_stream.recv().await { + if let Some(token) = cancel_token + && token.is_cancelled() + { + break; + } + + let scanned_blocks = response.map_err(|e| ScanError::Intermittent(e.to_string()))?; + if scanned_blocks.is_empty() { + break; + } + total_scanned += scanned_blocks.len() as u64; + + let shared_blocks = Arc::new(scanned_blocks); + + for account_id in &active_account_ids { + let Some(target) = targets.iter_mut().find(|t| t.account.id == *account_id) else { + continue; + }; + + let events = db_handler + .process_blocks( + shared_blocks.clone(), + *account_id, + false, + self.webhook_config.clone(), + target.next_block_to_scan, + ) + .await?; + all_events.extend(events); + + if let Some(last) = shared_blocks.last() { + target.next_block_to_scan = last.height + 1; + + self.event_sender + .send(ProcessingEvent::ScanStatus(ScanStatusEvent::Progress { + account_id: target.account.id, + current_height: last.height, + blocks_scanned: total_scanned, + })); + } + } + } + + Ok((all_events, false)) + } + + /// Phase 2: Full scan of recent blocks from fast_sync_target to tip. + async fn recent_full_scan_loop( + &self, + targets: &mut [AccountSyncTarget], + cancel_token: &Option, + ) -> Result<(Vec, bool), ScanError> { + let mut all_events = Vec::new(); + let mut state_manager = ScannerStateManager::new(); + let mut total_scanned = 0u64; + + let all_accounts: Vec<(i64, PrivateKey)> = targets + .iter() + .map(|target| (target.account.id, target.view_key.clone())) + .collect(); + let block_processor = BlockProcessor::with_event_sender( + all_accounts, + self.event_sender.clone(), + false, + self.required_confirmations, + ); + let mut db_handler = ScanDbHandler::new(self.pool.clone(), block_processor); + + let global_next_block = targets.iter().map(|t| t.next_block_to_scan).min().unwrap_or(0); + let active_account_ids: Vec = targets.iter().map(|t| t.account.id).collect(); + + let (scanner, scanner_config) = state_manager + .get_scanner_and_config( + &active_account_ids, + global_next_block, + None, // scan to tip + self.batch_size, + targets, + &self.base_url, + self.processing_threads, + &self.retry_config, + ) + .await?; + + let mut utxo_stream = scanner + .scan_blocks(&scanner_config) + .await + .map_err(|e| ScanError::Intermittent(e.to_string()))?; + + while let Some(response) = utxo_stream.recv().await { + if let Some(token) = cancel_token + && token.is_cancelled() + { + break; + } + + let scanned_blocks = response.map_err(|e| ScanError::Intermittent(e.to_string()))?; + if scanned_blocks.is_empty() { + break; + } + total_scanned += scanned_blocks.len() as u64; + + let shared_blocks = Arc::new(scanned_blocks); + + for account_id in &active_account_ids { + let Some(target) = targets.iter_mut().find(|t| t.account.id == *account_id) else { + continue; + }; + + let events = db_handler + .process_blocks( + shared_blocks.clone(), + *account_id, + target.transaction_monitor.has_pending_outbound(), + self.webhook_config.clone(), + target.next_block_to_scan, + ) + .await?; + all_events.extend(events); + + if let Some(last) = shared_blocks.last() { + target.next_block_to_scan = last.height + 1; + + let monitor_res = target + .transaction_monitor + .monitor_if_needed(&self.client, &self.pool, target.account.id, last.height) + .await + .map_err(ScanError::Fatal)?; + + all_events.extend(monitor_res.wallet_events.clone()); + self.emit_monitor_events(target.account.id, last.height, monitor_res); + + self.event_sender + .send(ProcessingEvent::ScanStatus(ScanStatusEvent::Progress { + account_id: target.account.id, + current_height: last.height, + blocks_scanned: total_scanned, + })); + } + } + } + + Ok((all_events, false)) + } + + /// Phase 3: Backfill loop scanning full blocks with OR IGNORE for existing data. + async fn backfill_loop( + &self, + targets: &mut [AccountSyncTarget], + end_height: u64, + cancel_token: &Option, + ) -> Result<(Vec, bool), ScanError> { + let mut all_events = Vec::new(); + let mut state_manager = ScannerStateManager::new(); + let mut total_scanned = 0u64; + + let all_accounts: Vec<(i64, PrivateKey)> = targets + .iter() + .map(|target| (target.account.id, target.view_key.clone())) + .collect(); + let mut block_processor = BlockProcessor::with_event_sender( + all_accounts, + self.event_sender.clone(), + false, + self.required_confirmations, + ); + block_processor.set_backfill_mode(true); + let mut db_handler = ScanDbHandler::new(self.pool.clone(), block_processor); + + let global_next_block = targets.iter().map(|t| t.next_block_to_scan).min().unwrap_or(0); + let active_account_ids: Vec = targets + .iter() + .filter(|t| t.next_block_to_scan <= end_height) + .map(|t| t.account.id) + .collect(); + + if active_account_ids.is_empty() || global_next_block > end_height { + return Ok((all_events, false)); + } + + let (scanner, scanner_config) = state_manager + .get_scanner_and_config( + &active_account_ids, + global_next_block, + Some(end_height), + self.batch_size, + targets, + &self.base_url, + self.processing_threads, + &self.retry_config, + ) + .await?; + + let mut utxo_stream = scanner + .scan_blocks(&scanner_config) + .await + .map_err(|e| ScanError::Intermittent(e.to_string()))?; + + while let Some(response) = utxo_stream.recv().await { + if let Some(token) = cancel_token + && token.is_cancelled() + { + break; + } + + let scanned_blocks = response.map_err(|e| ScanError::Intermittent(e.to_string()))?; + if scanned_blocks.is_empty() { + break; + } + total_scanned += scanned_blocks.len() as u64; + + let shared_blocks = Arc::new(scanned_blocks); + + for account_id in &active_account_ids { + let Some(target) = targets.iter_mut().find(|t| t.account.id == *account_id) else { + continue; + }; + + let events = db_handler + .process_blocks( + shared_blocks.clone(), + *account_id, + false, + self.webhook_config.clone(), + target.next_block_to_scan, + ) + .await?; + all_events.extend(events); + + if let Some(last) = shared_blocks.last() { + target.next_block_to_scan = last.height + 1; + + self.event_sender + .send(ProcessingEvent::ScanStatus(ScanStatusEvent::Progress { + account_id: target.account.id, + current_height: last.height, + blocks_scanned: total_scanned, + })); + } + } + } + + Ok((all_events, false)) + } + + /// Verifies remaining SpentUnconfirmed outputs against the base node using + /// the `get_utxos_deleted_info` endpoint. If `spent_in_header` is present, + /// the output is confirmed Spent and an input record + debit balance change + /// are created so the balance sums correctly. Otherwise it's marked Unspent. + async fn verify_spent_unconfirmed_outputs( + &self, + conn: &rusqlite::Connection, + target: &AccountSyncTarget, + unresolved: &[crate::db::UnresolvedSpentOutput], + ) -> Result<(), ScanError> { + use std::collections::HashMap; + + let hash_to_output: HashMap, &crate::db::UnresolvedSpentOutput> = + unresolved.iter().map(|o| (o.output_hash.clone(), o)).collect(); + + let all_output_hashes: Vec> = unresolved.iter().map(|o| o.output_hash.clone()).collect(); + + let mut marked_unspent = 0u64; + let mut confirmed_spent = 0u64; + + for output_hashes in all_output_hashes.chunks(50) { + let response = self + .client + .get_utxos_deleted_info(output_hashes.to_vec()) + .await + .map_err(|e| ScanError::Intermittent(format!("Failed to query UTXO deleted info: {}", e)))?; + + for utxo_info in &response.utxos { + let Some(output) = hash_to_output.get(&utxo_info.utxo_hash) else { + continue; + }; + + if let Some((spent_height, spent_block_hash)) = &utxo_info.spent_in_header { + // Create input record so the spend is tracked in the DB + let input_id = crate::db::insert_input( + conn, + target.account.id, + output.output_id, + *spent_height, + spent_block_hash, + 0, // timestamp not available from deleted info + ) + .map_err(ScanError::DbError)?; + + // Create debit balance change so credits - debits nets correctly + let change = crate::models::BalanceChange { + account_id: target.account.id, + caused_by_output_id: None, + caused_by_input_id: Some(input_id), + description: "Output spent (verified by base node)".to_string(), + balance_credit: 0.into(), + balance_debit: output.value.into(), + effective_date: chrono::Utc::now().naive_utc(), + effective_height: *spent_height, + claimed_recipient_address: None, + claimed_sender_address: None, + memo_hex: None, + memo_parsed: None, + claimed_fee: None, + claimed_amount: None, + is_reversal: false, + reversal_of_balance_change_id: None, + is_reversed: false, + }; + crate::db::insert_balance_change(conn, &change).map_err(ScanError::DbError)?; + + crate::db::update_output_status(conn, output.output_id, OutputStatus::Spent) + .map_err(ScanError::DbError)?; + confirmed_spent += 1; + } else if utxo_info.found_in_header.is_some() { + crate::db::update_output_status(conn, output.output_id, OutputStatus::Unspent) + .map_err(ScanError::DbError)?; + marked_unspent += 1; + } else { + debug!( + account = &*target.account.friendly_name, + output_hash = &*utxo_info.utxo_hash.to_hex(); + "Ouput unkown by base node, status kept as SpentUnconfirmed" + ); + } + } + } + + info!( + account_id = target.account.id, + marked_unspent = marked_unspent, + confirmed_spent = confirmed_spent; + "Verified SpentUnconfirmed outputs against base node" + ); + + Ok(()) + } + async fn create_reorg_scanner( &self, key_manager: KeyManager, diff --git a/minotari/src/scan/events.rs b/minotari/src/scan/events.rs index b5282f0..0b6ae48 100644 --- a/minotari/src/scan/events.rs +++ b/minotari/src/scan/events.rs @@ -150,6 +150,16 @@ pub enum ScanStatusEvent { last_scanned_height: u64, reason: PauseReason, }, + FastSyncPhaseStarted { + account_id: i64, + phase: FastSyncPhase, + from_height: u64, + to_height: Option, + }, + FastSyncPhaseCompleted { + account_id: i64, + phase: FastSyncPhase, + }, } #[derive(Debug, Clone)] @@ -158,6 +168,17 @@ pub enum PauseReason { Cancelled, } +/// Identifies which phase of a fast sync operation is active. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FastSyncPhase { + /// Phase 1: Fast scan from birthday to (tip - safety_buffer), unspent UTXOs only. + FastUtxoScan, + /// Phase 2: Full scan of recent blocks from (tip - safety_buffer) to tip. + RecentFullScan, + /// Phase 3: Full history backfill from birthday to (tip - safety_buffer). + HistoryBackfill, +} + #[derive(Debug, Clone)] pub struct ReorgDetectedEvent { pub account_id: i64, diff --git a/minotari/src/scan/mod.rs b/minotari/src/scan/mod.rs index be8d297..84705d6 100644 --- a/minotari/src/scan/mod.rs +++ b/minotari/src/scan/mod.rs @@ -97,6 +97,7 @@ mod coordinator; mod scanner_state_manager; pub use builder::Scanner; +pub use config::DEFAULT_FAST_SYNC_SAFETY_BUFFER; pub use config::ScanMode; pub use config::ScanRetryConfig; pub use config::ScanTimeoutConfig; diff --git a/minotari/src/scan/scanner_state_manager.rs b/minotari/src/scan/scanner_state_manager.rs index 3dc4d08..4f37788 100644 --- a/minotari/src/scan/scanner_state_manager.rs +++ b/minotari/src/scan/scanner_state_manager.rs @@ -18,6 +18,13 @@ impl ScannerStateManager { } } + #[allow(dead_code)] + pub fn reset(&mut self) { + self.scanner = None; + self.active_account_ids.clear(); + self.scan_config = ScanConfig::default(); + } + #[allow(clippy::too_many_arguments)] pub async fn get_scanner_and_config( &mut self, @@ -62,7 +69,9 @@ impl ScannerStateManager { // This ensures the scanner always starts at a valid, existing block. self.scan_config = ScanConfig::default() .with_start_height(new_start_height) - .with_batch_size(batch_size); + .with_batch_size(batch_size) + .with_exclude_spent(false) + .with_exclude_inputs(false); self.scan_config.end_height = new_end_height; let scanner = self