diff --git a/src/commands.rs b/src/commands.rs index 67f84537..52790bce 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -239,7 +239,7 @@ pub struct ProxyOpts { #[derive(Debug, Args, Clone, PartialEq, Eq)] pub struct CompactFilterOpts { /// Sets the number of parallel node connections. - #[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "4", value_parser = value_parser!(u8).range(1..=15))] + #[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "2", value_parser = value_parser!(u8).range(1..=15))] pub conn_count: u8, /// Optionally skip initial `skip_blocks` blocks. diff --git a/src/error.rs b/src/error.rs index 27606d73..ad7d383c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,5 @@ use bdk_wallet::bitcoin::hex::HexToBytesError; +use bdk_wallet::bitcoin::psbt::ExtractTxError; use bdk_wallet::bitcoin::{base64, consensus}; use thiserror::Error; @@ -51,7 +52,7 @@ pub enum BDKCliError { ParseOutPointError(#[from] bdk_wallet::bitcoin::blockdata::transaction::ParseOutPointError), #[error("PsbtExtractTxError: {0}")] - PsbtExtractTxError(#[from] bdk_wallet::bitcoin::psbt::ExtractTxError), + PsbtExtractTxError(Box), #[error("PsbtError: {0}")] PsbtError(#[from] bdk_wallet::bitcoin::psbt::Error), @@ -90,3 +91,9 @@ pub enum BDKCliError { #[error("BDK-Kyoto error: {0}")] BuilderError(#[from] bdk_kyoto::builder::BuilderError), } + +impl From for BDKCliError { + fn from(value: ExtractTxError) -> Self { + BDKCliError::PsbtExtractTxError(Box::new(value)) + } +} diff --git a/src/handlers.rs b/src/handlers.rs index 1ef05268..db15bd63 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -42,13 +42,17 @@ use std::collections::BTreeMap; #[cfg(any(feature = "electrum", feature = "esplora"))] use std::collections::HashSet; use std::convert::TryFrom; -#[cfg(feature = "repl")] +#[cfg(any(feature = "repl", feature = "electrum", feature = "esplora"))] use std::io::Write; use std::str::FromStr; #[cfg(feature = "electrum")] use crate::utils::BlockchainClient::Electrum; +#[cfg(feature = "cbf")] +use bdk_kyoto::{Info, LightClient}; use bdk_wallet::bitcoin::base64::prelude::*; +#[cfg(feature = "cbf")] +use tokio::select; #[cfg(any( feature = "electrum", feature = "esplora", @@ -507,7 +511,6 @@ pub(crate) async fn handle_online_wallet_subcommand( (Some(_), Some(_)) => panic!("Both `psbt` and `tx` options not allowed"), (None, None) => panic!("Missing `psbt` and `tx` option"), }; - let txid = match client { #[cfg(feature = "electrum")] Electrum { @@ -531,8 +534,69 @@ pub(crate) async fn handle_online_wallet_subcommand( .map_err(|e| Error::Generic(e.to_string()))?, #[cfg(feature = "cbf")] - KyotoClient { client: _ } => { - unimplemented!() + KyotoClient { client } => { + let LightClient { + requester, + mut log_subscriber, + mut info_subscriber, + mut warning_subscriber, + update_subscriber: _, + node, + } = client; + + let subscriber = tracing_subscriber::FmtSubscriber::new(); + tracing::subscriber::set_global_default(subscriber) + .map_err(|e| Error::Generic(format!("SetGlobalDefault error: {}", e)))?; + + tokio::task::spawn(async move { node.run().await }); + tokio::task::spawn(async move { + select! { + log = log_subscriber.recv() => { + if let Some(log) = log { + tracing::info!("{log}"); + } + }, + warn = warning_subscriber.recv() => { + if let Some(warn) = warn { + tracing::warn!("{warn}"); + } + } + } + }); + let txid = tx.compute_txid(); + tracing::info!("Waiting for connections to broadcast..."); + while let Some(info) = info_subscriber.recv().await { + match info { + Info::ConnectionsMet => { + requester + .broadcast_random(tx.clone()) + .map_err(|e| Error::Generic(format!("{}", e)))?; + break; + } + _ => tracing::info!("{info}"), + } + } + tokio::time::timeout(tokio::time::Duration::from_secs(15), async move { + while let Some(info) = info_subscriber.recv().await { + match info { + Info::TxGossiped(wtxid) => { + tracing::info!("Successfully broadcast WTXID: {wtxid}"); + break; + } + Info::ConnectionsMet => { + tracing::info!("Rebroadcasting to new connections"); + requester.broadcast_random(tx.clone()).unwrap(); + } + _ => tracing::info!("{info}"), + } + } + }) + .await + .map_err(|_| { + tracing::warn!("Broadcast was unsuccessful"); + Error::Generic("Transaction broadcast timed out after 15 seconds".into()) + })?; + txid } }; Ok(json!({ "txid": txid })) @@ -681,11 +745,11 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result { subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand), } => { let network = cli_opts.network; + let home_dir = prepare_home_dir(cli_opts.datadir)?; + let wallet_name = &wallet_opts.wallet; + let database_path = prepare_wallet_db_dir(wallet_name, &home_dir)?; #[cfg(feature = "sqlite")] let result = { - let home_dir = prepare_home_dir(cli_opts.datadir)?; - let wallet_name = &wallet_opts.wallet; - let database_path = prepare_wallet_db_dir(wallet_name, &home_dir)?; let mut persister = match &wallet_opts.database_type { #[cfg(feature = "sqlite")] DatabaseType::Sqlite => { @@ -698,7 +762,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result { let mut wallet = new_persisted_wallet(network, &mut persister, &wallet_opts)?; let blockchain_client = - new_blockchain_client(&wallet_opts, &wallet, Some(database_path))?; + new_blockchain_client(&wallet_opts, &wallet, database_path)?; let result = handle_online_wallet_subcommand( &mut wallet, @@ -711,6 +775,9 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result { }; #[cfg(not(any(feature = "sqlite")))] let result = { + let wallet = new_wallet(network, &wallet_opts)?; + let blockchain_client = + crate::utils::new_blockchain_client(&wallet_opts, &wallet, database_path)?; let mut wallet = new_wallet(network, &wallet_opts)?; handle_online_wallet_subcommand(&mut wallet, blockchain_client, online_subcommand) .await? @@ -807,7 +874,7 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result { &mut wallet, &wallet_opts, line, - Some(database_path.clone()), + database_path.clone(), ) .await; #[cfg(feature = "sqlite")] @@ -840,7 +907,7 @@ async fn respond( wallet: &mut Wallet, wallet_opts: &WalletOpts, line: &str, - _datadir: Option, + _datadir: std::path::PathBuf, ) -> Result { use clap::Parser; @@ -857,7 +924,7 @@ async fn respond( subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand), } => { let blockchain = - new_blockchain_client(wallet_opts, &wallet, _datadir).map_err(|e| e.to_string())?; + new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?; let value = handle_online_wallet_subcommand(wallet, blockchain, online_subcommand) .await .map_err(|e| e.to_string())?; diff --git a/src/utils.rs b/src/utils.rs index 3e31710b..8302e227 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -12,7 +12,6 @@ use crate::error::BDKCliError as Error; use std::str::FromStr; -#[cfg(feature = "sqlite")] use std::path::{Path, PathBuf}; use crate::commands::WalletOpts; @@ -76,11 +75,11 @@ pub(crate) fn parse_address(address_str: &str) -> Result { Ok(unchecked_address.assume_checked()) } -#[cfg(feature = "sqlite")] /// Prepare bdk-cli home directory /// /// This function is called to check if [`crate::CliOpts`] datadir is set. /// If not the default home directory is created at `~/.bdk-bitcoin`. +#[allow(dead_code)] pub(crate) fn prepare_home_dir(home_path: Option) -> Result { let dir = home_path.unwrap_or_else(|| { let mut dir = PathBuf::new(); @@ -101,11 +100,11 @@ pub(crate) fn prepare_home_dir(home_path: Option) -> Result, home_path: &Path, -) -> Result { +) -> Result { let mut dir = home_path.to_owned(); if let Some(wallet_name) = wallet_name { dir.push(wallet_name); @@ -153,8 +152,8 @@ pub(crate) enum BlockchainClient { /// Create a new blockchain from the wallet configuration options. pub(crate) fn new_blockchain_client( wallet_opts: &WalletOpts, - wallet: &Wallet, - datadir: Option, + _wallet: &Wallet, + _datadir: PathBuf, ) -> Result { #[cfg(any(feature = "electrum", feature = "esplora", feature = "rpc"))] let url = wallet_opts.url.as_str(); @@ -200,14 +199,12 @@ pub(crate) fn new_blockchain_client( None => Sync, }; - let mut builder = NodeBuilder::new(wallet.network()); + let builder = NodeBuilder::new(_wallet.network()); - if let Some(datadir) = datadir { - builder = builder.data_dir(&datadir); - }; let client = builder .required_peers(wallet_opts.compactfilter_opts.conn_count) - .build_with_wallet(wallet, scan_type)?; + .data_dir(&_datadir) + .build_with_wallet(_wallet, scan_type)?; BlockchainClient::KyotoClient { client } }