diff --git a/dash-spv-ffi/FFI_API.md b/dash-spv-ffi/FFI_API.md index 8f4606063..1ca3cfcb1 100644 --- a/dash-spv-ffi/FFI_API.md +++ b/dash-spv-ffi/FFI_API.md @@ -4,17 +4,15 @@ This document provides a comprehensive reference for all FFI (Foreign Function I **Auto-generated**: This documentation is automatically generated from the source code. Do not edit manually. -**Total Functions**: 49 +**Total Functions**: 39 ## Table of Contents - [Client Management](#client-management) - [Configuration](#configuration) - [Synchronization](#synchronization) -- [Wallet Operations](#wallet-operations) - [Transaction Management](#transaction-management) - [Platform Integration](#platform-integration) -- [Event Callbacks](#event-callbacks) - [Error Handling](#error-handling) - [Utility Functions](#utility-functions) @@ -55,25 +53,14 @@ Functions: 16 ### Synchronization -Functions: 5 +Functions: 3 | Function | Description | Module | |----------|-------------|--------| -| `dash_spv_ffi_client_clear_sync_event_callbacks` | Clear sync event callbacks | client | | `dash_spv_ffi_client_get_manager_sync_progress` | Get the current manager-based sync progress | client | | `dash_spv_ffi_client_get_sync_progress` | Get the current sync progress snapshot | client | -| `dash_spv_ffi_client_set_sync_event_callbacks` | Set sync event callbacks for push-based event notifications | client | | `dash_spv_ffi_sync_progress_destroy` | Destroy an `FFISyncProgress` object and all its nested pointers | types | -### Wallet Operations - -Functions: 2 - -| Function | Description | Module | -|----------|-------------|--------| -| `dash_spv_ffi_client_clear_wallet_event_callbacks` | Clear wallet event callbacks | client | -| `dash_spv_ffi_client_set_wallet_event_callbacks` | Set wallet event callbacks for push-based event notifications | client | - ### Transaction Management Functions: 1 @@ -91,19 +78,6 @@ Functions: 2 | `ffi_dash_spv_get_platform_activation_height` | Gets the platform activation height from the Core chain # Safety This... | platform_integration | | `ffi_dash_spv_get_quorum_public_key` | Gets a quorum public key from the Core chain # Safety This function is... | platform_integration | -### Event Callbacks - -Functions: 6 - -| Function | Description | Module | -|----------|-------------|--------| -| `dash_spv_ffi_client_clear_client_error_callback` | Clear the client error callback | client | -| `dash_spv_ffi_client_clear_network_event_callbacks` | Clear network event callbacks | client | -| `dash_spv_ffi_client_clear_progress_callback` | Clear progress callback | client | -| `dash_spv_ffi_client_set_client_error_callback` | Set a callback for fatal client errors (start failure, sync thread crash) | client | -| `dash_spv_ffi_client_set_network_event_callbacks` | Set network event callbacks for push-based event notifications | client | -| `dash_spv_ffi_client_set_progress_callback` | Set progress callback for sync progress updates | client | - ### Error Handling Functions: 1 @@ -155,14 +129,14 @@ Destroy the client and free associated resources. # Safety - `client` must be e #### `dash_spv_ffi_client_new` ```c -dash_spv_ffi_client_new(config: *const FFIClientConfig,) -> *mut FFIDashSpvClient +dash_spv_ffi_client_new(config: *const FFIClientConfig, callbacks: FFIEventCallbacks,) -> *mut FFIDashSpvClient ``` **Description:** -Create a new SPV client and return an opaque pointer. # Safety - `config` must be a valid, non-null pointer for the duration of the call. - The returned pointer must be freed with `dash_spv_ffi_client_destroy`. +Create a new SPV client and return an opaque pointer. # Safety - `config` must be a valid, non-null pointer for the duration of the call. - `callbacks` is taken by value (function pointers and `user_data` pointers are copied internally). The struct itself may be dropped after the call, but all `user_data` pointer targets must remain valid until `dash_spv_ffi_client_stop` or `dash_spv_ffi_client_destroy` is called. - Callback functions and `user_data` pointees must be safe to use from background threads; different callback groups may be invoked concurrently. - The returned pointer must be freed with `dash_spv_ffi_client_destroy`. **Safety:** -- `config` must be a valid, non-null pointer for the duration of the call. - The returned pointer must be freed with `dash_spv_ffi_client_destroy`. +- `config` must be a valid, non-null pointer for the duration of the call. - `callbacks` is taken by value (function pointers and `user_data` pointers are copied internally). The struct itself may be dropped after the call, but all `user_data` pointer targets must remain valid until `dash_spv_ffi_client_stop` or `dash_spv_ffi_client_destroy` is called. - Callback functions and `user_data` pointees must be safe to use from background threads; different callback groups may be invoked concurrently. - The returned pointer must be freed with `dash_spv_ffi_client_destroy`. **Module:** `client` @@ -426,22 +400,6 @@ dash_spv_ffi_config_testnet() -> *mut FFIClientConfig ### Synchronization - Detailed -#### `dash_spv_ffi_client_clear_sync_event_callbacks` - -```c -dash_spv_ffi_client_clear_sync_event_callbacks(client: *mut FFIDashSpvClient,) -> i32 -``` - -**Description:** -Clear sync event callbacks. # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Module:** `client` - ---- - #### `dash_spv_ffi_client_get_manager_sync_progress` ```c @@ -474,22 +432,6 @@ Get the current sync progress snapshot. # Safety - `client` must be a valid, no --- -#### `dash_spv_ffi_client_set_sync_event_callbacks` - -```c -dash_spv_ffi_client_set_sync_event_callbacks(client: *mut FFIDashSpvClient, callbacks: FFISyncEventCallbacks,) -> i32 -``` - -**Description:** -Set sync event callbacks for push-based event notifications. The monitoring task is spawned when `dash_spv_ffi_client_run` is called. Call this before calling run(). # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. - Callbacks must be thread-safe as they may be called from a background task. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. - Callbacks must be thread-safe as they may be called from a background task. - -**Module:** `client` - ---- - #### `dash_spv_ffi_sync_progress_destroy` ```c @@ -506,40 +448,6 @@ Destroy an `FFISyncProgress` object and all its nested pointers. # Safety - `pr --- -### Wallet Operations - Detailed - -#### `dash_spv_ffi_client_clear_wallet_event_callbacks` - -```c -dash_spv_ffi_client_clear_wallet_event_callbacks(client: *mut FFIDashSpvClient,) -> i32 -``` - -**Description:** -Clear wallet event callbacks. # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Module:** `client` - ---- - -#### `dash_spv_ffi_client_set_wallet_event_callbacks` - -```c -dash_spv_ffi_client_set_wallet_event_callbacks(client: *mut FFIDashSpvClient, callbacks: FFIWalletEventCallbacks,) -> i32 -``` - -**Description:** -Set wallet event callbacks for push-based event notifications. The monitoring task is spawned when `dash_spv_ffi_client_run` is called. Call this before calling run(). # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. - Callbacks must be thread-safe as they may be called from a background task. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. - Callbacks must be thread-safe as they may be called from a background task. - -**Module:** `client` - ---- - ### Transaction Management - Detailed #### `dash_spv_ffi_client_broadcast_transaction` @@ -592,104 +500,6 @@ This function is unsafe because: - The caller must ensure all pointers are valid --- -### Event Callbacks - Detailed - -#### `dash_spv_ffi_client_clear_client_error_callback` - -```c -dash_spv_ffi_client_clear_client_error_callback(client: *mut FFIDashSpvClient,) -> i32 -``` - -**Description:** -Clear the client error callback. # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Module:** `client` - ---- - -#### `dash_spv_ffi_client_clear_network_event_callbacks` - -```c -dash_spv_ffi_client_clear_network_event_callbacks(client: *mut FFIDashSpvClient,) -> i32 -``` - -**Description:** -Clear network event callbacks. # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Module:** `client` - ---- - -#### `dash_spv_ffi_client_clear_progress_callback` - -```c -dash_spv_ffi_client_clear_progress_callback(client: *mut FFIDashSpvClient,) -> i32 -``` - -**Description:** -Clear progress callback. # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - -**Module:** `client` - ---- - -#### `dash_spv_ffi_client_set_client_error_callback` - -```c -dash_spv_ffi_client_set_client_error_callback(client: *mut FFIDashSpvClient, callback: FFIClientErrorCallback,) -> i32 -``` - -**Description:** -Set a callback for fatal client errors (start failure, sync thread crash). # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callback` struct and its `user_data` must remain valid until the callback is cleared. - The callback must be thread-safe as it may be called from a background thread. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callback` struct and its `user_data` must remain valid until the callback is cleared. - The callback must be thread-safe as it may be called from a background thread. - -**Module:** `client` - ---- - -#### `dash_spv_ffi_client_set_network_event_callbacks` - -```c -dash_spv_ffi_client_set_network_event_callbacks(client: *mut FFIDashSpvClient, callbacks: FFINetworkEventCallbacks,) -> i32 -``` - -**Description:** -Set network event callbacks for push-based event notifications. The monitoring task is spawned when `dash_spv_ffi_client_run` is called. Call this before calling run(). # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. - Callbacks must be thread-safe as they may be called from a background task. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. - Callbacks must be thread-safe as they may be called from a background task. - -**Module:** `client` - ---- - -#### `dash_spv_ffi_client_set_progress_callback` - -```c -dash_spv_ffi_client_set_progress_callback(client: *mut FFIDashSpvClient, callback: crate::FFIProgressCallback,) -> i32 -``` - -**Description:** -Set progress callback for sync progress updates. The monitoring task is spawned when `dash_spv_ffi_client_run` is called. Call this before calling run(). # Safety - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callback` struct and its `user_data` must remain valid until the callback is cleared. - The callback must be thread-safe as it may be called from a background task. - -**Safety:** -- `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. - The `callback` struct and its `user_data` must remain valid until the callback is cleared. - The callback must be thread-safe as it may be called from a background task. - -**Module:** `client` - ---- - ### Error Handling - Detailed #### `dash_spv_ffi_get_last_error` @@ -791,7 +601,7 @@ dash_spv_ffi_client_run(client: *mut FFIDashSpvClient) -> i32 ``` **Description:** -Start the SPV client and begin syncing in the background. Subscribes to events, spawns monitoring threads, then spawns a background thread that calls `run()` (which handles start + sync loop + stop internally). Returns immediately after spawning. Use event callbacks (set via `set_sync_event_callbacks`, `set_network_event_callbacks`, `set_wallet_event_callbacks`) to receive notifications. Configure callbacks before calling `run()`. # Safety - `client` must be a valid, non-null pointer to a created client. # Returns 0 on success, error code on failure. +Start the SPV client and begin syncing in the background. Uses the event callbacks provided at client creation time. Returns immediately after spawning the sync task. # Safety - `client` must be a valid, non-null pointer to a created client. # Returns 0 on success, error code on failure. **Safety:** - `client` must be a valid, non-null pointer to a created client. @@ -943,10 +753,13 @@ Release a wallet manager obtained from `dash_spv_ffi_client_get_wallet_manager`. // Create configuration FFIClientConfig* config = dash_spv_ffi_config_testnet(); -// Create client -FFIDashSpvClient* client = dash_spv_ffi_client_new(config); +// Build event callbacks (zero-init for no-op defaults) +FFIEventCallbacks callbacks = { 0 }; + +// Create client with callbacks +FFIDashSpvClient* client = dash_spv_ffi_client_new(config, callbacks); -// Start the client and begin syncing +// Start syncing (uses callbacks provided at creation) int32_t result = dash_spv_ffi_client_run(client); if (result != 0) { const char* error = dash_spv_ffi_get_last_error(); @@ -964,22 +777,22 @@ dash_spv_ffi_config_destroy(config); ### Event Callbacks ```c -void on_block(uint32_t height, const uint8_t (*hash)[32], void* user_data) { - printf("New block at height %u\n", height); +void on_headers(uint32_t tip_height, void* user_data) { + printf("Headers stored up to height %u\n", tip_height); } -void on_transaction(const uint8_t (*txid)[32], bool confirmed, - int64_t amount, const char* addresses, - uint32_t block_height, void* user_data) { - printf("Transaction: %lld duffs\n", amount); +void on_tx(const char* wallet_id, uint32_t account_index, + const uint8_t (*txid)[32], int64_t amount, + const char* addresses, void* user_data) { + printf("Transaction: %lld duffs\n", (long long)amount); } -// Set up callbacks -FFIEventCallbacks callbacks = { - .on_block = on_block, - .on_transaction = on_transaction, - .user_data = NULL -}; +// Build callbacks struct and pass to client_new() +FFIEventCallbacks callbacks = { 0 }; +callbacks.sync.on_block_headers_stored = on_headers; +callbacks.wallet.on_transaction_received = on_tx; +FFIDashSpvClient* client = dash_spv_ffi_client_new(config, callbacks); -dash_spv_ffi_client_set_event_callbacks(client, callbacks); +// Start syncing (uses callbacks provided at creation) +dash_spv_ffi_client_run(client); ``` diff --git a/dash-spv-ffi/examples/wallet_manager_usage.rs b/dash-spv-ffi/examples/wallet_manager_usage.rs index a49cc5af3..fdacaceda 100644 --- a/dash-spv-ffi/examples/wallet_manager_usage.rs +++ b/dash-spv-ffi/examples/wallet_manager_usage.rs @@ -16,7 +16,7 @@ fn main() { } // Create an SPV client - let client = dash_spv_ffi_client_new(config); + let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default()); if client.is_null() { panic!("Failed to create client"); } diff --git a/dash-spv-ffi/scripts/generate_ffi_docs.py b/dash-spv-ffi/scripts/generate_ffi_docs.py index b36a4e5b1..d26411183 100755 --- a/dash-spv-ffi/scripts/generate_ffi_docs.py +++ b/dash-spv-ffi/scripts/generate_ffi_docs.py @@ -305,10 +305,13 @@ def generate_markdown(functions: List[FFIFunction]) -> str: md.append("// Create configuration") md.append("FFIClientConfig* config = dash_spv_ffi_config_testnet();") md.append("") - md.append("// Create client") - md.append("FFIDashSpvClient* client = dash_spv_ffi_client_new(config);") + md.append("// Build event callbacks (zero-init for no-op defaults)") + md.append("FFIEventCallbacks callbacks = { 0 };") md.append("") - md.append("// Start the client and begin syncing") + md.append("// Create client with callbacks") + md.append("FFIDashSpvClient* client = dash_spv_ffi_client_new(config, callbacks);") + md.append("") + md.append("// Start syncing (uses callbacks provided at creation)") md.append("int32_t result = dash_spv_ffi_client_run(client);") md.append("if (result != 0) {") md.append(" const char* error = dash_spv_ffi_get_last_error();") @@ -327,24 +330,24 @@ def generate_markdown(functions: List[FFIFunction]) -> str: md.append("### Event Callbacks") md.append("") md.append("```c") - md.append("void on_block(uint32_t height, const uint8_t (*hash)[32], void* user_data) {") - md.append(" printf(\"New block at height %u\\n\", height);") + md.append("void on_headers(uint32_t tip_height, void* user_data) {") + md.append(" printf(\"Headers stored up to height %u\\n\", tip_height);") md.append("}") md.append("") - md.append("void on_transaction(const uint8_t (*txid)[32], bool confirmed, ") - md.append(" int64_t amount, const char* addresses, ") - md.append(" uint32_t block_height, void* user_data) {") - md.append(" printf(\"Transaction: %lld duffs\\n\", amount);") + md.append("void on_tx(const char* wallet_id, uint32_t account_index,") + md.append(" const uint8_t (*txid)[32], int64_t amount,") + md.append(" const char* addresses, void* user_data) {") + md.append(" printf(\"Transaction: %lld duffs\\n\", (long long)amount);") md.append("}") md.append("") - md.append("// Set up callbacks") - md.append("FFIEventCallbacks callbacks = {") - md.append(" .on_block = on_block,") - md.append(" .on_transaction = on_transaction,") - md.append(" .user_data = NULL") - md.append("};") + md.append("// Build callbacks struct and pass to client_new()") + md.append("FFIEventCallbacks callbacks = { 0 };") + md.append("callbacks.sync.on_block_headers_stored = on_headers;") + md.append("callbacks.wallet.on_transaction_received = on_tx;") + md.append("FFIDashSpvClient* client = dash_spv_ffi_client_new(config, callbacks);") md.append("") - md.append("dash_spv_ffi_client_set_event_callbacks(client, callbacks);") + md.append("// Start syncing (uses callbacks provided at creation)") + md.append("dash_spv_ffi_client_run(client);") md.append("```") md.append("") diff --git a/dash-spv-ffi/src/bin/ffi_cli.rs b/dash-spv-ffi/src/bin/ffi_cli.rs index b9c4a62d1..c3143bcb4 100644 --- a/dash-spv-ffi/src/bin/ffi_cli.rs +++ b/dash-spv-ffi/src/bin/ffi_cli.rs @@ -364,8 +364,48 @@ fn main() { .to_string() }); - // Create client - let client = dash_spv_ffi_client_new(cfg); + // Build all event callbacks in a single struct + let callbacks = FFIEventCallbacks { + sync: FFISyncEventCallbacks { + on_sync_start: Some(on_sync_start), + on_block_headers_stored: Some(on_block_headers_stored), + on_block_header_sync_complete: Some(on_block_header_sync_complete), + on_filter_headers_stored: Some(on_filter_headers_stored), + on_filter_headers_sync_complete: Some(on_filter_headers_sync_complete), + on_filters_stored: Some(on_filters_stored), + on_filters_sync_complete: Some(on_filters_sync_complete), + on_blocks_needed: Some(on_blocks_needed), + on_block_processed: Some(on_block_processed), + on_masternode_state_updated: Some(on_masternode_state_updated), + on_chainlock_received: Some(on_chainlock_received), + on_instantlock_received: Some(on_instantlock_received), + on_manager_error: Some(on_manager_error), + on_sync_complete: Some(on_sync_complete), + user_data: ptr::null_mut(), + }, + network: FFINetworkEventCallbacks { + on_peer_connected: Some(on_peer_connected), + on_peer_disconnected: Some(on_peer_disconnected), + on_peers_updated: Some(on_peers_updated), + user_data: ptr::null_mut(), + }, + progress: FFIProgressCallback { + on_progress: Some(on_progress_update), + user_data: ptr::null_mut(), + }, + wallet: FFIWalletEventCallbacks { + on_transaction_received: Some(on_transaction_received), + on_balance_updated: Some(on_balance_updated), + user_data: ptr::null_mut(), + }, + error: FFIClientErrorCallback { + on_error: None, + user_data: ptr::null_mut(), + }, + }; + + // Create client with event callbacks + let client = dash_spv_ffi_client_new(cfg, callbacks); if client.is_null() { eprintln!( "Client create failed: {}", @@ -403,80 +443,6 @@ fn main() { dash_spv_ffi_wallet_manager_free(wallet_manager); } - // Set up event callbacks - let sync_callbacks = FFISyncEventCallbacks { - on_sync_start: Some(on_sync_start), - on_block_headers_stored: Some(on_block_headers_stored), - on_block_header_sync_complete: Some(on_block_header_sync_complete), - on_filter_headers_stored: Some(on_filter_headers_stored), - on_filter_headers_sync_complete: Some(on_filter_headers_sync_complete), - on_filters_stored: Some(on_filters_stored), - on_filters_sync_complete: Some(on_filters_sync_complete), - on_blocks_needed: Some(on_blocks_needed), - on_block_processed: Some(on_block_processed), - on_masternode_state_updated: Some(on_masternode_state_updated), - on_chainlock_received: Some(on_chainlock_received), - on_instantlock_received: Some(on_instantlock_received), - on_manager_error: Some(on_manager_error), - on_sync_complete: Some(on_sync_complete), - user_data: ptr::null_mut(), - }; - - let network_callbacks = FFINetworkEventCallbacks { - on_peer_connected: Some(on_peer_connected), - on_peer_disconnected: Some(on_peer_disconnected), - on_peers_updated: Some(on_peers_updated), - user_data: ptr::null_mut(), - }; - - let wallet_callbacks = FFIWalletEventCallbacks { - on_transaction_received: Some(on_transaction_received), - on_balance_updated: Some(on_balance_updated), - user_data: ptr::null_mut(), - }; - - let rc = dash_spv_ffi_client_set_sync_event_callbacks(client, sync_callbacks); - if rc != FFIErrorCode::Success as i32 { - eprintln!( - "Failed to set sync callbacks: {}", - ffi_string_to_rust(dash_spv_ffi_get_last_error()) - ); - std::process::exit(1); - } - - let rc = dash_spv_ffi_client_set_network_event_callbacks(client, network_callbacks); - if rc != FFIErrorCode::Success as i32 { - eprintln!( - "Failed to set network callbacks: {}", - ffi_string_to_rust(dash_spv_ffi_get_last_error()) - ); - std::process::exit(1); - } - - let rc = dash_spv_ffi_client_set_wallet_event_callbacks(client, wallet_callbacks); - if rc != FFIErrorCode::Success as i32 { - eprintln!( - "Failed to set wallet callbacks: {}", - ffi_string_to_rust(dash_spv_ffi_get_last_error()) - ); - std::process::exit(1); - } - - // Set up progress callback - let progress_callback = FFIProgressCallback { - on_progress: Some(on_progress_update), - user_data: ptr::null_mut(), - }; - - let rc = dash_spv_ffi_client_set_progress_callback(client, progress_callback); - if rc != FFIErrorCode::Success as i32 { - eprintln!( - "Failed to set progress callback: {}", - ffi_string_to_rust(dash_spv_ffi_get_last_error()) - ); - std::process::exit(1); - } - println!("Event and progress callbacks configured, starting sync..."); // Run client - starts sync in background and returns immediately diff --git a/dash-spv-ffi/src/callbacks.rs b/dash-spv-ffi/src/callbacks.rs index 90012ee9b..d4f6b4e1e 100644 --- a/dash-spv-ffi/src/callbacks.rs +++ b/dash-spv-ffi/src/callbacks.rs @@ -7,6 +7,9 @@ //! - `FFIWalletEventCallbacks` - Wallet manager events use crate::{dash_spv_ffi_sync_progress_destroy, FFISyncProgress}; +use dash_spv::network::NetworkEvent; +use dash_spv::sync::{SyncEvent, SyncProgress}; +use dash_spv::EventHandler; use dashcore::hashes::Hash; use key_wallet::manager::WalletEvent; use std::ffi::CString; @@ -618,6 +621,49 @@ impl FFIClientErrorCallback { } } +// ============================================================================ +// FFIEventCallbacks - All callbacks in a single C-compatible struct +// ============================================================================ + +/// All event callbacks grouped into a single struct. +/// +/// Pass this to `dash_spv_ffi_client_new`. Any callback group left at its +/// default (all function pointers null) will simply not receive events. +#[repr(C)] +#[derive(Clone, Default)] +pub struct FFIEventCallbacks { + pub sync: FFISyncEventCallbacks, + pub network: FFINetworkEventCallbacks, + pub progress: FFIProgressCallback, + pub wallet: FFIWalletEventCallbacks, + pub error: FFIClientErrorCallback, +} + +unsafe impl Send for FFIEventCallbacks {} +unsafe impl Sync for FFIEventCallbacks {} + +impl EventHandler for FFIEventCallbacks { + fn on_sync_event(&self, event: &SyncEvent) { + self.sync.dispatch(event); + } + + fn on_network_event(&self, event: &NetworkEvent) { + self.network.dispatch(event); + } + + fn on_progress(&self, progress: &SyncProgress) { + self.progress.dispatch(progress); + } + + fn on_wallet_event(&self, event: &WalletEvent) { + self.wallet.dispatch(event); + } + + fn on_error(&self, error: &str) { + self.error.dispatch(error); + } +} + impl FFIWalletEventCallbacks { /// Dispatch a WalletEvent to the appropriate callback. pub fn dispatch(&self, event: &WalletEvent) { diff --git a/dash-spv-ffi/src/client.rs b/dash-spv-ffi/src/client.rs index 41ef3a069..02a60515b 100644 --- a/dash-spv-ffi/src/client.rs +++ b/dash-spv-ffi/src/client.rs @@ -1,7 +1,6 @@ use crate::{ - null_check, set_last_error, FFIClientConfig, FFIClientErrorCallback, FFIErrorCode, - FFINetworkEventCallbacks, FFIProgressCallback, FFISyncEventCallbacks, FFISyncProgress, - FFIWalletEventCallbacks, FFIWalletManager, + null_check, set_last_error, FFIClientConfig, FFIErrorCode, FFIEventCallbacks, FFISyncProgress, + FFIWalletManager, }; // Import wallet types from key-wallet-ffi use key_wallet_ffi::FFIWalletManager as KeyWalletFFIWalletManager; @@ -10,127 +9,42 @@ use dash_spv::storage::DiskStorageManager; use dash_spv::DashSpvClient; use tracing::dispatcher::{get_default, set_default}; -use std::mem::{forget, take}; +use std::mem::forget; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; -use tokio::sync::{broadcast, watch}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; -/// Spawns a tokio task that monitors a broadcast channel and dispatches events to callbacks. -fn spawn_broadcast_monitor( - name: &'static str, - receiver: broadcast::Receiver, - callbacks: Arc>>, - shutdown: CancellationToken, - rt: &Runtime, - dispatch_fn: F, -) -> JoinHandle<()> -where - E: Clone + Send + 'static, - C: Clone + Send + 'static, - F: Fn(&C, &E) + Send + 'static, -{ - let mut receiver = receiver; - rt.spawn(async move { - tracing::debug!("{} monitoring task started", name); - loop { - tokio::select! { - result = receiver.recv() => { - match result { - Ok(event) => { - let cb = callbacks.lock().unwrap().clone(); - if let Some(ref cb) = cb { - dispatch_fn(cb, &event); - } - } - Err(broadcast::error::RecvError::Closed) => break, - Err(broadcast::error::RecvError::Lagged(_)) => continue, - } - } - _ = shutdown.cancelled() => break, - } - } - tracing::debug!("{} monitoring task exiting", name); - }) -} - -/// Spawns a tokio task that monitors a watch channel for progress updates. -/// -/// Sends the initial progress value, then monitors for changes. -fn spawn_progress_monitor( - receiver: watch::Receiver

, - callbacks: Arc>>, - shutdown: CancellationToken, - rt: &Runtime, - dispatch_fn: F, -) -> JoinHandle<()> -where - P: Clone + Send + Sync + 'static, - C: Clone + Send + 'static, - F: Fn(&C, &P) + Send + 'static, -{ - let mut receiver = receiver; - rt.spawn(async move { - tracing::debug!("Progress monitoring task started"); - - // Send initial progress - { - let progress = receiver.borrow_and_update().clone(); - let cb = callbacks.lock().unwrap().clone(); - if let Some(ref cb) = cb { - dispatch_fn(cb, &progress); - } - } - - loop { - tokio::select! { - result = receiver.changed() => { - match result { - Ok(()) => { - let progress = receiver.borrow_and_update().clone(); - let cb = callbacks.lock().unwrap().clone(); - if let Some(ref cb) = cb { - dispatch_fn(cb, &progress); - } - } - Err(_) => break, - } - } - _ = shutdown.cancelled() => break, - } - } - tracing::debug!("Progress monitoring task exiting"); - }) -} - /// FFI wrapper around `DashSpvClient`. type InnerClient = DashSpvClient< key_wallet::manager::WalletManager, dash_spv::network::PeerNetworkManager, DiskStorageManager, + FFIEventCallbacks, >; pub struct FFIDashSpvClient { pub(crate) inner: InnerClient, pub(crate) runtime: Arc, - active_tasks: Mutex>>, + run_task: Mutex>>, shutdown_token: CancellationToken, - sync_event_callbacks: Arc>>, - network_event_callbacks: Arc>>, - wallet_event_callbacks: Arc>>, - progress_callback: Arc>>, - client_error_callback: Arc>>, } /// Create a new SPV client and return an opaque pointer. /// /// # Safety /// - `config` must be a valid, non-null pointer for the duration of the call. +/// - `callbacks` is taken by value (function pointers and `user_data` pointers +/// are copied internally). The struct itself may be dropped after the call, +/// but all `user_data` pointer targets must remain valid until +/// `dash_spv_ffi_client_stop` or `dash_spv_ffi_client_destroy` is called. +/// - Callback functions and `user_data` pointees must be safe to use from +/// background threads; different callback groups may be invoked concurrently. /// - The returned pointer must be freed with `dash_spv_ffi_client_destroy`. #[no_mangle] pub unsafe extern "C" fn dash_spv_ffi_client_new( config: *const FFIClientConfig, + callbacks: FFIEventCallbacks, ) -> *mut FFIDashSpvClient { null_check!(config, std::ptr::null_mut()); @@ -171,7 +85,8 @@ pub unsafe extern "C" fn dash_spv_ffi_client_new( match (network, storage) { (Ok(network), Ok(storage)) => { - DashSpvClient::new(client_config, network, storage, wallet).await + DashSpvClient::new(client_config, network, storage, wallet, Arc::new(callbacks)) + .await } (Err(e), _) => Err(e), (_, Err(e)) => Err(dash_spv::SpvError::Storage(e)), @@ -183,13 +98,8 @@ pub unsafe extern "C" fn dash_spv_ffi_client_new( let ffi_client = FFIDashSpvClient { inner: client, runtime, - active_tasks: Mutex::new(Vec::new()), + run_task: Mutex::new(None), shutdown_token: CancellationToken::new(), - sync_event_callbacks: Arc::new(Mutex::new(None)), - network_event_callbacks: Arc::new(Mutex::new(None)), - wallet_event_callbacks: Arc::new(Mutex::new(None)), - progress_callback: Arc::new(Mutex::new(None)), - client_error_callback: Arc::new(Mutex::new(None)), }; Box::into_raw(Box::new(ffi_client)) } @@ -201,30 +111,22 @@ pub unsafe extern "C" fn dash_spv_ffi_client_new( } impl FFIDashSpvClient { - /// Abort all active monitoring tasks and wait for them to finish. - fn cancel_active_tasks(&self) { - let tasks = { - let mut guard = self.active_tasks.lock().unwrap(); - take(&mut *guard) - }; - - for task in &tasks { + /// Cancel the run task and wait for it to finish. + fn cancel_run_task(&self) { + let task = self.run_task.lock().unwrap().take(); + if let Some(task) = task { task.abort(); - } - - // Wait for all tasks to finish - self.runtime.block_on(async { - for task in tasks { + self.runtime.block_on(async { let _ = task.await; - } - }); + }); + } } } fn stop_client_internal(client: &mut FFIDashSpvClient) -> Result<(), dash_spv::SpvError> { client.shutdown_token.cancel(); - client.cancel_active_tasks(); + client.cancel_run_task(); let result = client.runtime.block_on(async { client.inner.stop().await }); @@ -281,13 +183,8 @@ pub unsafe extern "C" fn dash_spv_ffi_client_stop(client: *mut FFIDashSpvClient) /// Start the SPV client and begin syncing in the background. /// -/// Subscribes to events, spawns monitoring threads, then spawns a background -/// thread that calls `run()` (which handles start + sync loop + stop internally). -/// Returns immediately after spawning. -/// -/// Use event callbacks (set via `set_sync_event_callbacks`, -/// `set_network_event_callbacks`, `set_wallet_event_callbacks`) to receive -/// notifications. Configure callbacks before calling `run()`. +/// Uses the event callbacks provided at client creation time. Returns +/// immediately after spawning the sync task. /// /// # Safety /// - `client` must be a valid, non-null pointer to a created client. @@ -300,87 +197,24 @@ pub unsafe extern "C" fn dash_spv_ffi_client_run(client: *mut FFIDashSpvClient) let client = &(*client); - tracing::info!("dash_spv_ffi_client_run: setting up event monitoring"); + tracing::info!("dash_spv_ffi_client_run: starting sync"); let shutdown_token = client.shutdown_token.clone(); - - // Subscribe to events before spawning tasks - let (sync_event_rx, network_event_rx, progress_rx, wallet_event_rx) = - client.runtime.block_on(async { - let wallet_rx = client.inner.wallet().read().await.subscribe_events(); - ( - client.inner.subscribe_sync_events().await, - client.inner.subscribe_network_events().await, - client.inner.subscribe_progress().await, - wallet_rx, - ) - }); - - // Spawn event monitoring tasks for each callback type that is set - let mut tasks = client.active_tasks.lock().unwrap(); - - if client.sync_event_callbacks.lock().unwrap().is_some() { - tasks.push(spawn_broadcast_monitor( - "Sync event", - sync_event_rx.resubscribe(), - client.sync_event_callbacks.clone(), - shutdown_token.clone(), - &client.runtime, - |cb, event| cb.dispatch(event), - )); - } - - if client.network_event_callbacks.lock().unwrap().is_some() { - tasks.push(spawn_broadcast_monitor( - "Network event", - network_event_rx.resubscribe(), - client.network_event_callbacks.clone(), - shutdown_token.clone(), - &client.runtime, - |cb, event| cb.dispatch(event), - )); - } - - if client.progress_callback.lock().unwrap().is_some() { - tasks.push(spawn_progress_monitor( - progress_rx.clone(), - client.progress_callback.clone(), - shutdown_token.clone(), - &client.runtime, - |cb, progress| cb.dispatch(progress), - )); - } - - if client.wallet_event_callbacks.lock().unwrap().is_some() { - tasks.push(spawn_broadcast_monitor( - "Wallet event", - wallet_event_rx.resubscribe(), - client.wallet_event_callbacks.clone(), - shutdown_token.clone(), - &client.runtime, - |cb, event| cb.dispatch(event), - )); - } - - let error_callback = client.client_error_callback.clone(); let spv_client = client.inner.clone(); - tasks.push(client.runtime.spawn(async move { + + let task = client.runtime.spawn(async move { tracing::debug!("Sync task: starting run"); if let Err(e) = spv_client.run(shutdown_token).await { tracing::error!("Sync task: error: {}", e); - let cb = error_callback.lock().unwrap().clone(); - if let Some(ref cb) = cb { - cb.dispatch(&e.to_string()); - } } tracing::debug!("Sync task: exiting"); - })); + }); - drop(tasks); + *client.run_task.lock().unwrap() = Some(task); - tracing::info!("dash_spv_ffi_client_run: background tasks spawned, returning"); + tracing::info!("dash_spv_ffi_client_run: background task spawned, returning"); FFIErrorCode::Success as i32 } @@ -508,10 +342,10 @@ pub unsafe extern "C" fn dash_spv_ffi_client_destroy(client: *mut FFIDashSpvClie let _ = client.inner.stop().await; }); - // Abort and await all active tasks - client.cancel_active_tasks(); + // Abort and await the run task + client.cancel_run_task(); - tracing::info!("✅ FFI client destroyed and all tasks cleaned up"); + tracing::info!("FFI client destroyed and all tasks cleaned up"); } } @@ -568,199 +402,3 @@ pub unsafe extern "C" fn dash_spv_ffi_wallet_manager_free(manager: *mut FFIWalle key_wallet_ffi::wallet_manager::wallet_manager_free(manager as *mut KeyWalletFFIWalletManager); } - -// ============================================================================ -// Event Callback Functions -// ============================================================================ - -/// Set sync event callbacks for push-based event notifications. -/// -/// The monitoring task is spawned when `dash_spv_ffi_client_run` is called. -/// Call this before calling run(). -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -/// - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. -/// - Callbacks must be thread-safe as they may be called from a background task. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_set_sync_event_callbacks( - client: *mut FFIDashSpvClient, - callbacks: FFISyncEventCallbacks, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.sync_event_callbacks.lock().unwrap() = Some(callbacks); - - FFIErrorCode::Success as i32 -} - -/// Clear sync event callbacks. -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_clear_sync_event_callbacks( - client: *mut FFIDashSpvClient, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.sync_event_callbacks.lock().unwrap() = None; - - FFIErrorCode::Success as i32 -} - -/// Set network event callbacks for push-based event notifications. -/// -/// The monitoring task is spawned when `dash_spv_ffi_client_run` is called. -/// Call this before calling run(). -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -/// - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. -/// - Callbacks must be thread-safe as they may be called from a background task. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_set_network_event_callbacks( - client: *mut FFIDashSpvClient, - callbacks: FFINetworkEventCallbacks, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.network_event_callbacks.lock().unwrap() = Some(callbacks); - - FFIErrorCode::Success as i32 -} - -/// Clear network event callbacks. -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_clear_network_event_callbacks( - client: *mut FFIDashSpvClient, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.network_event_callbacks.lock().unwrap() = None; - - FFIErrorCode::Success as i32 -} - -/// Set wallet event callbacks for push-based event notifications. -/// -/// The monitoring task is spawned when `dash_spv_ffi_client_run` is called. -/// Call this before calling run(). -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -/// - The `callbacks` struct and its `user_data` must remain valid until callbacks are cleared. -/// - Callbacks must be thread-safe as they may be called from a background task. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_set_wallet_event_callbacks( - client: *mut FFIDashSpvClient, - callbacks: FFIWalletEventCallbacks, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.wallet_event_callbacks.lock().unwrap() = Some(callbacks); - - FFIErrorCode::Success as i32 -} - -/// Clear wallet event callbacks. -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_clear_wallet_event_callbacks( - client: *mut FFIDashSpvClient, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.wallet_event_callbacks.lock().unwrap() = None; - - FFIErrorCode::Success as i32 -} - -/// Set progress callback for sync progress updates. -/// -/// The monitoring task is spawned when `dash_spv_ffi_client_run` is called. -/// Call this before calling run(). -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -/// - The `callback` struct and its `user_data` must remain valid until the callback is cleared. -/// - The callback must be thread-safe as it may be called from a background task. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_set_progress_callback( - client: *mut FFIDashSpvClient, - callback: crate::FFIProgressCallback, -) -> i32 { - null_check!(client); - - let client = &(*client); - let progress = client.runtime.block_on(async { client.inner.progress().await }); - let mut cb_guard = client.progress_callback.lock().unwrap(); - *cb_guard = Some(callback); - if let Some(ref cb) = *cb_guard { - cb.dispatch(&progress); - } - - FFIErrorCode::Success as i32 -} - -/// Clear progress callback. -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_clear_progress_callback( - client: *mut FFIDashSpvClient, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.progress_callback.lock().unwrap() = None; - - FFIErrorCode::Success as i32 -} - -/// Set a callback for fatal client errors (start failure, sync thread crash). -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -/// - The `callback` struct and its `user_data` must remain valid until the callback is cleared. -/// - The callback must be thread-safe as it may be called from a background thread. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_set_client_error_callback( - client: *mut FFIDashSpvClient, - callback: FFIClientErrorCallback, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.client_error_callback.lock().unwrap() = Some(callback); - - FFIErrorCode::Success as i32 -} - -/// Clear the client error callback. -/// -/// # Safety -/// - `client` must be a valid, non-null pointer to an `FFIDashSpvClient`. -#[no_mangle] -pub unsafe extern "C" fn dash_spv_ffi_client_clear_client_error_callback( - client: *mut FFIDashSpvClient, -) -> i32 { - null_check!(client); - - let client = &(*client); - *client.client_error_callback.lock().unwrap() = None; - - FFIErrorCode::Success as i32 -} diff --git a/dash-spv-ffi/tests/dashd_sync/context.rs b/dash-spv-ffi/tests/dashd_sync/context.rs index 4e5ae7210..a93103281 100644 --- a/dash-spv-ffi/tests/dashd_sync/context.rs +++ b/dash-spv-ffi/tests/dashd_sync/context.rs @@ -11,9 +11,8 @@ use dash_spv::logging::{LogFileConfig, LoggingConfig, LoggingGuard}; use dash_spv::test_utils::{retain_test_dir, SYNC_TIMEOUT}; use dash_spv_ffi::client::{ dash_spv_ffi_client_destroy, dash_spv_ffi_client_get_wallet_manager, dash_spv_ffi_client_new, - dash_spv_ffi_client_run, dash_spv_ffi_client_set_network_event_callbacks, - dash_spv_ffi_client_set_sync_event_callbacks, dash_spv_ffi_client_set_wallet_event_callbacks, - dash_spv_ffi_client_stop, dash_spv_ffi_wallet_manager_free, FFIDashSpvClient, + dash_spv_ffi_client_run, dash_spv_ffi_client_stop, dash_spv_ffi_wallet_manager_free, + FFIDashSpvClient, }; use dash_spv_ffi::config::{ dash_spv_ffi_config_add_peer, dash_spv_ffi_config_destroy, dash_spv_ffi_config_new, @@ -21,6 +20,7 @@ use dash_spv_ffi::config::{ dash_spv_ffi_config_set_restrict_to_configured_peers, FFIClientConfig, }; use dash_spv_ffi::types::FFIWalletManager as FFIWalletManagerOpaque; +use dash_spv_ffi::FFIEventCallbacks; use dashcore::hashes::Hash; use dashcore::{Address, Txid}; use key_wallet_ffi::managed_account::{ @@ -129,7 +129,15 @@ impl FFITestContext { let result = dash_spv_ffi_config_set_restrict_to_configured_peers(config, true); assert_eq!(result, 0, "Failed to restrict peers"); - let client = dash_spv_ffi_client_new(config); + let tracker = Arc::new(CallbackTracker::default()); + let callbacks = FFIEventCallbacks { + sync: create_sync_callbacks(&tracker), + network: create_network_callbacks(&tracker), + wallet: create_wallet_callbacks(&tracker), + ..FFIEventCallbacks::default() + }; + + let client = dash_spv_ffi_client_new(config, callbacks); assert!(!client.is_null(), "Failed to create FFI client"); let wallet_manager = dash_spv_ffi_client_get_wallet_manager(client); @@ -145,7 +153,7 @@ impl FFITestContext { session: SessionState { client, wallet_manager, - tracker: Arc::new(CallbackTracker::default()), + tracker, }, } } @@ -214,41 +222,12 @@ impl FFITestContext { (confirmed, unconfirmed) } - /// Set up sync event callbacks and run the client. - /// - /// # Safety - /// - /// Calls FFI client functions through raw pointers held by the context. - pub(super) unsafe fn run_with_sync_callbacks(&self) { - let sync_callbacks = create_sync_callbacks(&self.session.tracker); - let result = - dash_spv_ffi_client_set_sync_event_callbacks(self.session.client, sync_callbacks); - assert_eq!(result, 0, "Failed to set sync event callbacks"); - - self.snapshot_sync_baseline(); - let result = dash_spv_ffi_client_run(self.session.client); - assert_eq!(result, 0, "Failed to run FFI client"); - } - - /// Set up sync, network, and wallet event callbacks, then run the client. + /// Run the client (callbacks were registered at creation time). /// /// # Safety /// /// Calls FFI client functions through raw pointers held by the context. - pub(super) unsafe fn run_with_all_callbacks(&self) { - let sync_cbs = create_sync_callbacks(&self.session.tracker); - let network_cbs = create_network_callbacks(&self.session.tracker); - let wallet_cbs = create_wallet_callbacks(&self.session.tracker); - - let result = dash_spv_ffi_client_set_sync_event_callbacks(self.session.client, sync_cbs); - assert_eq!(result, 0, "Failed to set sync event callbacks"); - let result = - dash_spv_ffi_client_set_network_event_callbacks(self.session.client, network_cbs); - assert_eq!(result, 0, "Failed to set network event callbacks"); - let result = - dash_spv_ffi_client_set_wallet_event_callbacks(self.session.client, wallet_cbs); - assert_eq!(result, 0, "Failed to set wallet event callbacks"); - + pub(super) unsafe fn run(&self) { self.snapshot_sync_baseline(); let result = dash_spv_ffi_client_run(self.session.client); assert_eq!(result, 0, "Failed to run FFI client"); @@ -422,7 +401,15 @@ impl FFITestContext { drop(self.session); // Recreate client from same config (same storage dir and peers) - let client = dash_spv_ffi_client_new(fixed.config); + let tracker = Arc::new(CallbackTracker::default()); + let callbacks = FFIEventCallbacks { + sync: create_sync_callbacks(&tracker), + network: create_network_callbacks(&tracker), + wallet: create_wallet_callbacks(&tracker), + ..FFIEventCallbacks::default() + }; + + let client = dash_spv_ffi_client_new(fixed.config, callbacks); assert!(!client.is_null(), "Failed to recreate FFI client"); let wallet_manager = dash_spv_ffi_client_get_wallet_manager(client); @@ -433,7 +420,7 @@ impl FFITestContext { session: SessionState { client, wallet_manager, - tracker: Arc::new(CallbackTracker::default()), + tracker, }, } } diff --git a/dash-spv-ffi/tests/dashd_sync/tests_basic.rs b/dash-spv-ffi/tests/dashd_sync/tests_basic.rs index 0f1378418..b742e2c06 100644 --- a/dash-spv-ffi/tests/dashd_sync/tests_basic.rs +++ b/dash-spv-ffi/tests/dashd_sync/tests_basic.rs @@ -19,7 +19,7 @@ fn test_wallet_sync_via_ffi() { let wallet_id = ctx.add_wallet(&dashd.wallet.mnemonic); tracing::info!("Added wallet, ID: {}", hex::encode(&wallet_id)); - ctx.run_with_sync_callbacks(); + ctx.run(); tracing::info!("FFI client running"); ctx.wait_for_sync(dashd.initial_height); diff --git a/dash-spv-ffi/tests/dashd_sync/tests_callback.rs b/dash-spv-ffi/tests/dashd_sync/tests_callback.rs index a7489fddd..775cb8116 100644 --- a/dash-spv-ffi/tests/dashd_sync/tests_callback.rs +++ b/dash-spv-ffi/tests/dashd_sync/tests_callback.rs @@ -21,7 +21,7 @@ fn test_all_callbacks_during_sync() { let tracker = ctx.tracker().clone(); ctx.add_wallet(&dashd.wallet.mnemonic); - ctx.run_with_all_callbacks(); + ctx.run(); tracing::info!("FFI client running with all callback types"); ctx.wait_for_sync(dashd.initial_height); @@ -250,7 +250,7 @@ fn test_callbacks_post_sync_transactions_and_disconnect() { let tracker = ctx.tracker().clone(); let wallet_id = ctx.add_wallet(&dashd.wallet.mnemonic); - ctx.run_with_all_callbacks(); + ctx.run(); // Wait for initial sync ctx.wait_for_sync(dashd.initial_height); diff --git a/dash-spv-ffi/tests/dashd_sync/tests_restart.rs b/dash-spv-ffi/tests/dashd_sync/tests_restart.rs index d37ef229d..d882c0feb 100644 --- a/dash-spv-ffi/tests/dashd_sync/tests_restart.rs +++ b/dash-spv-ffi/tests/dashd_sync/tests_restart.rs @@ -19,7 +19,7 @@ fn test_ffi_restart_consistency() { let ctx = FFITestContext::new(dashd.addr); let wallet_id = ctx.add_wallet(&dashd.wallet.mnemonic); - ctx.run_with_sync_callbacks(); + ctx.run(); ctx.wait_for_sync(dashd.initial_height); let (first_balance, _) = ctx.get_wallet_balance(&wallet_id); @@ -39,7 +39,7 @@ fn test_ffi_restart_consistency() { let ctx = ctx.restart(); let wallet_id = ctx.add_wallet(&dashd.wallet.mnemonic); - ctx.run_with_sync_callbacks(); + ctx.run(); ctx.wait_for_sync(dashd.initial_height); let (second_balance, _) = ctx.get_wallet_balance(&wallet_id); diff --git a/dash-spv-ffi/tests/dashd_sync/tests_transaction.rs b/dash-spv-ffi/tests/dashd_sync/tests_transaction.rs index 19adc5bb4..df897865c 100644 --- a/dash-spv-ffi/tests/dashd_sync/tests_transaction.rs +++ b/dash-spv-ffi/tests/dashd_sync/tests_transaction.rs @@ -26,7 +26,7 @@ fn test_ffi_sync_then_generate_blocks() { let ctx = FFITestContext::new(dashd.addr); let wallet_id = ctx.add_wallet(&dashd.wallet.mnemonic); - ctx.run_with_all_callbacks(); + ctx.run(); ctx.wait_for_sync(dashd.initial_height); assert_eq!( @@ -145,7 +145,7 @@ fn test_ffi_multiple_transactions_in_single_block() { let ctx = FFITestContext::new(dashd.addr); let wallet_id = ctx.add_wallet(&dashd.wallet.mnemonic); - ctx.run_with_all_callbacks(); + ctx.run(); ctx.wait_for_sync(dashd.initial_height); let baseline_tx_count = ctx.transaction_count(&wallet_id); @@ -229,7 +229,7 @@ fn test_ffi_multiple_transactions_across_blocks() { let ctx = FFITestContext::new(dashd.addr); let wallet_id = ctx.add_wallet(&dashd.wallet.mnemonic); - ctx.run_with_all_callbacks(); + ctx.run(); ctx.wait_for_sync(dashd.initial_height); let baseline_tx_count = ctx.transaction_count(&wallet_id); diff --git a/dash-spv-ffi/tests/test_client.rs b/dash-spv-ffi/tests/test_client.rs index b1001dffe..af4387a7f 100644 --- a/dash-spv-ffi/tests/test_client.rs +++ b/dash-spv-ffi/tests/test_client.rs @@ -4,24 +4,8 @@ mod tests { use key_wallet_ffi::FFINetwork; use serial_test::serial; use std::ffi::CString; - use std::os::raw::c_void; - use std::sync::Mutex; use tempfile::TempDir; - struct ProgressCallbackData { - state: Mutex>, - is_synced: Mutex>, - } - - impl ProgressCallbackData { - fn new() -> Self { - Self { - state: Mutex::new(None), - is_synced: Mutex::new(None), - } - } - } - fn create_test_config() -> (*mut FFIClientConfig, TempDir) { let temp_dir = TempDir::new().unwrap(); let config = dash_spv_ffi_config_new(FFINetwork::Regtest); @@ -40,7 +24,7 @@ mod tests { unsafe { let (config, _temp_dir) = create_test_config(); - let client = dash_spv_ffi_client_new(config); + let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default()); assert!(!client.is_null()); dash_spv_ffi_client_destroy(client); @@ -52,7 +36,7 @@ mod tests { #[serial] fn test_client_null_config() { unsafe { - let client = dash_spv_ffi_client_new(std::ptr::null()); + let client = dash_spv_ffi_client_new(std::ptr::null(), FFIEventCallbacks::default()); assert!(client.is_null()); } } @@ -62,9 +46,9 @@ mod tests { fn test_client_lifecycle() { unsafe { let (config, _temp_dir) = create_test_config(); - let client = dash_spv_ffi_client_new(config); + let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default()); - // Note: Start/stop may fail in test environment without network + // Pass default (no-op) callbacks — start/stop may fail without network let _result = dash_spv_ffi_client_run(client); let _result = dash_spv_ffi_client_stop(client); @@ -87,48 +71,4 @@ mod tests { assert!(progress.is_null()); } } - - extern "C" fn test_progress_callback(progress: *const FFISyncProgress, user_data: *mut c_void) { - assert!(!progress.is_null()); - let data = unsafe { &*(user_data as *const ProgressCallbackData) }; - let p = unsafe { &*progress }; - - *data.state.lock().unwrap() = Some(p.state); - *data.is_synced.lock().unwrap() = Some(p.is_synced); - } - - #[test] - #[serial] - fn test_set_progress_callback_emits_progress() { - unsafe { - let (config, _temp_dir) = create_test_config(); - let client = dash_spv_ffi_client_new(config); - assert!(!client.is_null()); - - let callback_data = Box::new(ProgressCallbackData::new()); - let data_ptr = &*callback_data as *const ProgressCallbackData as *mut c_void; - - let progress_callback = FFIProgressCallback { - on_progress: Some(test_progress_callback), - user_data: data_ptr, - }; - - let result = dash_spv_ffi_client_set_progress_callback(client, progress_callback); - assert_eq!(result, FFIErrorCode::Success as i32); - - // Verify callback was invoked with expected initial values - assert_eq!( - callback_data.state.lock().unwrap().unwrap(), - FFISyncState::WaitingForConnections, - "initial state should be WaitingForConnections" - ); - assert!( - !callback_data.is_synced.lock().unwrap().unwrap(), - "initial is_synced should be false" - ); - - dash_spv_ffi_client_destroy(client); - dash_spv_ffi_config_destroy(config); - } - } } diff --git a/dash-spv-ffi/tests/test_wallet_manager.rs b/dash-spv-ffi/tests/test_wallet_manager.rs index 1edce9598..15f5df2fd 100644 --- a/dash-spv-ffi/tests/test_wallet_manager.rs +++ b/dash-spv-ffi/tests/test_wallet_manager.rs @@ -28,7 +28,7 @@ mod tests { ); // Create a client - let client = dash_spv_ffi_client_new(config); + let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default()); assert!(!client.is_null()); // Get wallet manager @@ -64,7 +64,7 @@ mod tests { CString::new(temp_dir.path().to_str().unwrap()).unwrap().as_ptr(), ); - let client = dash_spv_ffi_client_new(config); + let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default()); assert!(!client.is_null()); let wallet_manager = dash_spv_ffi_client_get_wallet_manager(client); diff --git a/dash-spv-ffi/tests/unit/test_async_operations.rs b/dash-spv-ffi/tests/unit/test_async_operations.rs index cefeb27ed..c18726178 100644 --- a/dash-spv-ffi/tests/unit/test_async_operations.rs +++ b/dash-spv-ffi/tests/unit/test_async_operations.rs @@ -20,7 +20,7 @@ mod tests { let path = CString::new(temp_dir.path().to_str().unwrap()).unwrap(); dash_spv_ffi_config_set_data_dir(config, path.as_ptr()); - let client = dash_spv_ffi_client_new(config); + let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default()); assert!(!client.is_null(), "Failed to create client"); (client, config, temp_dir) @@ -113,7 +113,7 @@ mod tests { println!("Testing callback thread safety with concurrent invocations"); - // Start the client + // Start the client with default (empty) callbacks let start_result = dash_spv_ffi_client_run(client); assert_eq!(start_result, 0); thread::sleep(Duration::from_millis(100)); @@ -292,8 +292,17 @@ mod tests { user_data: &event_data as *const _ as *mut c_void, }; - let result = dash_spv_ffi_client_set_sync_event_callbacks(client, sync_callbacks); - assert_eq!(result, FFIErrorCode::Success as i32); + // Build an FFIEventCallbacks with sync callbacks set + let callbacks = FFIEventCallbacks { + sync: sync_callbacks, + ..FFIEventCallbacks::default() + }; + + // Verify the struct is properly constructed (callbacks are now + // passed directly to run(), no separate set call needed) + assert!(callbacks.sync.on_sync_start.is_some()); + assert!(callbacks.sync.on_block_headers_stored.is_some()); + assert!(callbacks.sync.on_sync_complete.is_some()); dash_spv_ffi_client_destroy(client); dash_spv_ffi_config_destroy(config); diff --git a/dash-spv-ffi/tests/unit/test_client_lifecycle.rs b/dash-spv-ffi/tests/unit/test_client_lifecycle.rs index ceef717ec..e0f642b9e 100644 --- a/dash-spv-ffi/tests/unit/test_client_lifecycle.rs +++ b/dash-spv-ffi/tests/unit/test_client_lifecycle.rs @@ -30,7 +30,7 @@ mod tests { fn test_client_creation_with_invalid_config() { unsafe { // Test with null config - let client = dash_spv_ffi_client_new(std::ptr::null()); + let client = dash_spv_ffi_client_new(std::ptr::null(), FFIEventCallbacks::default()); assert!(client.is_null()); // Check error was set @@ -49,7 +49,7 @@ mod tests { // Create multiple clients with different data directories for i in 0..3 { let (config, temp_dir) = create_test_config_with_dir(); - let client = dash_spv_ffi_client_new(config); + let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default()); assert!(!client.is_null(), "Failed to create client {}", i); clients.push(client); @@ -70,7 +70,7 @@ mod tests { fn test_client_start_stop_restart() { unsafe { let (config, _temp_dir) = create_test_config_with_dir(); - let client = dash_spv_ffi_client_new(config); + let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default()); assert!(!client.is_null()); // Start @@ -95,7 +95,7 @@ mod tests { fn test_client_destruction_while_operations_pending() { unsafe { let (config, _temp_dir) = create_test_config_with_dir(); - let client = dash_spv_ffi_client_new(config); + let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default()); assert!(!client.is_null()); // Start a sync operation in background @@ -119,7 +119,7 @@ mod tests { dash_spv_ffi_config_set_data_dir(config, path.as_ptr()); // Don't add any peers - let client = dash_spv_ffi_client_new(config); + let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default()); assert!(!client.is_null()); // Try to start (should handle no peers gracefully) @@ -139,7 +139,7 @@ mod tests { unsafe { for _ in 0..5 { let (config, _temp_dir) = create_test_config_with_dir(); - let client = dash_spv_ffi_client_new(config); + let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default()); assert!(!client.is_null()); // Do some operations @@ -188,7 +188,7 @@ mod tests { fn test_client_state_consistency() { unsafe { let (config, _temp_dir) = create_test_config_with_dir(); - let client = dash_spv_ffi_client_new(config); + let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default()); assert!(!client.is_null()); // Get initial state @@ -228,15 +228,15 @@ mod tests { unsafe { let (config, _temp_dir) = create_test_config_with_dir(); - let client = dash_spv_ffi_client_new(config); - assert!(!client.is_null()); - - let callback = FFIClientErrorCallback { - on_error: Some(on_error), - user_data: tx_ptr as *mut std::os::raw::c_void, + let callbacks = FFIEventCallbacks { + error: FFIClientErrorCallback { + on_error: Some(on_error), + user_data: tx_ptr as *mut std::os::raw::c_void, + }, + ..FFIEventCallbacks::default() }; - let result = dash_spv_ffi_client_set_client_error_callback(client, callback); - assert_eq!(result, FFIErrorCode::Success as i32); + let client = dash_spv_ffi_client_new(config, callbacks); + assert!(!client.is_null()); // Call run() twice — the second run's sync thread will call // start() on the already-running client, triggering "already running" @@ -302,20 +302,10 @@ mod tests { #[test] #[serial] - fn test_client_error_callback_null_client() { + fn test_client_run_null_client() { unsafe { - let callback = FFIClientErrorCallback { - on_error: None, - user_data: std::ptr::null_mut(), - }; - assert_eq!( - dash_spv_ffi_client_set_client_error_callback(std::ptr::null_mut(), callback), - FFIErrorCode::NullPointer as i32 - ); - - assert_eq!( - dash_spv_ffi_client_clear_client_error_callback(std::ptr::null_mut()), + dash_spv_ffi_client_run(std::ptr::null_mut()), FFIErrorCode::NullPointer as i32 ); } @@ -336,7 +326,7 @@ mod tests { for _ in 0..10 { unsafe { let (config, _temp_dir) = create_test_config_with_dir(); - let client = dash_spv_ffi_client_new(config); + let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default()); assert!(!client.is_null()); // Do a quick operation diff --git a/dash-spv-ffi/tests/unit/test_memory_management.rs b/dash-spv-ffi/tests/unit/test_memory_management.rs index dba427a9c..a3722df87 100644 --- a/dash-spv-ffi/tests/unit/test_memory_management.rs +++ b/dash-spv-ffi/tests/unit/test_memory_management.rs @@ -50,7 +50,7 @@ mod tests { // Create and destroy multiple clients for _ in 0..10 { - let client = dash_spv_ffi_client_new(config); + let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default()); assert!(!client.is_null()); // Perform some operations @@ -180,7 +180,7 @@ mod tests { let path = CString::new(temp_dir.path().to_str().unwrap()).unwrap(); dash_spv_ffi_config_set_data_dir(config, path.as_ptr()); - let client = dash_spv_ffi_client_new(config); + let client = dash_spv_ffi_client_new(config, FFIEventCallbacks::default()); assert!(!client.is_null()); // Get structures that contain FFIString and other pointers diff --git a/dash-spv/examples/filter_sync.rs b/dash-spv/examples/filter_sync.rs index 955400f30..07e0840e9 100644 --- a/dash-spv/examples/filter_sync.rs +++ b/dash-spv/examples/filter_sync.rs @@ -36,7 +36,8 @@ async fn main() -> Result<(), Box> { let wallet = Arc::new(RwLock::new(WalletManager::::new(config.network))); // Create the client - let client = DashSpvClient::new(config, network_manager, storage_manager, wallet).await?; + let client = + DashSpvClient::new(config, network_manager, storage_manager, wallet, Arc::new(())).await?; println!("Starting synchronization with filter support..."); println!("Watching address: {:?}", watch_address); diff --git a/dash-spv/examples/simple_sync.rs b/dash-spv/examples/simple_sync.rs index 8fd1cbbbc..33344a824 100644 --- a/dash-spv/examples/simple_sync.rs +++ b/dash-spv/examples/simple_sync.rs @@ -31,7 +31,8 @@ async fn main() -> Result<(), Box> { let wallet = Arc::new(RwLock::new(WalletManager::::new(config.network))); // Create the client - let client = DashSpvClient::new(config, network_manager, storage_manager, wallet).await?; + let client = + DashSpvClient::new(config, network_manager, storage_manager, wallet, Arc::new(())).await?; println!("Starting header synchronization..."); diff --git a/dash-spv/examples/spv_with_wallet.rs b/dash-spv/examples/spv_with_wallet.rs index 31e6d4ffe..5c21534a3 100644 --- a/dash-spv/examples/spv_with_wallet.rs +++ b/dash-spv/examples/spv_with_wallet.rs @@ -31,7 +31,8 @@ async fn main() -> Result<(), Box> { let wallet = Arc::new(RwLock::new(WalletManager::::new(config.network))); // Create the SPV client with all components - let client = DashSpvClient::new(config, network_manager, storage_manager, wallet).await?; + let client = + DashSpvClient::new(config, network_manager, storage_manager, wallet, Arc::new(())).await?; // The wallet will automatically be notified of: // - New blocks via process_block() diff --git a/dash-spv/src/client/core.rs b/dash-spv/src/client/core.rs index fa9ebda92..746ffd5f1 100644 --- a/dash-spv/src/client/core.rs +++ b/dash-spv/src/client/core.rs @@ -13,6 +13,7 @@ use std::sync::Arc; use tokio::sync::{Mutex, RwLock}; use super::ClientConfig; +use crate::client::EventHandler; use crate::error::{Result, SpvError}; use crate::mempool_filter::MempoolFilter; use crate::network::NetworkManager; @@ -74,6 +75,7 @@ pub(super) type PersistentSyncCoordinator = SyncCoordinator< /// - `W: WalletInterface` - Handles UTXO tracking, address management, transaction processing /// - `N: NetworkManager` - Manages peer connections, message routing, network protocol /// - `S: StorageManager` - Persistent storage for headers, filters, chain state +/// - `H: EventHandler` - Receives push-based event notifications (defaults to `()` no-op) /// /// ## Common Configurations /// @@ -104,7 +106,12 @@ pub(super) type PersistentSyncCoordinator = SyncCoordinator< /// - Not reduce binary size (production has one instantiation anyway) /// /// The generic design is an intentional, beneficial architectural choice for a library. -pub struct DashSpvClient { +pub struct DashSpvClient< + W: WalletInterface, + N: NetworkManager, + S: StorageManager, + H: EventHandler = (), +> { pub(super) config: Arc>, pub(super) network: Arc>, pub(super) storage: Arc>, @@ -115,9 +122,12 @@ pub struct DashSpvClient>, pub(super) mempool_state: Arc>, pub(super) mempool_filter: Arc>>>, + pub(super) event_handler: Arc, } -impl Clone for DashSpvClient { +impl Clone + for DashSpvClient +{ fn clone(&self) -> Self { Self { config: Arc::clone(&self.config), @@ -129,11 +139,14 @@ impl Clone for DashSpv running: Arc::clone(&self.running), mempool_state: Arc::clone(&self.mempool_state), mempool_filter: Arc::clone(&self.mempool_filter), + event_handler: Arc::clone(&self.event_handler), } } } -impl DashSpvClient { +impl + DashSpvClient +{ // ============ Simple Getters ============ /// Get a reference to the wallet. diff --git a/dash-spv/src/client/event_handler.rs b/dash-spv/src/client/event_handler.rs new file mode 100644 index 000000000..b3c3026df --- /dev/null +++ b/dash-spv/src/client/event_handler.rs @@ -0,0 +1,352 @@ +//! Event handler trait for receiving SPV client events. +//! +//! Provides `EventHandler`, a trait with default no-op implementations that +//! consumers override to receive push-based event notifications. The monitoring +//! infrastructure subscribes to internal channels and dispatches to the handler. + +use std::sync::Arc; + +use tokio::sync::{broadcast, watch}; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; + +use crate::network::NetworkEvent; +use crate::sync::{SyncEvent, SyncProgress}; +use key_wallet::manager::WalletEvent; + +/// Trait for receiving SPV client events. +/// +/// All methods have default no-op implementations, so consumers only +/// need to override the events they care about. +pub trait EventHandler: Send + Sync + 'static { + /// Called for sync lifecycle events (headers stored, sync complete, etc.). + fn on_sync_event(&self, _event: &SyncEvent) {} + /// Called when peer connections change (connect, disconnect, peer list update). + fn on_network_event(&self, _event: &NetworkEvent) {} + /// Called when overall sync progress changes. + fn on_progress(&self, _progress: &SyncProgress) {} + /// Called for wallet events (transaction received, balance updated). + fn on_wallet_event(&self, _event: &WalletEvent) {} + /// Called on fatal errors (start failure, monitor channel failure, sync loop error). + fn on_error(&self, _error: &str) {} +} + +/// No-op implementation for consumers that don't need event notifications. +impl EventHandler for () {} + +/// Spawns a task that monitors a broadcast channel and dispatches events to the handler. +pub(crate) fn spawn_broadcast_monitor( + name: &'static str, + mut receiver: broadcast::Receiver, + handler: Arc, + shutdown: CancellationToken, + dispatch_fn: F, +) -> JoinHandle<()> +where + E: Clone + Send + 'static, + H: EventHandler, + F: Fn(&H, &E) + Send + 'static, +{ + tokio::spawn(async move { + tracing::debug!("{} monitoring task started", name); + loop { + tokio::select! { + result = receiver.recv() => { + match result { + Ok(event) => dispatch_fn(&handler, &event), + Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(_)) => continue, + } + } + _ = shutdown.cancelled() => break, + } + } + tracing::debug!("{} monitoring task exiting", name); + }) +} + +/// Spawns a task that monitors a watch channel for progress updates. +/// +/// Sends the initial progress value, then monitors for changes. +pub(crate) fn spawn_progress_monitor( + mut receiver: watch::Receiver, + handler: Arc, + shutdown: CancellationToken, +) -> JoinHandle<()> { + tokio::spawn(async move { + tracing::debug!("Progress monitoring task started"); + + handler.on_progress(&receiver.borrow_and_update()); + + loop { + tokio::select! { + result = receiver.changed() => { + match result { + Ok(()) => handler.on_progress(&receiver.borrow_and_update()), + Err(_) => break, + } + } + _ = shutdown.cancelled() => break, + } + } + tracing::debug!("Progress monitoring task exiting"); + }) +} + +#[cfg(test)] +mod tests { + use std::net::SocketAddr; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + + use tokio::sync::{broadcast, watch}; + use tokio_util::sync::CancellationToken; + + use super::{spawn_broadcast_monitor, spawn_progress_monitor, EventHandler}; + use crate::network::NetworkEvent; + use crate::sync::{ManagerIdentifier, SyncEvent, SyncProgress}; + use key_wallet::manager::WalletEvent; + + struct RecordingHandler { + sync_count: AtomicUsize, + network_count: AtomicUsize, + progress_count: AtomicUsize, + wallet_count: AtomicUsize, + error_count: AtomicUsize, + } + + impl RecordingHandler { + fn new() -> Self { + Self { + sync_count: AtomicUsize::new(0), + network_count: AtomicUsize::new(0), + progress_count: AtomicUsize::new(0), + wallet_count: AtomicUsize::new(0), + error_count: AtomicUsize::new(0), + } + } + } + + impl EventHandler for RecordingHandler { + fn on_sync_event(&self, _event: &SyncEvent) { + self.sync_count.fetch_add(1, Ordering::SeqCst); + } + fn on_network_event(&self, _event: &NetworkEvent) { + self.network_count.fetch_add(1, Ordering::SeqCst); + } + fn on_progress(&self, _progress: &SyncProgress) { + self.progress_count.fetch_add(1, Ordering::SeqCst); + } + fn on_wallet_event(&self, _event: &WalletEvent) { + self.wallet_count.fetch_add(1, Ordering::SeqCst); + } + fn on_error(&self, _error: &str) { + self.error_count.fetch_add(1, Ordering::SeqCst); + } + } + + #[tokio::test] + async fn noop_handler_does_not_panic() { + let handler: () = (); + let event = SyncEvent::BlockHeadersStored { + tip_height: 100, + }; + handler.on_sync_event(&event); + handler.on_network_event(&NetworkEvent::PeersUpdated { + connected_count: 0, + addresses: vec![], + best_height: None, + }); + handler.on_progress(&SyncProgress::default()); + handler.on_error("test error"); + } + + #[tokio::test] + async fn broadcast_monitor_dispatches_events() { + let (tx, rx) = broadcast::channel(16); + let handler = Arc::new(RecordingHandler::new()); + let shutdown = CancellationToken::new(); + + let task = spawn_broadcast_monitor( + "test", + rx, + handler.clone(), + shutdown.clone(), + |h: &RecordingHandler, event: &SyncEvent| h.on_sync_event(event), + ); + + tx.send(SyncEvent::BlockHeadersStored { + tip_height: 1, + }) + .unwrap(); + tx.send(SyncEvent::BlockHeadersStored { + tip_height: 2, + }) + .unwrap(); + tx.send(SyncEvent::SyncStart { + identifier: ManagerIdentifier::BlockHeader, + }) + .unwrap(); + + // Give the task time to process + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + shutdown.cancel(); + task.await.unwrap(); + + assert_eq!(handler.sync_count.load(Ordering::SeqCst), 3); + } + + #[tokio::test] + async fn broadcast_monitor_exits_on_shutdown() { + let (_tx, rx) = broadcast::channel::(16); + let handler = Arc::new(RecordingHandler::new()); + let shutdown = CancellationToken::new(); + + let task = spawn_broadcast_monitor( + "test", + rx, + handler.clone(), + shutdown.clone(), + |h: &RecordingHandler, event: &SyncEvent| h.on_sync_event(event), + ); + + shutdown.cancel(); + task.await.unwrap(); + + assert_eq!(handler.sync_count.load(Ordering::SeqCst), 0); + } + + #[tokio::test] + async fn broadcast_monitor_exits_on_channel_close() { + let (tx, rx) = broadcast::channel::(16); + let handler = Arc::new(RecordingHandler::new()); + let shutdown = CancellationToken::new(); + + let task = spawn_broadcast_monitor( + "test", + rx, + handler.clone(), + shutdown.clone(), + |h: &RecordingHandler, event: &SyncEvent| h.on_sync_event(event), + ); + + drop(tx); + task.await.unwrap(); + } + + #[tokio::test] + async fn broadcast_monitor_handles_lagged_receiver() { + let (tx, rx) = broadcast::channel(2); + let handler = Arc::new(RecordingHandler::new()); + let shutdown = CancellationToken::new(); + + // Send more messages than the buffer can hold before spawning the monitor + tx.send(SyncEvent::BlockHeadersStored { + tip_height: 1, + }) + .unwrap(); + tx.send(SyncEvent::BlockHeadersStored { + tip_height: 2, + }) + .unwrap(); + tx.send(SyncEvent::BlockHeadersStored { + tip_height: 3, + }) + .unwrap(); + + let task = spawn_broadcast_monitor( + "test", + rx, + handler.clone(), + shutdown.clone(), + |h: &RecordingHandler, event: &SyncEvent| h.on_sync_event(event), + ); + + // Send one more after the monitor starts + tx.send(SyncEvent::BlockHeadersStored { + tip_height: 4, + }) + .unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + shutdown.cancel(); + task.await.unwrap(); + + // The monitor should have received at least the last message (and possibly + // one from the lagged recovery). The key thing is it doesn't crash. + assert!(handler.sync_count.load(Ordering::SeqCst) >= 1); + } + + #[tokio::test] + async fn progress_monitor_sends_initial_and_updates() { + let (tx, rx) = watch::channel(SyncProgress::default()); + let handler = Arc::new(RecordingHandler::new()); + let shutdown = CancellationToken::new(); + + let task = spawn_progress_monitor(rx, handler.clone(), shutdown.clone()); + + // Give the task time to send initial progress + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Send two updates + tx.send_modify(|_| {}); + tx.send_modify(|_| {}); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + shutdown.cancel(); + task.await.unwrap(); + + // 1 initial + at least 1 update (watch coalesces rapid updates) + assert!(handler.progress_count.load(Ordering::SeqCst) >= 2); + } + + #[tokio::test] + async fn progress_monitor_exits_on_sender_drop() { + let (tx, rx) = watch::channel(SyncProgress::default()); + let handler = Arc::new(RecordingHandler::new()); + let shutdown = CancellationToken::new(); + + let task = spawn_progress_monitor(rx, handler.clone(), shutdown.clone()); + + // Give it time to send initial + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + drop(tx); + task.await.unwrap(); + + // At least the initial progress was sent + assert!(handler.progress_count.load(Ordering::SeqCst) >= 1); + } + + #[tokio::test] + async fn network_event_dispatch() { + let (tx, rx) = broadcast::channel(16); + let handler = Arc::new(RecordingHandler::new()); + let shutdown = CancellationToken::new(); + + let task = spawn_broadcast_monitor( + "network", + rx, + handler.clone(), + shutdown.clone(), + |h: &RecordingHandler, event: &NetworkEvent| h.on_network_event(event), + ); + + let addr: SocketAddr = "127.0.0.1:9999".parse().unwrap(); + tx.send(NetworkEvent::PeerConnected { + address: addr, + }) + .unwrap(); + tx.send(NetworkEvent::PeerDisconnected { + address: addr, + }) + .unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + shutdown.cancel(); + task.await.unwrap(); + + assert_eq!(handler.network_count.load(Ordering::SeqCst), 2); + } +} diff --git a/dash-spv/src/client/events.rs b/dash-spv/src/client/events.rs index 7f5388afd..073d3dfef 100644 --- a/dash-spv/src/client/events.rs +++ b/dash-spv/src/client/events.rs @@ -12,11 +12,13 @@ use crate::sync::{SyncEvent, SyncProgress}; use key_wallet::manager::WalletInterface; use tokio::sync::broadcast; -use super::DashSpvClient; +use super::{DashSpvClient, EventHandler}; -impl DashSpvClient { +impl + DashSpvClient +{ /// Subscribe to sync progress updates via watch channel. - pub async fn subscribe_progress(&self) -> watch::Receiver { + pub(crate) async fn subscribe_progress(&self) -> watch::Receiver { self.sync_coordinator.lock().await.subscribe_progress() } @@ -26,12 +28,12 @@ impl DashSpvClient broadcast::Receiver { + pub(crate) async fn subscribe_sync_events(&self) -> broadcast::Receiver { self.sync_coordinator.lock().await.subscribe_events() } /// Subscribe to network events. - pub async fn subscribe_network_events(&self) -> broadcast::Receiver { + pub(crate) async fn subscribe_network_events(&self) -> broadcast::Receiver { self.network.lock().await.subscribe_network_events() } } diff --git a/dash-spv/src/client/lifecycle.rs b/dash-spv/src/client/lifecycle.rs index dceea1caf..f5aac7279 100644 --- a/dash-spv/src/client/lifecycle.rs +++ b/dash-spv/src/client/lifecycle.rs @@ -12,7 +12,7 @@ use std::collections::HashSet; use std::sync::Arc; use tokio::sync::{Mutex, RwLock}; -use super::{ClientConfig, DashSpvClient}; +use super::{ClientConfig, DashSpvClient, EventHandler}; use crate::chain::checkpoints::{mainnet_checkpoints, testnet_checkpoints, CheckpointManager}; use crate::error::{Result, SpvError}; use crate::mempool_filter::MempoolFilter; @@ -30,13 +30,16 @@ use dashcore::sml::masternode_list_engine::MasternodeListEngine; use dashcore_hashes::Hash; use key_wallet::manager::WalletInterface; -impl DashSpvClient { +impl + DashSpvClient +{ /// Create a new SPV client with the given configuration, network, storage, and wallet. pub async fn new( config: ClientConfig, network: N, mut storage: S, wallet: Arc>, + event_handler: Arc, ) -> Result { // Validate configuration config.validate().map_err(SpvError::Config)?; @@ -140,11 +143,16 @@ impl DashSpvClient DashSpvClient { +impl + DashSpvClient +{ /// Get mempool balance for an address. pub async fn get_mempool_balance( &self, diff --git a/dash-spv/src/client/mod.rs b/dash-spv/src/client/mod.rs index f9cb8848e..53d857023 100644 --- a/dash-spv/src/client/mod.rs +++ b/dash-spv/src/client/mod.rs @@ -25,6 +25,7 @@ //! Never acquire locks in reverse order or deadlock will occur! pub mod config; +pub mod event_handler; mod core; mod events; @@ -36,6 +37,7 @@ mod transactions; // Re-export public types from extracted modules pub use config::ClientConfig; +pub use event_handler::EventHandler; // Re-export the main client struct pub use core::DashSpvClient; @@ -75,7 +77,7 @@ mod tests { DiskStorageManager::with_temp_dir().await.expect("Failed to create tmp storage"); let wallet = Arc::new(RwLock::new(WalletManager::::new(config.network))); - let client = DashSpvClient::new(config, network_manager, storage, wallet) + let client = DashSpvClient::new(config, network_manager, storage, wallet, Arc::new(())) .await .expect("client construction must succeed"); @@ -102,7 +104,7 @@ mod tests { let test_address = Address::dummy(config.network, 0); - let client = DashSpvClient::new(config, network_manager, storage, wallet) + let client = DashSpvClient::new(config, network_manager, storage, wallet, Arc::new(())) .await .expect("client construction must succeed"); diff --git a/dash-spv/src/client/queries.rs b/dash-spv/src/client/queries.rs index 90b72ad9a..7f8749260 100644 --- a/dash-spv/src/client/queries.rs +++ b/dash-spv/src/client/queries.rs @@ -17,9 +17,11 @@ use key_wallet::manager::WalletInterface; use std::sync::Arc; use tokio::sync::RwLock; -use super::DashSpvClient; +use super::{DashSpvClient, EventHandler}; -impl DashSpvClient { +impl + DashSpvClient +{ // ============ Peer Queries ============ /// Get the number of connected peers. diff --git a/dash-spv/src/client/sync_coordinator.rs b/dash-spv/src/client/sync_coordinator.rs index 3709a1387..8a99fb840 100644 --- a/dash-spv/src/client/sync_coordinator.rs +++ b/dash-spv/src/client/sync_coordinator.rs @@ -1,38 +1,83 @@ //! Sync coordination and orchestration. -use super::DashSpvClient; +use std::time::Duration; + +use tokio_util::sync::CancellationToken; + +use super::event_handler::{spawn_broadcast_monitor, spawn_progress_monitor}; +use super::{DashSpvClient, EventHandler}; use crate::error::Result; use crate::network::NetworkManager; use crate::storage::StorageManager; use crate::sync::SyncProgress; +use crate::SpvError; use key_wallet::manager::WalletInterface; -use std::time::Duration; -use tokio_util::sync::CancellationToken; const SYNC_COORDINATOR_TICK_MS: Duration = Duration::from_millis(100); -impl DashSpvClient { +impl + DashSpvClient +{ /// Get current sync progress. pub async fn sync_progress(&self) -> SyncProgress { self.sync_coordinator.lock().await.progress().clone() } - /// Start the client and run the monitoring loop until the token is cancelled. + /// Start the client and run the sync loop until the token is cancelled. /// - /// Calls `start()` internally, runs continuous network monitoring for new - /// blocks, ChainLocks, InstantLocks, etc., and calls `stop()` before returning. - /// The caller is responsible for cancelling the token (e.g. on ctrl-c). + /// Subscribes to all event channels internally and dispatches events to the + /// event handler provided at construction. Calls `start()` internally, runs + /// continuous network monitoring, and calls `stop()` before returning. pub async fn run(&self, token: CancellationToken) -> Result<()> { - self.start().await?; + let handler = self.event_handler.clone(); + + if let Err(e) = self.start().await { + handler.on_error(&e.to_string()); + return Err(e); + } tracing::info!("Starting continuous network monitoring..."); + let monitor_shutdown = CancellationToken::new(); + + // Subscribe to channels + let sync_event_rx = self.subscribe_sync_events().await; + let network_event_rx = self.subscribe_network_events().await; + let progress_rx = self.subscribe_progress().await; + let wallet_event_rx = self.wallet.read().await.subscribe_events(); + + // Spawn monitoring tasks + let sync_task = spawn_broadcast_monitor( + "Sync event", + sync_event_rx, + handler.clone(), + monitor_shutdown.clone(), + |h, event| h.on_sync_event(event), + ); + + let network_task = spawn_broadcast_monitor( + "Network event", + network_event_rx, + handler.clone(), + monitor_shutdown.clone(), + |h, event| h.on_network_event(event), + ); + + let wallet_task = spawn_broadcast_monitor( + "Wallet event", + wallet_event_rx, + handler.clone(), + monitor_shutdown.clone(), + |h, event| h.on_wallet_event(event), + ); + + let progress_task = + spawn_progress_monitor(progress_rx, handler.clone(), monitor_shutdown.clone()); + + // Run the sync loop let mut sync_coordinator_tick_interval = tokio::time::interval(SYNC_COORDINATOR_TICK_MS); - let mut progress_updates = self.sync_coordinator.lock().await.subscribe_progress(); - let mut wallet_events = self.wallet.read().await.subscribe_events(); - let error = loop { - // Check if we should stop + let error: Option = loop { let running = self.running.read().await; if !*running { tracing::info!("Stopping network monitoring"); @@ -40,31 +85,7 @@ impl DashSpvClient { - match result { - Ok(()) => { - tracing::info!("Sync progress: {}", *progress_updates.borrow()); - None - } - Err(_) => { - tracing::warn!("Progress channel closed."); - break None - } - } - } - result = wallet_events.recv() => { - match result { - Ok(event) => { - tracing::info!("Wallet event: {}", event.description()); - None - } - Err(e) => { - tracing::warn!("Wallet events channel error: {e}"); - break None - } - } - } + let error: Option = tokio::select! { _ = sync_coordinator_tick_interval.tick() => { self.sync_coordinator.lock().await.tick().await.err().map(Into::into) } @@ -74,12 +95,16 @@ impl DashSpvClient DashSpvClient { +impl + DashSpvClient +{ /// Broadcast a transaction to all connected peers. pub async fn broadcast_transaction(&self, tx: &dashcore::Transaction) -> Result<()> { let network_guard = self.network.lock().await; diff --git a/dash-spv/src/lib.rs b/dash-spv/src/lib.rs index 08351dc85..5fb3d392a 100644 --- a/dash-spv/src/lib.rs +++ b/dash-spv/src/lib.rs @@ -34,7 +34,7 @@ //! let wallet = Arc::new(RwLock::new(WalletManager::::new(config.network))); //! //! // Create and run the client -//! let client = DashSpvClient::new(config.clone(), network, storage, wallet).await?; +//! let client = DashSpvClient::new(config.clone(), network, storage, wallet, Arc::new(())).await?; //! let shutdown_token = CancellationToken::new(); //! //! client.run(shutdown_token).await?; @@ -69,7 +69,7 @@ pub mod validation; // Re-export main types for convenience pub use client::config::MempoolStrategy; -pub use client::{ClientConfig, DashSpvClient}; +pub use client::{ClientConfig, DashSpvClient, EventHandler}; pub use error::{ LoggingError, LoggingResult, NetworkError, SpvError, StorageError, SyncError, ValidationError, }; diff --git a/dash-spv/src/main.rs b/dash-spv/src/main.rs index 6e92e87d0..ec01c3002 100644 --- a/dash-spv/src/main.rs +++ b/dash-spv/src/main.rs @@ -6,11 +6,38 @@ use std::process; use std::sync::Arc; use clap::{Arg, Command}; -use dash_spv::{ClientConfig, DashSpvClient, LevelFilter, MempoolStrategy, Network}; -use key_wallet::manager::WalletManager; +use dash_spv::network::NetworkEvent; +use dash_spv::sync::{SyncEvent, SyncProgress}; +use dash_spv::{ClientConfig, DashSpvClient, EventHandler, LevelFilter, MempoolStrategy, Network}; +use key_wallet::manager::{WalletEvent, WalletManager}; use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; use tokio_util::sync::CancellationToken; +/// Logs all SPV client events via tracing. +struct LoggingEventHandler; + +impl EventHandler for LoggingEventHandler { + fn on_sync_event(&self, event: &SyncEvent) { + tracing::info!("{}", event.description()); + } + + fn on_network_event(&self, event: &NetworkEvent) { + tracing::info!("{}", event.description()); + } + + fn on_progress(&self, progress: &SyncProgress) { + tracing::info!("Sync progress: {}", progress); + } + + fn on_wallet_event(&self, event: &WalletEvent) { + tracing::info!("Wallet: {}", event.description()); + } + + fn on_error(&self, error: &str) { + tracing::error!("{}", error); + } +} + #[tokio::main] async fn main() { if let Err(e) = run().await { @@ -327,7 +354,14 @@ async fn run_client( WalletManager, dash_spv::network::manager::PeerNetworkManager, S, - >::new(config.clone(), network_manager, storage_manager, wallet.clone()) + LoggingEventHandler, + >::new( + config.clone(), + network_manager, + storage_manager, + wallet.clone(), + Arc::new(LoggingEventHandler), + ) .await { Ok(client) => client, diff --git a/dash-spv/src/test_utils/event_handler.rs b/dash-spv/src/test_utils/event_handler.rs new file mode 100644 index 000000000..7d7f21f98 --- /dev/null +++ b/dash-spv/src/test_utils/event_handler.rs @@ -0,0 +1,81 @@ +//! Test event handler that bridges `EventHandler` back to tokio channels. +//! +//! Allows integration tests to use `run()` with an `EventHandler` while keeping +//! the ergonomic `tokio::select!` patterns that channels provide. + +use tokio::sync::{broadcast, watch}; + +use crate::client::EventHandler; +use crate::network::NetworkEvent; +use crate::sync::{SyncEvent, SyncProgress}; +use key_wallet::manager::WalletEvent; + +/// Event handler that forwards all events to internal channels. +/// +/// Tests create this handler, take receivers via the accessor methods, +/// then pass `Arc` to `run()`. +pub struct TestEventHandler { + sync_tx: broadcast::Sender, + network_tx: broadcast::Sender, + progress_tx: watch::Sender, + wallet_tx: broadcast::Sender, +} + +impl TestEventHandler { + pub fn new() -> Self { + let (sync_tx, _) = broadcast::channel(256); + let (network_tx, _) = broadcast::channel(256); + let (progress_tx, _) = watch::channel(SyncProgress::default()); + let (wallet_tx, _) = broadcast::channel(256); + Self { + sync_tx, + network_tx, + progress_tx, + wallet_tx, + } + } + + pub fn subscribe_sync_events(&self) -> broadcast::Receiver { + self.sync_tx.subscribe() + } + + pub fn subscribe_network_events(&self) -> broadcast::Receiver { + self.network_tx.subscribe() + } + + pub fn subscribe_progress(&self) -> watch::Receiver { + self.progress_tx.subscribe() + } + + pub fn subscribe_wallet_events(&self) -> broadcast::Receiver { + self.wallet_tx.subscribe() + } +} + +impl Default for TestEventHandler { + fn default() -> Self { + Self::new() + } +} + +impl EventHandler for TestEventHandler { + fn on_sync_event(&self, event: &SyncEvent) { + let _ = self.sync_tx.send(event.clone()); + } + + fn on_network_event(&self, event: &NetworkEvent) { + let _ = self.network_tx.send(event.clone()); + } + + fn on_progress(&self, progress: &SyncProgress) { + self.progress_tx.send_replace(progress.clone()); + } + + fn on_wallet_event(&self, event: &WalletEvent) { + let _ = self.wallet_tx.send(event.clone()); + } + + fn on_error(&self, error: &str) { + tracing::error!("TestEventHandler received error: {}", error); + } +} diff --git a/dash-spv/src/test_utils/mod.rs b/dash-spv/src/test_utils/mod.rs index 83c8f7e96..1ae7b8a89 100644 --- a/dash-spv/src/test_utils/mod.rs +++ b/dash-spv/src/test_utils/mod.rs @@ -2,6 +2,7 @@ mod chain_tip; mod chain_work; mod checkpoint; mod context; +mod event_handler; mod filter; mod fs_helpers; mod network; @@ -14,6 +15,7 @@ use std::time::Duration; pub const SYNC_TIMEOUT: Duration = Duration::from_secs(180); pub use context::DashdTestContext; +pub use event_handler::TestEventHandler; pub use fs_helpers::retain_test_dir; pub use network::{test_socket_address, MockNetworkManager}; pub use node::{DashCoreNode, TestChain, WalletFile}; diff --git a/dash-spv/tests/dashd_sync/setup.rs b/dash-spv/tests/dashd_sync/setup.rs index d8caa7c47..80a4ad16e 100644 --- a/dash-spv/tests/dashd_sync/setup.rs +++ b/dash-spv/tests/dashd_sync/setup.rs @@ -1,6 +1,6 @@ use dash_spv::network::NetworkEvent; use dash_spv::storage::{PeerStorage, PersistentPeerStorage, PersistentStorage}; -use dash_spv::test_utils::{retain_test_dir, DashdTestContext, TestChain}; +use dash_spv::test_utils::{retain_test_dir, DashdTestContext, TestChain, TestEventHandler}; use dash_spv::{ client::{ClientConfig, DashSpvClient}, network::PeerNetworkManager, @@ -242,8 +242,12 @@ impl Drop for TestContext { } /// Type alias for the SPV client used in tests. -pub(super) type TestClient = - DashSpvClient, PeerNetworkManager, DiskStorageManager>; +pub(super) type TestClient = DashSpvClient< + WalletManager, + PeerNetworkManager, + DiskStorageManager, + TestEventHandler, +>; /// A `ClientHandle` is a utility structure that manages the state and handles for a `TestClient` /// required to interact with the synchronization process, various event channels, and cancellation capabilities. @@ -274,7 +278,10 @@ impl ClientHandle { } } -/// Creates a new SPV client and starts it. +/// Creates a new SPV client and starts it with a `TestEventHandler`. +/// +/// The handler bridges events back to channels so tests can use `tokio::select!` +/// patterns while going through the `EventHandler` trait. pub(super) async fn create_and_start_client( config: &ClientConfig, wallet: Arc>>, @@ -284,17 +291,20 @@ pub(super) async fn create_and_start_client( let storage_manager = DiskStorageManager::new(config).await.expect("Failed to create storage manager"); - let client = DashSpvClient::new(config.clone(), network_manager, storage_manager, wallet) - .await - .expect("Failed to create client"); + let handler = Arc::new(TestEventHandler::new()); + let progress_receiver = handler.subscribe_progress(); + let sync_event_receiver = handler.subscribe_sync_events(); + let network_event_receiver = handler.subscribe_network_events(); + + let client = + DashSpvClient::new(config.clone(), network_manager, storage_manager, wallet, handler) + .await + .expect("Failed to create client"); - let progress_receiver = client.subscribe_progress().await; - let sync_event_receiver = client.subscribe_sync_events().await; - let network_event_receiver = client.subscribe_network_events().await; let cancel_token = CancellationToken::new(); let run_token = cancel_token.clone(); - let run_client = client.clone(); + let run_handle = tokio::task::spawn(async move { run_client.run(run_token).await }); ClientHandle { diff --git a/dash-spv/tests/peer_test.rs b/dash-spv/tests/peer_test.rs index 4b70222e0..519dd5fe4 100644 --- a/dash-spv/tests/peer_test.rs +++ b/dash-spv/tests/peer_test.rs @@ -45,8 +45,9 @@ async fn test_peer_connection() { // Create wallet manager let wallet = Arc::new(RwLock::new(WalletManager::::new(config.network))); - let client = - DashSpvClient::new(config, network_manager, storage_manager, wallet).await.unwrap(); + let client = DashSpvClient::new(config, network_manager, storage_manager, wallet, Arc::new(())) + .await + .unwrap(); let token = CancellationToken::new(); let cancel = token.clone(); @@ -82,9 +83,15 @@ async fn test_peer_persistence() { // Create wallet manager let wallet = Arc::new(RwLock::new(WalletManager::::new(config.network))); - let client = DashSpvClient::new(config.clone(), network_manager, storage_manager, wallet) - .await - .unwrap(); + let client = DashSpvClient::new( + config.clone(), + network_manager, + storage_manager, + wallet, + Arc::new(()), + ) + .await + .unwrap(); let token = CancellationToken::new(); let cancel = token.clone(); @@ -112,7 +119,9 @@ async fn test_peer_persistence() { let wallet = Arc::new(RwLock::new(WalletManager::::new(config.network))); let client = - DashSpvClient::new(config, network_manager, storage_manager, wallet).await.unwrap(); + DashSpvClient::new(config, network_manager, storage_manager, wallet, Arc::new(())) + .await + .unwrap(); // Should connect faster due to saved peers let token = CancellationToken::new(); @@ -153,8 +162,9 @@ async fn test_peer_disconnection() { // Create wallet manager let wallet = Arc::new(RwLock::new(WalletManager::::new(config.network))); - let client = - DashSpvClient::new(config, network_manager, storage_manager, wallet).await.unwrap(); + let client = DashSpvClient::new(config, network_manager, storage_manager, wallet, Arc::new(())) + .await + .unwrap(); // Note: This test would require actual regtest nodes running // For now, we just test that the API works diff --git a/dash-spv/tests/wallet_integration_test.rs b/dash-spv/tests/wallet_integration_test.rs index 518ff78c2..4aaeb3be9 100644 --- a/dash-spv/tests/wallet_integration_test.rs +++ b/dash-spv/tests/wallet_integration_test.rs @@ -33,7 +33,9 @@ async fn create_test_client( // Create wallet manager let wallet = Arc::new(RwLock::new(WalletManager::::new(config.network))); - DashSpvClient::new(config, network_manager, storage_manager, wallet).await.unwrap() + DashSpvClient::new(config, network_manager, storage_manager, wallet, Arc::new(())) + .await + .unwrap() } #[tokio::test]