From b04fed2e8a58dd6b4e001c64d03d85ad46cf7d3c Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Thu, 8 May 2025 20:30:07 +0100 Subject: [PATCH 1/2] feat(cbf): implement transaction broadcasting For the highest reliability, we wait for the connection requirement to be met by the node. Once met, we can broadcast and wait for confirmation. The function will either timeout after 15 seconds or successfully finish with gossip confirmation. --- src/commands.rs | 2 +- src/handlers.rs | 72 ++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 69 insertions(+), 5 deletions(-) diff --git a/src/commands.rs b/src/commands.rs index 67f84537..52790bce 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -239,7 +239,7 @@ pub struct ProxyOpts { #[derive(Debug, Args, Clone, PartialEq, Eq)] pub struct CompactFilterOpts { /// Sets the number of parallel node connections. - #[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "4", value_parser = value_parser!(u8).range(1..=15))] + #[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "2", value_parser = value_parser!(u8).range(1..=15))] pub conn_count: u8, /// Optionally skip initial `skip_blocks` blocks. diff --git a/src/handlers.rs b/src/handlers.rs index 1ef05268..35c881a3 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -48,7 +48,11 @@ use std::str::FromStr; #[cfg(feature = "electrum")] use crate::utils::BlockchainClient::Electrum; +#[cfg(feature = "cbf")] +use bdk_kyoto::{Info, LightClient}; use bdk_wallet::bitcoin::base64::prelude::*; +#[cfg(feature = "cbf")] +use tokio::select; #[cfg(any( feature = "electrum", feature = "esplora", @@ -507,7 +511,6 @@ pub(crate) async fn handle_online_wallet_subcommand( (Some(_), Some(_)) => panic!("Both `psbt` and `tx` options not allowed"), (None, None) => panic!("Missing `psbt` and `tx` option"), }; - let txid = match client { #[cfg(feature = "electrum")] Electrum { @@ -531,8 +534,69 @@ pub(crate) async fn handle_online_wallet_subcommand( .map_err(|e| Error::Generic(e.to_string()))?, #[cfg(feature = "cbf")] - KyotoClient { client: _ } => { - unimplemented!() + KyotoClient { client } => { + let LightClient { + requester, + mut log_subscriber, + mut info_subscriber, + mut warning_subscriber, + update_subscriber: _, + node, + } = client; + + let subscriber = tracing_subscriber::FmtSubscriber::new(); + tracing::subscriber::set_global_default(subscriber) + .map_err(|e| Error::Generic(format!("SetGlobalDefault error: {}", e)))?; + + tokio::task::spawn(async move { node.run().await }); + tokio::task::spawn(async move { + select! { + log = log_subscriber.recv() => { + if let Some(log) = log { + tracing::info!("{log}"); + } + }, + warn = warning_subscriber.recv() => { + if let Some(warn) = warn { + tracing::warn!("{warn}"); + } + } + } + }); + let txid = tx.compute_txid(); + tracing::info!("Waiting for connections to broadcast..."); + while let Some(info) = info_subscriber.recv().await { + match info { + Info::ConnectionsMet => { + requester + .broadcast_random(tx.clone()) + .map_err(|e| Error::Generic(format!("{}", e)))?; + break; + } + _ => tracing::info!("{info}"), + } + } + tokio::time::timeout(tokio::time::Duration::from_secs(15), async move { + while let Some(info) = info_subscriber.recv().await { + match info { + Info::TxGossiped(wtxid) => { + tracing::info!("Succuessfully broadcast WTXID: {wtxid}"); + break; + } + Info::ConnectionsMet => { + tracing::info!("Rebroadcasting to new connections"); + requester.broadcast_random(tx.clone()).unwrap(); + } + _ => tracing::info!("{info}"), + } + } + }) + .await + .map_err(|_| { + tracing::warn!("Broadcast was unsuccessful"); + Error::Generic("Transaction broadcast timed out after 15 seconds".into()) + })?; + txid } }; Ok(json!({ "txid": txid })) @@ -857,7 +921,7 @@ async fn respond( subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand), } => { let blockchain = - new_blockchain_client(wallet_opts, &wallet, _datadir).map_err(|e| e.to_string())?; + new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?; let value = handle_online_wallet_subcommand(wallet, blockchain, online_subcommand) .await .map_err(|e| e.to_string())?; From 811f614515b6fb1ae5689bf02ec830bdc7ff62ac Mon Sep 17 00:00:00 2001 From: Steve Myers Date: Fri, 16 May 2025 11:15:49 -0500 Subject: [PATCH 2/2] fix(cbf): typo and cbf dir config when sqlite feature disabled --- src/error.rs | 9 ++++++++- src/handlers.rs | 19 +++++++++++-------- src/utils.rs | 19 ++++++++----------- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/src/error.rs b/src/error.rs index 27606d73..ad7d383c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,5 @@ use bdk_wallet::bitcoin::hex::HexToBytesError; +use bdk_wallet::bitcoin::psbt::ExtractTxError; use bdk_wallet::bitcoin::{base64, consensus}; use thiserror::Error; @@ -51,7 +52,7 @@ pub enum BDKCliError { ParseOutPointError(#[from] bdk_wallet::bitcoin::blockdata::transaction::ParseOutPointError), #[error("PsbtExtractTxError: {0}")] - PsbtExtractTxError(#[from] bdk_wallet::bitcoin::psbt::ExtractTxError), + PsbtExtractTxError(Box), #[error("PsbtError: {0}")] PsbtError(#[from] bdk_wallet::bitcoin::psbt::Error), @@ -90,3 +91,9 @@ pub enum BDKCliError { #[error("BDK-Kyoto error: {0}")] BuilderError(#[from] bdk_kyoto::builder::BuilderError), } + +impl From for BDKCliError { + fn from(value: ExtractTxError) -> Self { + BDKCliError::PsbtExtractTxError(Box::new(value)) + } +} diff --git a/src/handlers.rs b/src/handlers.rs index 35c881a3..db15bd63 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -42,7 +42,7 @@ use std::collections::BTreeMap; #[cfg(any(feature = "electrum", feature = "esplora"))] use std::collections::HashSet; use std::convert::TryFrom; -#[cfg(feature = "repl")] +#[cfg(any(feature = "repl", feature = "electrum", feature = "esplora"))] use std::io::Write; use std::str::FromStr; @@ -580,7 +580,7 @@ pub(crate) async fn handle_online_wallet_subcommand( while let Some(info) = info_subscriber.recv().await { match info { Info::TxGossiped(wtxid) => { - tracing::info!("Succuessfully broadcast WTXID: {wtxid}"); + tracing::info!("Successfully broadcast WTXID: {wtxid}"); break; } Info::ConnectionsMet => { @@ -745,11 +745,11 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result { subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand), } => { let network = cli_opts.network; + let home_dir = prepare_home_dir(cli_opts.datadir)?; + let wallet_name = &wallet_opts.wallet; + let database_path = prepare_wallet_db_dir(wallet_name, &home_dir)?; #[cfg(feature = "sqlite")] let result = { - let home_dir = prepare_home_dir(cli_opts.datadir)?; - let wallet_name = &wallet_opts.wallet; - let database_path = prepare_wallet_db_dir(wallet_name, &home_dir)?; let mut persister = match &wallet_opts.database_type { #[cfg(feature = "sqlite")] DatabaseType::Sqlite => { @@ -762,7 +762,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result { let mut wallet = new_persisted_wallet(network, &mut persister, &wallet_opts)?; let blockchain_client = - new_blockchain_client(&wallet_opts, &wallet, Some(database_path))?; + new_blockchain_client(&wallet_opts, &wallet, database_path)?; let result = handle_online_wallet_subcommand( &mut wallet, @@ -775,6 +775,9 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result { }; #[cfg(not(any(feature = "sqlite")))] let result = { + let wallet = new_wallet(network, &wallet_opts)?; + let blockchain_client = + crate::utils::new_blockchain_client(&wallet_opts, &wallet, database_path)?; let mut wallet = new_wallet(network, &wallet_opts)?; handle_online_wallet_subcommand(&mut wallet, blockchain_client, online_subcommand) .await? @@ -871,7 +874,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result { &mut wallet, &wallet_opts, line, - Some(database_path.clone()), + database_path.clone(), ) .await; #[cfg(feature = "sqlite")] @@ -904,7 +907,7 @@ async fn respond( wallet: &mut Wallet, wallet_opts: &WalletOpts, line: &str, - _datadir: Option, + _datadir: std::path::PathBuf, ) -> Result { use clap::Parser; diff --git a/src/utils.rs b/src/utils.rs index 3e31710b..8302e227 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -12,7 +12,6 @@ use crate::error::BDKCliError as Error; use std::str::FromStr; -#[cfg(feature = "sqlite")] use std::path::{Path, PathBuf}; use crate::commands::WalletOpts; @@ -76,11 +75,11 @@ pub(crate) fn parse_address(address_str: &str) -> Result { Ok(unchecked_address.assume_checked()) } -#[cfg(feature = "sqlite")] /// Prepare bdk-cli home directory /// /// This function is called to check if [`crate::CliOpts`] datadir is set. /// If not the default home directory is created at `~/.bdk-bitcoin`. +#[allow(dead_code)] pub(crate) fn prepare_home_dir(home_path: Option) -> Result { let dir = home_path.unwrap_or_else(|| { let mut dir = PathBuf::new(); @@ -101,11 +100,11 @@ pub(crate) fn prepare_home_dir(home_path: Option) -> Result, home_path: &Path, -) -> Result { +) -> Result { let mut dir = home_path.to_owned(); if let Some(wallet_name) = wallet_name { dir.push(wallet_name); @@ -153,8 +152,8 @@ pub(crate) enum BlockchainClient { /// Create a new blockchain from the wallet configuration options. pub(crate) fn new_blockchain_client( wallet_opts: &WalletOpts, - wallet: &Wallet, - datadir: Option, + _wallet: &Wallet, + _datadir: PathBuf, ) -> Result { #[cfg(any(feature = "electrum", feature = "esplora", feature = "rpc"))] let url = wallet_opts.url.as_str(); @@ -200,14 +199,12 @@ pub(crate) fn new_blockchain_client( None => Sync, }; - let mut builder = NodeBuilder::new(wallet.network()); + let builder = NodeBuilder::new(_wallet.network()); - if let Some(datadir) = datadir { - builder = builder.data_dir(&datadir); - }; let client = builder .required_peers(wallet_opts.compactfilter_opts.conn_count) - .build_with_wallet(wallet, scan_type)?; + .data_dir(&_datadir) + .build_with_wallet(_wallet, scan_type)?; BlockchainClient::KyotoClient { client } }