diff --git a/dash-spv-ffi/FFI_API.md b/dash-spv-ffi/FFI_API.md index 8f4606063..b958c3c6a 100644 --- a/dash-spv-ffi/FFI_API.md +++ b/dash-spv-ffi/FFI_API.md @@ -4,17 +4,15 @@ This document provides a comprehensive reference for all FFI (Foreign Function I **Auto-generated**: This documentation is automatically generated from the source code. Do not edit manually. -**Total Functions**: 49 +**Total Functions**: 39 ## Table of Contents - [Client Management](#client-management) - [Configuration](#configuration) - [Synchronization](#synchronization) -- [Wallet Operations](#wallet-operations) - [Transaction Management](#transaction-management) - [Platform Integration](#platform-integration) -- [Event Callbacks](#event-callbacks) - [Error Handling](#error-handling) - [Utility Functions](#utility-functions) @@ -55,25 +53,14 @@ Functions: 16 ### Synchronization -Functions: 5 +Functions: 3 | Function | Description | Module | |----------|-------------|--------| -| `dash_spv_ffi_client_clear_sync_event_callbacks` | Clear sync event callbacks | client | | `dash_spv_ffi_client_get_manager_sync_progress` | Get the current manager-based sync progress | client | | `dash_spv_ffi_client_get_sync_progress` | Get the current sync progress snapshot | client | -| `dash_spv_ffi_client_set_sync_event_callbacks` | Set sync event callbacks for push-based event notifications | client | | `dash_spv_ffi_sync_progress_destroy` | Destroy an `FFISyncProgress` object and all its nested pointers | types | -### Wallet Operations - -Functions: 2 - -| Function | Description | Module | -|----------|-------------|--------| -| `dash_spv_ffi_client_clear_wallet_event_callbacks` | Clear wallet event callbacks | client | -| `dash_spv_ffi_client_set_wallet_event_callbacks` | Set wallet event callbacks for push-based event notifications | client | - ### Transaction Management Functions: 1 @@ -91,19 +78,6 @@ Functions: 2 | `ffi_dash_spv_get_platform_activation_height` | Gets the platform activation height from the Core chain # Safety This... | platform_integration | | `ffi_dash_spv_get_quorum_public_key` | Gets a quorum public key from the Core chain # Safety This function is... | platform_integration | -### Event Callbacks - -Functions: 6 - -| Function | Description | Module | -|----------|-------------|--------| -| `dash_spv_ffi_client_clear_client_error_callback` | Clear the client error callback | client | -| `dash_spv_ffi_client_clear_network_event_callbacks` | Clear network event callbacks | client | -| `dash_spv_ffi_client_clear_progress_callback` | Clear progress callback | client | -| `dash_spv_ffi_client_set_client_error_callback` | Set a callback for fatal client errors (start failure, sync thread crash) | client | -| `dash_spv_ffi_client_set_network_event_callbacks` | Set network event callbacks for push-based event notifications | client | -| `dash_spv_ffi_client_set_progress_callback` | Set progress callback for sync progress updates | client | - ### Error Handling Functions: 1 @@ -426,22 +400,6 @@ dash_spv_ffi_config_testnet() -> *mut FFIClientConfig ### Synchronization - Detailed -#### `dash_spv_ffi_client_clear_sync_event_callbacks` - -```c -dash_spv_ffi_client_clear_sync_event_callbacks(client: *mut FFIDashSpvClient,) -> i32 -``` - -**Description:** -Clear sync event callbacks. # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Module:** `client` - ---- - #### `dash_spv_ffi_client_get_manager_sync_progress` ```c @@ -474,22 +432,6 @@ Get the current sync progress snapshot. # Safety - `client` must be a valid, no --- -#### `dash_spv_ffi_client_set_sync_event_callbacks` - -```c -dash_spv_ffi_client_set_sync_event_callbacks(client: *mut FFIDashSpvClient, callbacks: FFISyncEventCallbacks,) -> i32 -``` - -**Description:** -Set sync event callbacks for push-based event notifications. The monitoring task is spawned when `dash_spv_ffi_client_run` is called. Call this before calling run(). # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. - Callbacks must be thread-safe as they may be called from a background task. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. - Callbacks must be thread-safe as they may be called from a background task. - -**Module:** `client` - ---- - #### `dash_spv_ffi_sync_progress_destroy` ```c @@ -506,40 +448,6 @@ Destroy an `FFISyncProgress` object and all its nested pointers. # Safety - `pr --- -### Wallet Operations - Detailed - -#### `dash_spv_ffi_client_clear_wallet_event_callbacks` - -```c -dash_spv_ffi_client_clear_wallet_event_callbacks(client: *mut FFIDashSpvClient,) -> i32 -``` - -**Description:** -Clear wallet event callbacks. # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Module:** `client` - ---- - -#### `dash_spv_ffi_client_set_wallet_event_callbacks` - -```c -dash_spv_ffi_client_set_wallet_event_callbacks(client: *mut FFIDashSpvClient, callbacks: FFIWalletEventCallbacks,) -> i32 -``` - -**Description:** -Set wallet event callbacks for push-based event notifications. The monitoring task is spawned when `dash_spv_ffi_client_run` is called. Call this before calling run(). # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. - Callbacks must be thread-safe as they may be called from a background task. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. - Callbacks must be thread-safe as they may be called from a background task. - -**Module:** `client` - ---- - ### Transaction Management - Detailed #### `dash_spv_ffi_client_broadcast_transaction` @@ -592,104 +500,6 @@ This function is unsafe because: - The caller must ensure all pointers are valid --- -### Event Callbacks - Detailed - -#### `dash_spv_ffi_client_clear_client_error_callback` - -```c -dash_spv_ffi_client_clear_client_error_callback(client: *mut FFIDashSpvClient,) -> i32 -``` - -**Description:** -Clear the client error callback. # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Module:** `client` - ---- - -#### `dash_spv_ffi_client_clear_network_event_callbacks` - -```c -dash_spv_ffi_client_clear_network_event_callbacks(client: *mut FFIDashSpvClient,) -> i32 -``` - -**Description:** -Clear network event callbacks. # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Module:** `client` - ---- - -#### `dash_spv_ffi_client_clear_progress_callback` - -```c -dash_spv_ffi_client_clear_progress_callback(client: *mut FFIDashSpvClient,) -> i32 -``` - -**Description:** -Clear progress callback. # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Module:** `client` - ---- - -#### `dash_spv_ffi_client_set_client_error_callback` - -```c -dash_spv_ffi_client_set_client_error_callback(client: *mut FFIDashSpvClient, callback: FFIClientErrorCallback,) -> i32 -``` - -**Description:** -Set a callback for fatal client errors (start failure, sync thread crash). # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callback` struct and its `user_data` must remain valid until the callback is cleared. - The callback must be thread-safe as it may be called from a background thread. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callback` struct and its `user_data` must remain valid until the callback is cleared. - The callback must be thread-safe as it may be called from a background thread. - -**Module:** `client` - ---- - -#### `dash_spv_ffi_client_set_network_event_callbacks` - -```c -dash_spv_ffi_client_set_network_event_callbacks(client: *mut FFIDashSpvClient, callbacks: FFINetworkEventCallbacks,) -> i32 -``` - -**Description:** -Set network event callbacks for push-based event notifications. The monitoring task is spawned when `dash_spv_ffi_client_run` is called. Call this before calling run(). # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. - Callbacks must be thread-safe as they may be called from a background task. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. - Callbacks must be thread-safe as they may be called from a background task. - -**Module:** `client` - ---- - -#### `dash_spv_ffi_client_set_progress_callback` - -```c -dash_spv_ffi_client_set_progress_callback(client: *mut FFIDashSpvClient, callback: crate::FFIProgressCallback,) -> i32 -``` - -**Description:** -Set progress callback for sync progress updates. The monitoring task is spawned when `dash_spv_ffi_client_run` is called. Call this before calling run(). # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callback` struct and its `user_data` must remain valid until the callback is cleared. - The callback must be thread-safe as it may be called from a background task. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callback` struct and its `user_data` must remain valid until the callback is cleared. - The callback must be thread-safe as it may be called from a background task. - -**Module:** `client` - ---- - ### Error Handling - Detailed #### `dash_spv_ffi_get_last_error` @@ -787,14 +597,14 @@ The caller must ensure that: - The client pointer is valid - The returned pointe #### `dash_spv_ffi_client_run` ```c -dash_spv_ffi_client_run(client: *mut FFIDashSpvClient) -> i32 +dash_spv_ffi_client_run(client: *mut FFIDashSpvClient, callbacks: FFIEventCallbacks,) -> i32 ``` **Description:** -Start the SPV client and begin syncing in the background. Subscribes to events, spawns monitoring threads, then spawns a background thread that calls `run()` (which handles start + sync loop + stop internally). Returns immediately after spawning. Use event callbacks (set via `set_sync_event_callbacks`, `set_network_event_callbacks`, `set_wallet_event_callbacks`) to receive notifications. Configure callbacks before calling `run()`. # Safety - `client` must be a valid, non-null pointer to a created client. # Returns 0 on success, error code on failure. +Start the SPV client and begin syncing in the background. Accepts all event callbacks as a single struct and passes them to the library's `run()` method, which manages channel subscriptions and monitoring tasks internally. Returns immediately after spawning. # Safety - `client` must be a valid, non-null pointer to a created client. - The `callbacks` struct and its `user_data` pointers must remain valid until `dash_spv_ffi_client_stop` or `dash_spv_ffi_client_destroy` is called. # Returns 0 on success, error code on failure. **Safety:** -- `client` must be a valid, non-null pointer to a created client. +- `client` must be a valid, non-null pointer to a created client. - The `callbacks` struct and its `user_data` pointers must remain valid until `dash_spv_ffi_client_stop` or `dash_spv_ffi_client_destroy` is called. **Module:** `client` diff --git a/dash-spv-ffi/src/bin/ffi_cli.rs b/dash-spv-ffi/src/bin/ffi_cli.rs index b9c4a62d1..0e8b68c8a 100644 --- a/dash-spv-ffi/src/bin/ffi_cli.rs +++ b/dash-spv-ffi/src/bin/ffi_cli.rs @@ -403,84 +403,50 @@ fn main() { dash_spv_ffi_wallet_manager_free(wallet_manager); } - // Set up event callbacks - let sync_callbacks = FFISyncEventCallbacks { - on_sync_start: Some(on_sync_start), - on_block_headers_stored: Some(on_block_headers_stored), - on_block_header_sync_complete: Some(on_block_header_sync_complete), - on_filter_headers_stored: Some(on_filter_headers_stored), - on_filter_headers_sync_complete: Some(on_filter_headers_sync_complete), - on_filters_stored: Some(on_filters_stored), - on_filters_sync_complete: Some(on_filters_sync_complete), - on_blocks_needed: Some(on_blocks_needed), - on_block_processed: Some(on_block_processed), - on_masternode_state_updated: Some(on_masternode_state_updated), - on_chainlock_received: Some(on_chainlock_received), - on_instantlock_received: Some(on_instantlock_received), - on_manager_error: Some(on_manager_error), - on_sync_complete: Some(on_sync_complete), - user_data: ptr::null_mut(), + // Build all event callbacks in a single struct + let callbacks = FFIEventCallbacks { + sync: FFISyncEventCallbacks { + on_sync_start: Some(on_sync_start), + on_block_headers_stored: Some(on_block_headers_stored), + on_block_header_sync_complete: Some(on_block_header_sync_complete), + on_filter_headers_stored: Some(on_filter_headers_stored), + on_filter_headers_sync_complete: Some(on_filter_headers_sync_complete), + on_filters_stored: Some(on_filters_stored), + on_filters_sync_complete: Some(on_filters_sync_complete), + on_blocks_needed: Some(on_blocks_needed), + on_block_processed: Some(on_block_processed), + on_masternode_state_updated: Some(on_masternode_state_updated), + on_chainlock_received: Some(on_chainlock_received), + on_instantlock_received: Some(on_instantlock_received), + on_manager_error: Some(on_manager_error), + on_sync_complete: Some(on_sync_complete), + user_data: ptr::null_mut(), + }, + network: FFINetworkEventCallbacks { + on_peer_connected: Some(on_peer_connected), + on_peer_disconnected: Some(on_peer_disconnected), + on_peers_updated: Some(on_peers_updated), + user_data: ptr::null_mut(), + }, + progress: FFIProgressCallback { + on_progress: Some(on_progress_update), + user_data: ptr::null_mut(), + }, + wallet: FFIWalletEventCallbacks { + on_transaction_received: Some(on_transaction_received), + on_balance_updated: Some(on_balance_updated), + user_data: ptr::null_mut(), + }, + error: FFIClientErrorCallback { + on_error: None, + user_data: ptr::null_mut(), + }, }; - let network_callbacks = FFINetworkEventCallbacks { - on_peer_connected: Some(on_peer_connected), - on_peer_disconnected: Some(on_peer_disconnected), - on_peers_updated: Some(on_peers_updated), - user_data: ptr::null_mut(), - }; - - let wallet_callbacks = FFIWalletEventCallbacks { - on_transaction_received: Some(on_transaction_received), - on_balance_updated: Some(on_balance_updated), - user_data: ptr::null_mut(), - }; - - let rc = dash_spv_ffi_client_set_sync_event_callbacks(client, sync_callbacks); - if rc != FFIErrorCode::Success as i32 { - eprintln!( - "Failed to set sync callbacks: {}", - ffi_string_to_rust(dash_spv_ffi_get_last_error()) - ); - std::process::exit(1); - } - - let rc = dash_spv_ffi_client_set_network_event_callbacks(client, network_callbacks); - if rc != FFIErrorCode::Success as i32 { - eprintln!( - "Failed to set network callbacks: {}", - ffi_string_to_rust(dash_spv_ffi_get_last_error()) - ); - std::process::exit(1); - } - - let rc = dash_spv_ffi_client_set_wallet_event_callbacks(client, wallet_callbacks); - if rc != FFIErrorCode::Success as i32 { - eprintln!( - "Failed to set wallet callbacks: {}", - ffi_string_to_rust(dash_spv_ffi_get_last_error()) - ); - std::process::exit(1); - } - - // Set up progress callback - let progress_callback = FFIProgressCallback { - on_progress: Some(on_progress_update), - user_data: ptr::null_mut(), - }; - - let rc = dash_spv_ffi_client_set_progress_callback(client, progress_callback); - if rc != FFIErrorCode::Success as i32 { - eprintln!( - "Failed to set progress callback: {}", - ffi_string_to_rust(dash_spv_ffi_get_last_error()) - ); - std::process::exit(1); - } - println!("Event and progress callbacks configured, starting sync..."); // Run client - starts sync in background and returns immediately - let rc = dash_spv_ffi_client_run(client); + let rc = dash_spv_ffi_client_run(client, callbacks); if rc != FFIErrorCode::Success as i32 { eprintln!("Client run failed: {}", ffi_string_to_rust(dash_spv_ffi_get_last_error())); std::process::exit(1); diff --git a/dash-spv-ffi/src/callbacks.rs b/dash-spv-ffi/src/callbacks.rs index 90012ee9b..a6ba44e3e 100644 --- a/dash-spv-ffi/src/callbacks.rs +++ b/dash-spv-ffi/src/callbacks.rs @@ -7,6 +7,9 @@ //! - `FFIWalletEventCallbacks` - Wallet manager events use crate::{dash_spv_ffi_sync_progress_destroy, FFISyncProgress}; +use dash_spv::network::NetworkEvent; +use dash_spv::sync::{SyncEvent, SyncProgress}; +use dash_spv::EventHandler; use dashcore::hashes::Hash; use key_wallet::manager::WalletEvent; use std::ffi::CString; @@ -618,6 +621,78 @@ impl FFIClientErrorCallback { } } +// ============================================================================ +// FFIEventCallbacks - All callbacks in a single C-compatible struct +// ============================================================================ + +/// All event callbacks grouped into a single struct. +/// +/// Pass this to `dash_spv_ffi_client_run` instead of calling individual +/// `set_*_callbacks()` functions. Any callback group left at its default +/// (all function pointers null) will simply not receive events. +#[repr(C)] +#[derive(Clone, Default)] +pub struct FFIEventCallbacks { + pub sync: FFISyncEventCallbacks, + pub network: FFINetworkEventCallbacks, + pub progress: FFIProgressCallback, + pub wallet: FFIWalletEventCallbacks, + pub error: FFIClientErrorCallback, +} + +unsafe impl Send for FFIEventCallbacks {} +unsafe impl Sync for FFIEventCallbacks {} + +// ============================================================================ +// FFIEventHandler - Implements EventHandler trait for FFI dispatch +// ============================================================================ + +/// Bridges the library's `EventHandler` trait to FFI C callbacks. +/// +/// Holds the individual callback structs and delegates each event type +/// to the appropriate FFI dispatch method. +pub(crate) struct FFIEventHandler { + sync: FFISyncEventCallbacks, + network: FFINetworkEventCallbacks, + progress: FFIProgressCallback, + wallet: FFIWalletEventCallbacks, + error: FFIClientErrorCallback, +} + +impl From for FFIEventHandler { + fn from(cbs: FFIEventCallbacks) -> Self { + Self { + sync: cbs.sync, + network: cbs.network, + progress: cbs.progress, + wallet: cbs.wallet, + error: cbs.error, + } + } +} + +impl EventHandler for FFIEventHandler { + fn on_sync_event(&self, event: &SyncEvent) { + self.sync.dispatch(event); + } + + fn on_network_event(&self, event: &NetworkEvent) { + self.network.dispatch(event); + } + + fn on_progress(&self, progress: &SyncProgress) { + self.progress.dispatch(progress); + } + + fn on_wallet_event(&self, event: &WalletEvent) { + self.wallet.dispatch(event); + } + + fn on_error(&self, error: &str) { + self.error.dispatch(error); + } +} + impl FFIWalletEventCallbacks { /// Dispatch a WalletEvent to the appropriate callback. pub fn dispatch(&self, event: &WalletEvent) { diff --git a/dash-spv-ffi/src/client.rs b/dash-spv-ffi/src/client.rs index 41ef3a069..038a2f115 100644 --- a/dash-spv-ffi/src/client.rs +++ b/dash-spv-ffi/src/client.rs @@ -1,7 +1,6 @@ use crate::{ - null_check, set_last_error, FFIClientConfig, FFIClientErrorCallback, FFIErrorCode, - FFINetworkEventCallbacks, FFIProgressCallback, FFISyncEventCallbacks, FFISyncProgress, - FFIWalletEventCallbacks, FFIWalletManager, + callbacks::FFIEventHandler, null_check, set_last_error, FFIClientConfig, FFIErrorCode, + FFIEventCallbacks, FFISyncProgress, FFIWalletManager, }; // Import wallet types from key-wallet-ffi use key_wallet_ffi::FFIWalletManager as KeyWalletFFIWalletManager; @@ -10,100 +9,12 @@ use dash_spv::storage::DiskStorageManager; use dash_spv::DashSpvClient; use tracing::dispatcher::{get_default, set_default}; -use std::mem::{forget, take}; +use std::mem::forget; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; -use tokio::sync::{broadcast, watch}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; -/// Spawns a tokio task that monitors a broadcast channel and dispatches events to callbacks. -fn spawn_broadcast_monitor( - name: &'static str, - receiver: broadcast::Receiver, - callbacks: Arc>>, - shutdown: CancellationToken, - rt: &Runtime, - dispatch_fn: F, -) -> JoinHandle<()> -where - E: Clone + Send + 'static, - C: Clone + Send + 'static, - F: Fn(&C, &E) + Send + 'static, -{ - let mut receiver = receiver; - rt.spawn(async move { - tracing::debug!("{} monitoring task started", name); - loop { - tokio::select! { - result = receiver.recv() => { - match result { - Ok(event) => { - let cb = callbacks.lock().unwrap().clone(); - if let Some(ref cb) = cb { - dispatch_fn(cb, &event); - } - } - Err(broadcast::error::RecvError::Closed) => break, - Err(broadcast::error::RecvError::Lagged(_)) => continue, - } - } - _ = shutdown.cancelled() => break, - } - } - tracing::debug!("{} monitoring task exiting", name); - }) -} - -/// Spawns a tokio task that monitors a watch channel for progress updates. -/// -/// Sends the initial progress value, then monitors for changes. -fn spawn_progress_monitor( - receiver: watch::Receiver

, - callbacks: Arc>>, - shutdown: CancellationToken, - rt: &Runtime, - dispatch_fn: F, -) -> JoinHandle<()> -where - P: Clone + Send + Sync + 'static, - C: Clone + Send + 'static, - F: Fn(&C, &P) + Send + 'static, -{ - let mut receiver = receiver; - rt.spawn(async move { - tracing::debug!("Progress monitoring task started"); - - // Send initial progress - { - let progress = receiver.borrow_and_update().clone(); - let cb = callbacks.lock().unwrap().clone(); - if let Some(ref cb) = cb { - dispatch_fn(cb, &progress); - } - } - - loop { - tokio::select! { - result = receiver.changed() => { - match result { - Ok(()) => { - let progress = receiver.borrow_and_update().clone(); - let cb = callbacks.lock().unwrap().clone(); - if let Some(ref cb) = cb { - dispatch_fn(cb, &progress); - } - } - Err(_) => break, - } - } - _ = shutdown.cancelled() => break, - } - } - tracing::debug!("Progress monitoring task exiting"); - }) -} - /// FFI wrapper around `DashSpvClient`. type InnerClient = DashSpvClient< key_wallet::manager::WalletManager, @@ -114,13 +25,8 @@ type InnerClient = DashSpvClient< pub struct FFIDashSpvClient { pub(crate) inner: InnerClient, pub(crate) runtime: Arc, - active_tasks: Mutex>>, + run_task: Mutex>>, shutdown_token: CancellationToken, - sync_event_callbacks: Arc>>, - network_event_callbacks: Arc>>, - wallet_event_callbacks: Arc>>, - progress_callback: Arc>>, - client_error_callback: Arc>>, } /// Create a new SPV client and return an opaque pointer. @@ -183,13 +89,8 @@ pub unsafe extern "C" fn dash_spv_ffi_client_new( let ffi_client = FFIDashSpvClient { inner: client, runtime, - active_tasks: Mutex::new(Vec::new()), + run_task: Mutex::new(None), shutdown_token: CancellationToken::new(), - sync_event_callbacks: Arc::new(Mutex::new(None)), - network_event_callbacks: Arc::new(Mutex::new(None)), - wallet_event_callbacks: Arc::new(Mutex::new(None)), - progress_callback: Arc::new(Mutex::new(None)), - client_error_callback: Arc::new(Mutex::new(None)), }; Box::into_raw(Box::new(ffi_client)) } @@ -201,30 +102,22 @@ pub unsafe extern "C" fn dash_spv_ffi_client_new( } impl FFIDashSpvClient { - /// Abort all active monitoring tasks and wait for them to finish. - fn cancel_active_tasks(&self) { - let tasks = { - let mut guard = self.active_tasks.lock().unwrap(); - take(&mut *guard) - }; - - for task in &tasks { + /// Cancel the run task and wait for it to finish. + fn cancel_run_task(&self) { + let task = self.run_task.lock().unwrap().take(); + if let Some(task) = task { task.abort(); - } - - // Wait for all tasks to finish - self.runtime.block_on(async { - for task in tasks { + self.runtime.block_on(async { let _ = task.await; - } - }); + }); + } } } fn stop_client_internal(client: &mut FFIDashSpvClient) -> Result<(), dash_spv::SpvError> { client.shutdown_token.cancel(); - client.cancel_active_tasks(); + client.cancel_run_task(); let result = client.runtime.block_on(async { client.inner.stop().await }); @@ -281,106 +174,51 @@ pub unsafe extern "C" fn dash_spv_ffi_client_stop(client: *mut FFIDashSpvClient) /// Start the SPV client and begin syncing in the background. /// -/// Subscribes to events, spawns monitoring threads, then spawns a background -/// thread that calls `run()` (which handles start + sync loop + stop internally). -/// Returns immediately after spawning. -/// -/// Use event callbacks (set via `set_sync_event_callbacks`, -/// `set_network_event_callbacks`, `set_wallet_event_callbacks`) to receive -/// notifications. Configure callbacks before calling `run()`. +/// Accepts all event callbacks as a single struct and passes them to the +/// library's `run()` method, which manages channel subscriptions and +/// monitoring tasks internally. Returns immediately after spawning. /// /// # Safety /// - `client` must be a valid, non-null pointer to a created client. +/// - The `callbacks` struct and its `user_data` pointers must remain valid +/// until `dash_spv_ffi_client_stop` or `dash_spv_ffi_client_destroy` is called. /// /// # Returns /// 0 on success, error code on failure. #[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_run(client: *mut FFIDashSpvClient) -> i32 { +pub unsafe extern "C" fn dash_spv_ffi_client_run( + client: *mut FFIDashSpvClient, + callbacks: FFIEventCallbacks, +) -> i32 { null_check!(client); let client = &(*client); - tracing::info!("dash_spv_ffi_client_run: setting up event monitoring"); + tracing::info!("dash_spv_ffi_client_run: setting up event handler"); - let shutdown_token = client.shutdown_token.clone(); - - // Subscribe to events before spawning tasks - let (sync_event_rx, network_event_rx, progress_rx, wallet_event_rx) = - client.runtime.block_on(async { - let wallet_rx = client.inner.wallet().read().await.subscribe_events(); - ( - client.inner.subscribe_sync_events().await, - client.inner.subscribe_network_events().await, - client.inner.subscribe_progress().await, - wallet_rx, - ) - }); - - // Spawn event monitoring tasks for each callback type that is set - let mut tasks = client.active_tasks.lock().unwrap(); - - if client.sync_event_callbacks.lock().unwrap().is_some() { - tasks.push(spawn_broadcast_monitor( - "Sync event", - sync_event_rx.resubscribe(), - client.sync_event_callbacks.clone(), - shutdown_token.clone(), - &client.runtime, - |cb, event| cb.dispatch(event), - )); - } - - if client.network_event_callbacks.lock().unwrap().is_some() { - tasks.push(spawn_broadcast_monitor( - "Network event", - network_event_rx.resubscribe(), - client.network_event_callbacks.clone(), - shutdown_token.clone(), - &client.runtime, - |cb, event| cb.dispatch(event), - )); - } - - if client.progress_callback.lock().unwrap().is_some() { - tasks.push(spawn_progress_monitor( - progress_rx.clone(), - client.progress_callback.clone(), - shutdown_token.clone(), - &client.runtime, - |cb, progress| cb.dispatch(progress), - )); - } - - if client.wallet_event_callbacks.lock().unwrap().is_some() { - tasks.push(spawn_broadcast_monitor( - "Wallet event", - wallet_event_rx.resubscribe(), - client.wallet_event_callbacks.clone(), - shutdown_token.clone(), - &client.runtime, - |cb, event| cb.dispatch(event), - )); + // Dispatch initial progress before spawning the background task + if callbacks.progress.on_progress.is_some() { + let progress = client.runtime.block_on(async { client.inner.progress().await }); + callbacks.progress.dispatch(&progress); } - let error_callback = client.client_error_callback.clone(); + let handler = Arc::new(FFIEventHandler::from(callbacks)); + let shutdown_token = client.shutdown_token.clone(); let spv_client = client.inner.clone(); - tasks.push(client.runtime.spawn(async move { + + let task = client.runtime.spawn(async move { tracing::debug!("Sync task: starting run"); - if let Err(e) = spv_client.run(shutdown_token).await { + if let Err(e) = spv_client.run(shutdown_token, handler).await { tracing::error!("Sync task: error: {}", e); - let cb = error_callback.lock().unwrap().clone(); - if let Some(ref cb) = cb { - cb.dispatch(&e.to_string()); - } } tracing::debug!("Sync task: exiting"); - })); + }); - drop(tasks); + *client.run_task.lock().unwrap() = Some(task); - tracing::info!("dash_spv_ffi_client_run: background tasks spawned, returning"); + tracing::info!("dash_spv_ffi_client_run: background task spawned, returning"); FFIErrorCode::Success as i32 } @@ -508,10 +346,10 @@ pub unsafe extern "C" fn dash_spv_ffi_client_destroy(client: *mut FFIDashSpvClie let _ = client.inner.stop().await; }); - // Abort and await all active tasks - client.cancel_active_tasks(); + // Abort and await the run task + client.cancel_run_task(); - tracing::info!("✅ FFI client destroyed and all tasks cleaned up"); + tracing::info!("FFI client destroyed and all tasks cleaned up"); } } @@ -568,199 +406,3 @@ pub unsafe extern "C" fn dash_spv_ffi_wallet_manager_free(manager: *mut FFIWalle key_wallet_ffi::wallet_manager::wallet_manager_free(manager as *mut KeyWalletFFIWalletManager); } - -// ============================================================================ -// Event Callback Functions -// ============================================================================ - -/// Set sync event callbacks for push-based event notifications. -/// -/// The monitoring task is spawned when `dash_spv_ffi_client_run` is called. -/// Call this before calling run(). -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -/// - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. -/// - Callbacks must be thread-safe as they may be called from a background task. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_set_sync_event_callbacks( - client: *mut FFIDashSpvClient, - callbacks: FFISyncEventCallbacks, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.sync_event_callbacks.lock().unwrap() = Some(callbacks); - - FFIErrorCode::Success as i32 -} - -/// Clear sync event callbacks. -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_clear_sync_event_callbacks( - client: *mut FFIDashSpvClient, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.sync_event_callbacks.lock().unwrap() = None; - - FFIErrorCode::Success as i32 -} - -/// Set network event callbacks for push-based event notifications. -/// -/// The monitoring task is spawned when `dash_spv_ffi_client_run` is called. -/// Call this before calling run(). -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -/// - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. -/// - Callbacks must be thread-safe as they may be called from a background task. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_set_network_event_callbacks( - client: *mut FFIDashSpvClient, - callbacks: FFINetworkEventCallbacks, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.network_event_callbacks.lock().unwrap() = Some(callbacks); - - FFIErrorCode::Success as i32 -} - -/// Clear network event callbacks. -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_clear_network_event_callbacks( - client: *mut FFIDashSpvClient, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.network_event_callbacks.lock().unwrap() = None; - - FFIErrorCode::Success as i32 -} - -/// Set wallet event callbacks for push-based event notifications. -/// -/// The monitoring task is spawned when `dash_spv_ffi_client_run` is called. -/// Call this before calling run(). -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -/// - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. -/// - Callbacks must be thread-safe as they may be called from a background task. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_set_wallet_event_callbacks( - client: *mut FFIDashSpvClient, - callbacks: FFIWalletEventCallbacks, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.wallet_event_callbacks.lock().unwrap() = Some(callbacks); - - FFIErrorCode::Success as i32 -} - -/// Clear wallet event callbacks. -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_clear_wallet_event_callbacks( - client: *mut FFIDashSpvClient, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.wallet_event_callbacks.lock().unwrap() = None; - - FFIErrorCode::Success as i32 -} - -/// Set progress callback for sync progress updates. -/// -/// The monitoring task is spawned when `dash_spv_ffi_client_run` is called. -/// Call this before calling run(). -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -/// - The `callback` struct and its `user_data` must remain valid until the callback is cleared. -/// - The callback must be thread-safe as it may be called from a background task. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_set_progress_callback( - client: *mut FFIDashSpvClient, - callback: crate::FFIProgressCallback, -) -> i32 { - null_check!(client); - - let client = &(*client); - let progress = client.runtime.block_on(async { client.inner.progress().await }); - let mut cb_guard = client.progress_callback.lock().unwrap(); - *cb_guard = Some(callback); - if let Some(ref cb) = *cb_guard { - cb.dispatch(&progress); - } - - FFIErrorCode::Success as i32 -} - -/// Clear progress callback. -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_clear_progress_callback( - client: *mut FFIDashSpvClient, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.progress_callback.lock().unwrap() = None; - - FFIErrorCode::Success as i32 -} - -/// Set a callback for fatal client errors (start failure, sync thread crash). -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -/// - The `callback` struct and its `user_data` must remain valid until the callback is cleared. -/// - The callback must be thread-safe as it may be called from a background thread. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_set_client_error_callback( - client: *mut FFIDashSpvClient, - callback: FFIClientErrorCallback, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.client_error_callback.lock().unwrap() = Some(callback); - - FFIErrorCode::Success as i32 -} - -/// Clear the client error callback. -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_clear_client_error_callback( - client: *mut FFIDashSpvClient, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.client_error_callback.lock().unwrap() = None; - - FFIErrorCode::Success as i32 -} diff --git a/dash-spv-ffi/tests/dashd_sync/context.rs b/dash-spv-ffi/tests/dashd_sync/context.rs index 4e5ae7210..82bcfc9c8 100644 --- a/dash-spv-ffi/tests/dashd_sync/context.rs +++ b/dash-spv-ffi/tests/dashd_sync/context.rs @@ -11,9 +11,8 @@ use dash_spv::logging::{LogFileConfig, LoggingConfig, LoggingGuard}; use dash_spv::test_utils::{retain_test_dir, SYNC_TIMEOUT}; use dash_spv_ffi::client::{ dash_spv_ffi_client_destroy, dash_spv_ffi_client_get_wallet_manager, dash_spv_ffi_client_new, - dash_spv_ffi_client_run, dash_spv_ffi_client_set_network_event_callbacks, - dash_spv_ffi_client_set_sync_event_callbacks, dash_spv_ffi_client_set_wallet_event_callbacks, - dash_spv_ffi_client_stop, dash_spv_ffi_wallet_manager_free, FFIDashSpvClient, + dash_spv_ffi_client_run, dash_spv_ffi_client_stop, dash_spv_ffi_wallet_manager_free, + FFIDashSpvClient, }; use dash_spv_ffi::config::{ dash_spv_ffi_config_add_peer, dash_spv_ffi_config_destroy, dash_spv_ffi_config_new, @@ -21,6 +20,7 @@ use dash_spv_ffi::config::{ dash_spv_ffi_config_set_restrict_to_configured_peers, FFIClientConfig, }; use dash_spv_ffi::types::FFIWalletManager as FFIWalletManagerOpaque; +use dash_spv_ffi::FFIEventCallbacks; use dashcore::hashes::Hash; use dashcore::{Address, Txid}; use key_wallet_ffi::managed_account::{ @@ -220,13 +220,13 @@ impl FFITestContext { /// /// Calls FFI client functions through raw pointers held by the context. pub(super) unsafe fn run_with_sync_callbacks(&self) { - let sync_callbacks = create_sync_callbacks(&self.session.tracker); - let result = - dash_spv_ffi_client_set_sync_event_callbacks(self.session.client, sync_callbacks); - assert_eq!(result, 0, "Failed to set sync event callbacks"); + let callbacks = FFIEventCallbacks { + sync: create_sync_callbacks(&self.session.tracker), + ..FFIEventCallbacks::default() + }; self.snapshot_sync_baseline(); - let result = dash_spv_ffi_client_run(self.session.client); + let result = dash_spv_ffi_client_run(self.session.client, callbacks); assert_eq!(result, 0, "Failed to run FFI client"); } @@ -236,21 +236,15 @@ impl FFITestContext { /// /// Calls FFI client functions through raw pointers held by the context. pub(super) unsafe fn run_with_all_callbacks(&self) { - let sync_cbs = create_sync_callbacks(&self.session.tracker); - let network_cbs = create_network_callbacks(&self.session.tracker); - let wallet_cbs = create_wallet_callbacks(&self.session.tracker); - - let result = dash_spv_ffi_client_set_sync_event_callbacks(self.session.client, sync_cbs); - assert_eq!(result, 0, "Failed to set sync event callbacks"); - let result = - dash_spv_ffi_client_set_network_event_callbacks(self.session.client, network_cbs); - assert_eq!(result, 0, "Failed to set network event callbacks"); - let result = - dash_spv_ffi_client_set_wallet_event_callbacks(self.session.client, wallet_cbs); - assert_eq!(result, 0, "Failed to set wallet event callbacks"); + let callbacks = FFIEventCallbacks { + sync: create_sync_callbacks(&self.session.tracker), + network: create_network_callbacks(&self.session.tracker), + wallet: create_wallet_callbacks(&self.session.tracker), + ..FFIEventCallbacks::default() + }; self.snapshot_sync_baseline(); - let result = dash_spv_ffi_client_run(self.session.client); + let result = dash_spv_ffi_client_run(self.session.client, callbacks); assert_eq!(result, 0, "Failed to run FFI client"); } diff --git a/dash-spv-ffi/tests/test_client.rs b/dash-spv-ffi/tests/test_client.rs index b1001dffe..613381608 100644 --- a/dash-spv-ffi/tests/test_client.rs +++ b/dash-spv-ffi/tests/test_client.rs @@ -4,24 +4,8 @@ mod tests { use key_wallet_ffi::FFINetwork; use serial_test::serial; use std::ffi::CString; - use std::os::raw::c_void; - use std::sync::Mutex; use tempfile::TempDir; - struct ProgressCallbackData { - state: Mutex>, - is_synced: Mutex>, - } - - impl ProgressCallbackData { - fn new() -> Self { - Self { - state: Mutex::new(None), - is_synced: Mutex::new(None), - } - } - } - fn create_test_config() -> (*mut FFIClientConfig, TempDir) { let temp_dir = TempDir::new().unwrap(); let config = dash_spv_ffi_config_new(FFINetwork::Regtest); @@ -64,8 +48,8 @@ mod tests { let (config, _temp_dir) = create_test_config(); let client = dash_spv_ffi_client_new(config); - // Note: Start/stop may fail in test environment without network - let _result = dash_spv_ffi_client_run(client); + // Pass default (no-op) callbacks — start/stop may fail without network + let _result = dash_spv_ffi_client_run(client, FFIEventCallbacks::default()); let _result = dash_spv_ffi_client_stop(client); dash_spv_ffi_client_destroy(client); @@ -77,7 +61,8 @@ mod tests { #[serial] fn test_client_null_checks() { unsafe { - let result = dash_spv_ffi_client_run(std::ptr::null_mut()); + let result = + dash_spv_ffi_client_run(std::ptr::null_mut(), FFIEventCallbacks::default()); assert_eq!(result, FFIErrorCode::NullPointer as i32); let result = dash_spv_ffi_client_stop(std::ptr::null_mut()); @@ -87,48 +72,4 @@ mod tests { assert!(progress.is_null()); } } - - extern "C" fn test_progress_callback(progress: *const FFISyncProgress, user_data: *mut c_void) { - assert!(!progress.is_null()); - let data = unsafe { &*(user_data as *const ProgressCallbackData) }; - let p = unsafe { &*progress }; - - *data.state.lock().unwrap() = Some(p.state); - *data.is_synced.lock().unwrap() = Some(p.is_synced); - } - - #[test] - #[serial] - fn test_set_progress_callback_emits_progress() { - unsafe { - let (config, _temp_dir) = create_test_config(); - let client = dash_spv_ffi_client_new(config); - assert!(!client.is_null()); - - let callback_data = Box::new(ProgressCallbackData::new()); - let data_ptr = &*callback_data as *const ProgressCallbackData as *mut c_void; - - let progress_callback = FFIProgressCallback { - on_progress: Some(test_progress_callback), - user_data: data_ptr, - }; - - let result = dash_spv_ffi_client_set_progress_callback(client, progress_callback); - assert_eq!(result, FFIErrorCode::Success as i32); - - // Verify callback was invoked with expected initial values - assert_eq!( - callback_data.state.lock().unwrap().unwrap(), - FFISyncState::WaitingForConnections, - "initial state should be WaitingForConnections" - ); - assert!( - !callback_data.is_synced.lock().unwrap().unwrap(), - "initial is_synced should be false" - ); - - dash_spv_ffi_client_destroy(client); - dash_spv_ffi_config_destroy(config); - } - } } diff --git a/dash-spv-ffi/tests/unit/test_async_operations.rs b/dash-spv-ffi/tests/unit/test_async_operations.rs index cefeb27ed..eb1c0719d 100644 --- a/dash-spv-ffi/tests/unit/test_async_operations.rs +++ b/dash-spv-ffi/tests/unit/test_async_operations.rs @@ -113,8 +113,8 @@ mod tests { println!("Testing callback thread safety with concurrent invocations"); - // Start the client - let start_result = dash_spv_ffi_client_run(client); + // Start the client with default (empty) callbacks + let start_result = dash_spv_ffi_client_run(client, FFIEventCallbacks::default()); assert_eq!(start_result, 0); thread::sleep(Duration::from_millis(100)); @@ -292,8 +292,17 @@ mod tests { user_data: &event_data as *const _ as *mut c_void, }; - let result = dash_spv_ffi_client_set_sync_event_callbacks(client, sync_callbacks); - assert_eq!(result, FFIErrorCode::Success as i32); + // Build an FFIEventCallbacks with sync callbacks set + let callbacks = FFIEventCallbacks { + sync: sync_callbacks, + ..FFIEventCallbacks::default() + }; + + // Verify the struct is properly constructed (callbacks are now + // passed directly to run(), no separate set call needed) + assert!(callbacks.sync.on_sync_start.is_some()); + assert!(callbacks.sync.on_block_headers_stored.is_some()); + assert!(callbacks.sync.on_sync_complete.is_some()); dash_spv_ffi_client_destroy(client); dash_spv_ffi_config_destroy(config); diff --git a/dash-spv-ffi/tests/unit/test_client_lifecycle.rs b/dash-spv-ffi/tests/unit/test_client_lifecycle.rs index ceef717ec..d463ac21d 100644 --- a/dash-spv-ffi/tests/unit/test_client_lifecycle.rs +++ b/dash-spv-ffi/tests/unit/test_client_lifecycle.rs @@ -74,14 +74,14 @@ mod tests { assert!(!client.is_null()); // Start - let _result = dash_spv_ffi_client_run(client); + let _result = dash_spv_ffi_client_run(client, FFIEventCallbacks::default()); // May fail in test environment, but should handle gracefully // Stop let _result = dash_spv_ffi_client_stop(client); // Restart - let _result = dash_spv_ffi_client_run(client); + let _result = dash_spv_ffi_client_run(client, FFIEventCallbacks::default()); let _result = dash_spv_ffi_client_stop(client); dash_spv_ffi_client_destroy(client); @@ -100,7 +100,7 @@ mod tests { // Start a sync operation in background // Start sync (non-blocking) - dash_spv_ffi_client_run(client); + dash_spv_ffi_client_run(client, FFIEventCallbacks::default()); // Immediately destroy client (should handle pending operations) dash_spv_ffi_client_destroy(client); @@ -123,7 +123,7 @@ mod tests { assert!(!client.is_null()); // Try to start (should handle no peers gracefully) - let _result = dash_spv_ffi_client_run(client); + let _result = dash_spv_ffi_client_run(client, FFIEventCallbacks::default()); dash_spv_ffi_client_destroy(client); dash_spv_ffi_config_destroy(config); @@ -166,7 +166,7 @@ mod tests { unsafe { // Test all client operations with null assert_eq!( - dash_spv_ffi_client_run(std::ptr::null_mut()), + dash_spv_ffi_client_run(std::ptr::null_mut(), FFIEventCallbacks::default()), FFIErrorCode::NullPointer as i32 ); @@ -231,22 +231,23 @@ mod tests { let client = dash_spv_ffi_client_new(config); assert!(!client.is_null()); - let callback = FFIClientErrorCallback { - on_error: Some(on_error), - user_data: tx_ptr as *mut std::os::raw::c_void, + let callbacks = FFIEventCallbacks { + error: FFIClientErrorCallback { + on_error: Some(on_error), + user_data: tx_ptr as *mut std::os::raw::c_void, + }, + ..FFIEventCallbacks::default() }; - let result = dash_spv_ffi_client_set_client_error_callback(client, callback); - assert_eq!(result, FFIErrorCode::Success as i32); // Call run() twice — the second run's sync thread will call // start() on the already-running client, triggering "already running" - let run_result = dash_spv_ffi_client_run(client); + let run_result = dash_spv_ffi_client_run(client, callbacks.clone()); assert_eq!(run_result, FFIErrorCode::Success as i32); // Brief wait for the first run's sync thread to complete start() thread::sleep(Duration::from_millis(200)); - let _run_result2 = dash_spv_ffi_client_run(client); + let _run_result2 = dash_spv_ffi_client_run(client, callbacks); // Wait for the error callback to fire (with timeout) let error_msg = rx @@ -302,20 +303,10 @@ mod tests { #[test] #[serial] - fn test_client_error_callback_null_client() { + fn test_client_run_null_client() { unsafe { - let callback = FFIClientErrorCallback { - on_error: None, - user_data: std::ptr::null_mut(), - }; - - assert_eq!( - dash_spv_ffi_client_set_client_error_callback(std::ptr::null_mut(), callback), - FFIErrorCode::NullPointer as i32 - ); - assert_eq!( - dash_spv_ffi_client_clear_client_error_callback(std::ptr::null_mut()), + dash_spv_ffi_client_run(std::ptr::null_mut(), FFIEventCallbacks::default()), FFIErrorCode::NullPointer as i32 ); } diff --git a/dash-spv/examples/filter_sync.rs b/dash-spv/examples/filter_sync.rs index 955400f30..d0927fe17 100644 --- a/dash-spv/examples/filter_sync.rs +++ b/dash-spv/examples/filter_sync.rs @@ -43,7 +43,7 @@ async fn main() -> Result<(), Box> { let shutdown_token = CancellationToken::new(); - client.run(shutdown_token).await?; + client.run(shutdown_token, Arc::new(())).await?; println!("Done!"); Ok(()) diff --git a/dash-spv/examples/simple_sync.rs b/dash-spv/examples/simple_sync.rs index 8fd1cbbbc..7393ed054 100644 --- a/dash-spv/examples/simple_sync.rs +++ b/dash-spv/examples/simple_sync.rs @@ -37,7 +37,7 @@ async fn main() -> Result<(), Box> { let shutdown_token = CancellationToken::new(); - client.run(shutdown_token).await?; + client.run(shutdown_token, Arc::new(())).await?; println!("Done!"); Ok(()) diff --git a/dash-spv/examples/spv_with_wallet.rs b/dash-spv/examples/spv_with_wallet.rs index 31e6d4ffe..3c70c4b73 100644 --- a/dash-spv/examples/spv_with_wallet.rs +++ b/dash-spv/examples/spv_with_wallet.rs @@ -41,7 +41,7 @@ async fn main() -> Result<(), Box> { let shutdown_token = CancellationToken::new(); - client.run(shutdown_token).await?; + client.run(shutdown_token, Arc::new(())).await?; println!("Done!"); Ok(()) diff --git a/dash-spv/src/client/event_handler.rs b/dash-spv/src/client/event_handler.rs new file mode 100644 index 000000000..e3d7b9c4c --- /dev/null +++ b/dash-spv/src/client/event_handler.rs @@ -0,0 +1,382 @@ +//! Event handler trait for receiving SPV client events. +//! +//! Provides `EventHandler`, a trait with default no-op implementations that +//! consumers override to receive push-based event notifications. The monitoring +//! infrastructure subscribes to internal channels and dispatches to the handler. + +use std::sync::Arc; + +use tokio::sync::{broadcast, watch}; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; + +use crate::network::NetworkEvent; +use crate::sync::{SyncEvent, SyncProgress}; +use key_wallet::manager::WalletEvent; + +/// Trait for receiving SPV client events. +/// +/// All methods have default no-op implementations, so consumers only +/// need to override the events they care about. +pub trait EventHandler: Send + Sync + 'static { + fn on_sync_event(&self, _event: &SyncEvent) {} + fn on_network_event(&self, _event: &NetworkEvent) {} + fn on_progress(&self, _progress: &SyncProgress) {} + fn on_wallet_event(&self, _event: &WalletEvent) {} + fn on_error(&self, _error: &str) {} +} + +/// No-op implementation for consumers that don't need event notifications. +impl EventHandler for () {} + +/// Spawns a task that monitors a broadcast channel and dispatches events to the handler. +/// +/// `on_failure` is cancelled if the receiver lags, signalling the run loop to stop. +pub(crate) fn spawn_broadcast_monitor( + name: &'static str, + mut receiver: broadcast::Receiver, + handler: Arc, + shutdown: CancellationToken, + on_failure: CancellationToken, + dispatch_fn: F, +) -> JoinHandle<()> +where + E: Clone + Send + 'static, + H: EventHandler, + F: Fn(&H, &E) + Send + 'static, +{ + tokio::spawn(async move { + tracing::debug!("{} monitoring task started", name); + loop { + tokio::select! { + result = receiver.recv() => { + match result { + Ok(event) => dispatch_fn(&handler, &event), + Err(broadcast::error::RecvError::Closed) if shutdown.is_cancelled() => break, + Err(broadcast::error::RecvError::Closed) => { + let msg = format!("{} monitor channel closed unexpectedly", name); + tracing::error!("{}", msg); + handler.on_error(&msg); + on_failure.cancel(); + break; + } + Err(broadcast::error::RecvError::Lagged(n)) => { + let msg = format!("{} monitor lagged, missed {} events", name, n); + tracing::error!("{}", msg); + handler.on_error(&msg); + on_failure.cancel(); + break; + } + } + } + _ = shutdown.cancelled() => break, + } + } + tracing::debug!("{} monitoring task exiting", name); + }) +} + +/// Spawns a task that monitors a watch channel for progress updates. +/// +/// Sends the initial progress value, then monitors for changes. +pub(crate) fn spawn_progress_monitor( + mut receiver: watch::Receiver, + handler: Arc, + shutdown: CancellationToken, + on_failure: CancellationToken, +) -> JoinHandle<()> { + tokio::spawn(async move { + tracing::debug!("Progress monitoring task started"); + + handler.on_progress(&receiver.borrow_and_update()); + + loop { + tokio::select! { + result = receiver.changed() => { + match result { + Ok(()) => handler.on_progress(&receiver.borrow_and_update()), + Err(_) if shutdown.is_cancelled() => break, + Err(_) => { + let msg = "Progress monitor channel closed unexpectedly"; + tracing::error!("{}", msg); + handler.on_error(msg); + on_failure.cancel(); + break; + } + } + } + _ = shutdown.cancelled() => break, + } + } + tracing::debug!("Progress monitoring task exiting"); + }) +} + +#[cfg(test)] +mod tests { + use std::net::SocketAddr; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + + use tokio::sync::{broadcast, watch}; + use tokio_util::sync::CancellationToken; + + use super::{spawn_broadcast_monitor, spawn_progress_monitor, EventHandler}; + use crate::network::NetworkEvent; + use crate::sync::{ManagerIdentifier, SyncEvent, SyncProgress}; + use key_wallet::manager::WalletEvent; + + struct RecordingHandler { + sync_count: AtomicUsize, + network_count: AtomicUsize, + progress_count: AtomicUsize, + wallet_count: AtomicUsize, + error_count: AtomicUsize, + } + + impl RecordingHandler { + fn new() -> Self { + Self { + sync_count: AtomicUsize::new(0), + network_count: AtomicUsize::new(0), + progress_count: AtomicUsize::new(0), + wallet_count: AtomicUsize::new(0), + error_count: AtomicUsize::new(0), + } + } + } + + impl EventHandler for RecordingHandler { + fn on_sync_event(&self, _event: &SyncEvent) { + self.sync_count.fetch_add(1, Ordering::SeqCst); + } + fn on_network_event(&self, _event: &NetworkEvent) { + self.network_count.fetch_add(1, Ordering::SeqCst); + } + fn on_progress(&self, _progress: &SyncProgress) { + self.progress_count.fetch_add(1, Ordering::SeqCst); + } + fn on_wallet_event(&self, _event: &WalletEvent) { + self.wallet_count.fetch_add(1, Ordering::SeqCst); + } + fn on_error(&self, _error: &str) { + self.error_count.fetch_add(1, Ordering::SeqCst); + } + } + + #[tokio::test] + async fn noop_handler_does_not_panic() { + let handler: () = (); + let event = SyncEvent::BlockHeadersStored { + tip_height: 100, + }; + handler.on_sync_event(&event); + handler.on_network_event(&NetworkEvent::PeersUpdated { + connected_count: 0, + addresses: vec![], + best_height: None, + }); + handler.on_progress(&SyncProgress::default()); + handler.on_error("test error"); + } + + #[tokio::test] + async fn broadcast_monitor_dispatches_events() { + let (tx, rx) = broadcast::channel(16); + let handler = Arc::new(RecordingHandler::new()); + let shutdown = CancellationToken::new(); + + let task = spawn_broadcast_monitor( + "test", + rx, + handler.clone(), + shutdown.clone(), + CancellationToken::new(), + |h: &RecordingHandler, event: &SyncEvent| h.on_sync_event(event), + ); + + tx.send(SyncEvent::BlockHeadersStored { + tip_height: 1, + }) + .unwrap(); + tx.send(SyncEvent::BlockHeadersStored { + tip_height: 2, + }) + .unwrap(); + tx.send(SyncEvent::SyncStart { + identifier: ManagerIdentifier::BlockHeader, + }) + .unwrap(); + + // Give the task time to process + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + shutdown.cancel(); + task.await.unwrap(); + + assert_eq!(handler.sync_count.load(Ordering::SeqCst), 3); + } + + #[tokio::test] + async fn broadcast_monitor_exits_on_shutdown() { + let (_tx, rx) = broadcast::channel::(16); + let handler = Arc::new(RecordingHandler::new()); + let shutdown = CancellationToken::new(); + + let task = spawn_broadcast_monitor( + "test", + rx, + handler.clone(), + shutdown.clone(), + CancellationToken::new(), + |h: &RecordingHandler, event: &SyncEvent| h.on_sync_event(event), + ); + + shutdown.cancel(); + task.await.unwrap(); + + assert_eq!(handler.sync_count.load(Ordering::SeqCst), 0); + } + + #[tokio::test] + async fn broadcast_monitor_fails_on_unexpected_channel_close() { + let (tx, rx) = broadcast::channel::(16); + let handler = Arc::new(RecordingHandler::new()); + let shutdown = CancellationToken::new(); + let on_failure = CancellationToken::new(); + + let task = spawn_broadcast_monitor( + "test", + rx, + handler.clone(), + shutdown.clone(), + on_failure.clone(), + |h: &RecordingHandler, event: &SyncEvent| h.on_sync_event(event), + ); + + // Drop sender without cancelling shutdown — this is unexpected + drop(tx); + task.await.unwrap(); + + assert_eq!(handler.error_count.load(Ordering::SeqCst), 1); + assert!(on_failure.is_cancelled()); + } + + #[tokio::test] + async fn broadcast_monitor_exits_on_lagged_receiver() { + let (tx, rx) = broadcast::channel(2); + let handler = Arc::new(RecordingHandler::new()); + let shutdown = CancellationToken::new(); + let on_failure = CancellationToken::new(); + + // Send more messages than the buffer can hold before spawning the monitor + tx.send(SyncEvent::BlockHeadersStored { + tip_height: 1, + }) + .unwrap(); + tx.send(SyncEvent::BlockHeadersStored { + tip_height: 2, + }) + .unwrap(); + tx.send(SyncEvent::BlockHeadersStored { + tip_height: 3, + }) + .unwrap(); + + let task = spawn_broadcast_monitor( + "test", + rx, + handler.clone(), + shutdown.clone(), + on_failure.clone(), + |h: &RecordingHandler, event: &SyncEvent| h.on_sync_event(event), + ); + + // The monitor should exit on its own due to the lagged error + task.await.unwrap(); + + // No sync events should have been dispatched, but on_error must have fired + assert_eq!(handler.sync_count.load(Ordering::SeqCst), 0); + assert_eq!(handler.error_count.load(Ordering::SeqCst), 1); + assert!(on_failure.is_cancelled()); + } + + #[tokio::test] + async fn progress_monitor_sends_initial_and_updates() { + let (tx, rx) = watch::channel(SyncProgress::default()); + let handler = Arc::new(RecordingHandler::new()); + let shutdown = CancellationToken::new(); + + let task = + spawn_progress_monitor(rx, handler.clone(), shutdown.clone(), CancellationToken::new()); + + // Give the task time to send initial progress + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Send two updates + tx.send_modify(|_| {}); + tx.send_modify(|_| {}); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + shutdown.cancel(); + task.await.unwrap(); + + // 1 initial + at least 1 update (watch coalesces rapid updates) + assert!(handler.progress_count.load(Ordering::SeqCst) >= 2); + } + + #[tokio::test] + async fn progress_monitor_fails_on_unexpected_sender_drop() { + let (tx, rx) = watch::channel(SyncProgress::default()); + let handler = Arc::new(RecordingHandler::new()); + let shutdown = CancellationToken::new(); + let on_failure = CancellationToken::new(); + + let task = + spawn_progress_monitor(rx, handler.clone(), shutdown.clone(), on_failure.clone()); + + // Give it time to send initial + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Drop sender without cancelling shutdown — this is unexpected + drop(tx); + task.await.unwrap(); + + // At least the initial progress was sent, plus on_error fired + assert!(handler.progress_count.load(Ordering::SeqCst) >= 1); + assert_eq!(handler.error_count.load(Ordering::SeqCst), 1); + assert!(on_failure.is_cancelled()); + } + + #[tokio::test] + async fn network_event_dispatch() { + let (tx, rx) = broadcast::channel(16); + let handler = Arc::new(RecordingHandler::new()); + let shutdown = CancellationToken::new(); + + let task = spawn_broadcast_monitor( + "network", + rx, + handler.clone(), + shutdown.clone(), + CancellationToken::new(), + |h: &RecordingHandler, event: &NetworkEvent| h.on_network_event(event), + ); + + let addr: SocketAddr = "127.0.0.1:9999".parse().unwrap(); + tx.send(NetworkEvent::PeerConnected { + address: addr, + }) + .unwrap(); + tx.send(NetworkEvent::PeerDisconnected { + address: addr, + }) + .unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + shutdown.cancel(); + task.await.unwrap(); + + assert_eq!(handler.network_count.load(Ordering::SeqCst), 2); + } +} diff --git a/dash-spv/src/client/events.rs b/dash-spv/src/client/events.rs index 7f5388afd..c07cde4fd 100644 --- a/dash-spv/src/client/events.rs +++ b/dash-spv/src/client/events.rs @@ -16,7 +16,7 @@ use super::DashSpvClient; impl DashSpvClient { /// Subscribe to sync progress updates via watch channel. - pub async fn subscribe_progress(&self) -> watch::Receiver { + pub(crate) async fn subscribe_progress(&self) -> watch::Receiver { self.sync_coordinator.lock().await.subscribe_progress() } @@ -26,12 +26,12 @@ impl DashSpvClient broadcast::Receiver { + pub(crate) async fn subscribe_sync_events(&self) -> broadcast::Receiver { self.sync_coordinator.lock().await.subscribe_events() } /// Subscribe to network events. - pub async fn subscribe_network_events(&self) -> broadcast::Receiver { + pub(crate) async fn subscribe_network_events(&self) -> broadcast::Receiver { self.network.lock().await.subscribe_network_events() } } diff --git a/dash-spv/src/client/mod.rs b/dash-spv/src/client/mod.rs index f9cb8848e..9d6187549 100644 --- a/dash-spv/src/client/mod.rs +++ b/dash-spv/src/client/mod.rs @@ -25,6 +25,7 @@ //! Never acquire locks in reverse order or deadlock will occur! pub mod config; +pub mod event_handler; mod core; mod events; @@ -36,6 +37,7 @@ mod transactions; // Re-export public types from extracted modules pub use config::ClientConfig; +pub use event_handler::EventHandler; // Re-export the main client struct pub use core::DashSpvClient; diff --git a/dash-spv/src/client/sync_coordinator.rs b/dash-spv/src/client/sync_coordinator.rs index 3709a1387..b792f07bb 100644 --- a/dash-spv/src/client/sync_coordinator.rs +++ b/dash-spv/src/client/sync_coordinator.rs @@ -1,13 +1,19 @@ //! Sync coordination and orchestration. +use std::sync::Arc; +use std::time::Duration; + +use tokio_util::sync::CancellationToken; + +use super::event_handler::{spawn_broadcast_monitor, spawn_progress_monitor}; use super::DashSpvClient; +use crate::client::EventHandler; use crate::error::Result; use crate::network::NetworkManager; use crate::storage::StorageManager; use crate::sync::SyncProgress; +use crate::SpvError; use key_wallet::manager::WalletInterface; -use std::time::Duration; -use tokio_util::sync::CancellationToken; const SYNC_COORDINATOR_TICK_MS: Duration = Duration::from_millis(100); @@ -17,22 +23,71 @@ impl DashSpvClient Result<()> { - self.start().await?; + /// Subscribes to all event channels internally and dispatches events to the + /// provided `handler`. Calls `start()` internally, runs continuous network + /// monitoring, and calls `stop()` before returning. + pub async fn run( + &self, + token: CancellationToken, + handler: Arc, + ) -> Result<()> { + if let Err(e) = self.start().await { + handler.on_error(&e.to_string()); + return Err(e); + } tracing::info!("Starting continuous network monitoring..."); + let monitor_shutdown = CancellationToken::new(); + let monitor_failure = CancellationToken::new(); + + // Subscribe to channels + let sync_event_rx = self.subscribe_sync_events().await; + let network_event_rx = self.subscribe_network_events().await; + let progress_rx = self.subscribe_progress().await; + let wallet_event_rx = self.wallet.read().await.subscribe_events(); + + // Spawn monitoring tasks + let sync_task = spawn_broadcast_monitor( + "Sync event", + sync_event_rx, + handler.clone(), + monitor_shutdown.clone(), + monitor_failure.clone(), + |h, event| h.on_sync_event(event), + ); + + let network_task = spawn_broadcast_monitor( + "Network event", + network_event_rx, + handler.clone(), + monitor_shutdown.clone(), + monitor_failure.clone(), + |h, event| h.on_network_event(event), + ); + + let wallet_task = spawn_broadcast_monitor( + "Wallet event", + wallet_event_rx, + handler.clone(), + monitor_shutdown.clone(), + monitor_failure.clone(), + |h, event| h.on_wallet_event(event), + ); + + let progress_task = spawn_progress_monitor( + progress_rx, + handler.clone(), + monitor_shutdown.clone(), + monitor_failure.clone(), + ); + + // Run the sync loop let mut sync_coordinator_tick_interval = tokio::time::interval(SYNC_COORDINATOR_TICK_MS); - let mut progress_updates = self.sync_coordinator.lock().await.subscribe_progress(); - let mut wallet_events = self.wallet.read().await.subscribe_events(); - let error = loop { - // Check if we should stop + let error: Option = loop { let running = self.running.read().await; if !*running { tracing::info!("Stopping network monitoring"); @@ -40,31 +95,7 @@ impl DashSpvClient { - match result { - Ok(()) => { - tracing::info!("Sync progress: {}", *progress_updates.borrow()); - None - } - Err(_) => { - tracing::warn!("Progress channel closed."); - break None - } - } - } - result = wallet_events.recv() => { - match result { - Ok(event) => { - tracing::info!("Wallet event: {}", event.description()); - None - } - Err(e) => { - tracing::warn!("Wallet events channel error: {e}"); - break None - } - } - } + let error: Option = tokio::select! { _ = sync_coordinator_tick_interval.tick() => { self.sync_coordinator.lock().await.tick().await.err().map(Into::into) } @@ -72,14 +103,24 @@ impl DashSpvClient { + break Some(crate::SpvError::ChannelFailure( + "event monitor".into(), + "broadcast receiver lagged".into(), + )) + } }; - if error.is_some() { + if let Some(ref e) = error { + handler.on_error(&e.to_string()); break error; } }; - // Always stop the client + // Cancel monitoring tasks and wait for them + monitor_shutdown.cancel(); + let _ = tokio::join!(sync_task, network_task, wallet_task, progress_task); + let stop_result = self.stop().await; match error { diff --git a/dash-spv/src/lib.rs b/dash-spv/src/lib.rs index 08351dc85..34e3457e1 100644 --- a/dash-spv/src/lib.rs +++ b/dash-spv/src/lib.rs @@ -37,7 +37,7 @@ //! let client = DashSpvClient::new(config.clone(), network, storage, wallet).await?; //! let shutdown_token = CancellationToken::new(); //! -//! client.run(shutdown_token).await?; +//! client.run(shutdown_token, std::sync::Arc::new(())).await?; //! //! Ok(()) //! } @@ -69,7 +69,7 @@ pub mod validation; // Re-export main types for convenience pub use client::config::MempoolStrategy; -pub use client::{ClientConfig, DashSpvClient}; +pub use client::{ClientConfig, DashSpvClient, EventHandler}; pub use error::{ LoggingError, LoggingResult, NetworkError, SpvError, StorageError, SyncError, ValidationError, }; diff --git a/dash-spv/src/main.rs b/dash-spv/src/main.rs index 6e92e87d0..c608dd59a 100644 --- a/dash-spv/src/main.rs +++ b/dash-spv/src/main.rs @@ -6,11 +6,33 @@ use std::process; use std::sync::Arc; use clap::{Arg, Command}; -use dash_spv::{ClientConfig, DashSpvClient, LevelFilter, MempoolStrategy, Network}; -use key_wallet::manager::WalletManager; +use dash_spv::sync::{SyncEvent, SyncProgress}; +use dash_spv::{ClientConfig, DashSpvClient, EventHandler, LevelFilter, MempoolStrategy, Network}; +use key_wallet::manager::{WalletEvent, WalletManager}; use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; use tokio_util::sync::CancellationToken; +/// Logs all SPV client events via tracing. +struct LoggingEventHandler; + +impl EventHandler for LoggingEventHandler { + fn on_sync_event(&self, event: &SyncEvent) { + tracing::info!("{}", event.description()); + } + + fn on_progress(&self, progress: &SyncProgress) { + tracing::info!("Sync progress: {}", progress); + } + + fn on_wallet_event(&self, event: &WalletEvent) { + tracing::info!("Wallet: {}", event.description()); + } + + fn on_error(&self, error: &str) { + tracing::error!("{}", error); + } +} + #[tokio::main] async fn main() { if let Err(e) = run().await { @@ -352,7 +374,7 @@ async fn run_client( ctrl_c_token.cancel(); }); - client.run(shutdown_token).await?; + client.run(shutdown_token, Arc::new(LoggingEventHandler)).await?; Ok(()) } diff --git a/dash-spv/src/test_utils/event_handler.rs b/dash-spv/src/test_utils/event_handler.rs new file mode 100644 index 000000000..7d7f21f98 --- /dev/null +++ b/dash-spv/src/test_utils/event_handler.rs @@ -0,0 +1,81 @@ +//! Test event handler that bridges `EventHandler` back to tokio channels. +//! +//! Allows integration tests to use `run()` with an `EventHandler` while keeping +//! the ergonomic `tokio::select!` patterns that channels provide. + +use tokio::sync::{broadcast, watch}; + +use crate::client::EventHandler; +use crate::network::NetworkEvent; +use crate::sync::{SyncEvent, SyncProgress}; +use key_wallet::manager::WalletEvent; + +/// Event handler that forwards all events to internal channels. +/// +/// Tests create this handler, take receivers via the accessor methods, +/// then pass `Arc` to `run()`. +pub struct TestEventHandler { + sync_tx: broadcast::Sender, + network_tx: broadcast::Sender, + progress_tx: watch::Sender, + wallet_tx: broadcast::Sender, +} + +impl TestEventHandler { + pub fn new() -> Self { + let (sync_tx, _) = broadcast::channel(256); + let (network_tx, _) = broadcast::channel(256); + let (progress_tx, _) = watch::channel(SyncProgress::default()); + let (wallet_tx, _) = broadcast::channel(256); + Self { + sync_tx, + network_tx, + progress_tx, + wallet_tx, + } + } + + pub fn subscribe_sync_events(&self) -> broadcast::Receiver { + self.sync_tx.subscribe() + } + + pub fn subscribe_network_events(&self) -> broadcast::Receiver { + self.network_tx.subscribe() + } + + pub fn subscribe_progress(&self) -> watch::Receiver { + self.progress_tx.subscribe() + } + + pub fn subscribe_wallet_events(&self) -> broadcast::Receiver { + self.wallet_tx.subscribe() + } +} + +impl Default for TestEventHandler { + fn default() -> Self { + Self::new() + } +} + +impl EventHandler for TestEventHandler { + fn on_sync_event(&self, event: &SyncEvent) { + let _ = self.sync_tx.send(event.clone()); + } + + fn on_network_event(&self, event: &NetworkEvent) { + let _ = self.network_tx.send(event.clone()); + } + + fn on_progress(&self, progress: &SyncProgress) { + self.progress_tx.send_replace(progress.clone()); + } + + fn on_wallet_event(&self, event: &WalletEvent) { + let _ = self.wallet_tx.send(event.clone()); + } + + fn on_error(&self, error: &str) { + tracing::error!("TestEventHandler received error: {}", error); + } +} diff --git a/dash-spv/src/test_utils/mod.rs b/dash-spv/src/test_utils/mod.rs index 83c8f7e96..1ae7b8a89 100644 --- a/dash-spv/src/test_utils/mod.rs +++ b/dash-spv/src/test_utils/mod.rs @@ -2,6 +2,7 @@ mod chain_tip; mod chain_work; mod checkpoint; mod context; +mod event_handler; mod filter; mod fs_helpers; mod network; @@ -14,6 +15,7 @@ use std::time::Duration; pub const SYNC_TIMEOUT: Duration = Duration::from_secs(180); pub use context::DashdTestContext; +pub use event_handler::TestEventHandler; pub use fs_helpers::retain_test_dir; pub use network::{test_socket_address, MockNetworkManager}; pub use node::{DashCoreNode, TestChain, WalletFile}; diff --git a/dash-spv/tests/dashd_sync/setup.rs b/dash-spv/tests/dashd_sync/setup.rs index d8caa7c47..a14a4c9b6 100644 --- a/dash-spv/tests/dashd_sync/setup.rs +++ b/dash-spv/tests/dashd_sync/setup.rs @@ -1,6 +1,6 @@ use dash_spv::network::NetworkEvent; use dash_spv::storage::{PeerStorage, PersistentPeerStorage, PersistentStorage}; -use dash_spv::test_utils::{retain_test_dir, DashdTestContext, TestChain}; +use dash_spv::test_utils::{retain_test_dir, DashdTestContext, TestChain, TestEventHandler}; use dash_spv::{ client::{ClientConfig, DashSpvClient}, network::PeerNetworkManager, @@ -274,7 +274,10 @@ impl ClientHandle { } } -/// Creates a new SPV client and starts it. +/// Creates a new SPV client and starts it with a `TestEventHandler`. +/// +/// The handler bridges events back to channels so tests can use `tokio::select!` +/// patterns while going through the `EventHandler` trait. pub(super) async fn create_and_start_client( config: &ClientConfig, wallet: Arc>>, @@ -288,14 +291,16 @@ pub(super) async fn create_and_start_client( .await .expect("Failed to create client"); - let progress_receiver = client.subscribe_progress().await; - let sync_event_receiver = client.subscribe_sync_events().await; - let network_event_receiver = client.subscribe_network_events().await; + let handler = Arc::new(TestEventHandler::new()); + let progress_receiver = handler.subscribe_progress(); + let sync_event_receiver = handler.subscribe_sync_events(); + let network_event_receiver = handler.subscribe_network_events(); + let cancel_token = CancellationToken::new(); let run_token = cancel_token.clone(); - let run_client = client.clone(); - let run_handle = tokio::task::spawn(async move { run_client.run(run_token).await }); + + let run_handle = tokio::task::spawn(async move { run_client.run(run_token, handler).await }); ClientHandle { client, diff --git a/dash-spv/tests/peer_test.rs b/dash-spv/tests/peer_test.rs index 4b70222e0..1ad0e2ea7 100644 --- a/dash-spv/tests/peer_test.rs +++ b/dash-spv/tests/peer_test.rs @@ -51,7 +51,7 @@ async fn test_peer_connection() { let token = CancellationToken::new(); let cancel = token.clone(); let run_client = client.clone(); - let handle = tokio::spawn(async move { run_client.run(token).await }); + let handle = tokio::spawn(async move { run_client.run(token, Arc::new(())).await }); // Give it time to connect to peers time::sleep(Duration::from_secs(5)).await; @@ -89,7 +89,7 @@ async fn test_peer_persistence() { let token = CancellationToken::new(); let cancel = token.clone(); let run_client = client.clone(); - let handle = tokio::spawn(async move { run_client.run(token).await }); + let handle = tokio::spawn(async move { run_client.run(token, Arc::new(())).await }); time::sleep(Duration::from_secs(5)).await; @@ -119,7 +119,7 @@ async fn test_peer_persistence() { let cancel = token.clone(); let run_client = client.clone(); let start = tokio::time::Instant::now(); - let handle = tokio::spawn(async move { run_client.run(token).await }); + let handle = tokio::spawn(async move { run_client.run(token, Arc::new(())).await }); // Wait for connection but with shorter timeout time::sleep(Duration::from_secs(3)).await; diff --git a/dash-spv/tests/wallet_integration_test.rs b/dash-spv/tests/wallet_integration_test.rs index 518ff78c2..cb5c3f365 100644 --- a/dash-spv/tests/wallet_integration_test.rs +++ b/dash-spv/tests/wallet_integration_test.rs @@ -53,7 +53,7 @@ async fn test_spv_client_run_stop() { let cancel = token.clone(); let run_client = client.clone(); - let handle = tokio::spawn(async move { run_client.run(token).await }); + let handle = tokio::spawn(async move { run_client.run(token, Arc::new(())).await }); tokio::time::timeout(Duration::from_secs(5), async { while !client.is_running().await {