diff --git a/crates/bitcoind_rpc/Cargo.toml b/crates/bitcoind_rpc/Cargo.toml index 572d1f5230..9bb414f128 100644 --- a/crates/bitcoind_rpc/Cargo.toml +++ b/crates/bitcoind_rpc/Cargo.toml @@ -17,7 +17,7 @@ workspace = true [dependencies] bitcoin = { version = "0.32.0", default-features = false } -bitcoincore-rpc = { version = "0.19.0" } +bdk_bitcoind_client = { version = "0.1.0", default-features = false } bdk_core = { path = "../core", version = "0.6.1", default-features = false } [dev-dependencies] @@ -26,9 +26,12 @@ bdk_testenv = { path = "../testenv" } bdk_chain = { path = "../chain" } [features] -default = ["std"] +default = ["std", "bitcoind_28_0"] std = ["bitcoin/std", "bdk_core/std"] serde = ["bitcoin/serde", "bdk_core/serde"] +bitcoind_28_0 = ["bdk_bitcoind_client/28_0"] +bitcoind_29_0 = ["bdk_bitcoind_client/29_0"] +bitcoind_30_0 = ["bdk_bitcoind_client/30_0"] [[example]] name = "filter_iter" diff --git a/crates/bitcoind_rpc/examples/filter_iter.rs b/crates/bitcoind_rpc/examples/filter_iter.rs index e79bde6720..011d15da4e 100644 --- a/crates/bitcoind_rpc/examples/filter_iter.rs +++ b/crates/bitcoind_rpc/examples/filter_iter.rs @@ -44,8 +44,10 @@ fn main() -> anyhow::Result<()> { // Configure RPC client let url = std::env::var("RPC_URL").context("must set RPC_URL")?; let cookie = std::env::var("RPC_COOKIE").context("must set RPC_COOKIE")?; - let rpc_client = - bitcoincore_rpc::Client::new(&url, bitcoincore_rpc::Auth::CookieFile(cookie.into()))?; + let rpc_client = bdk_bitcoind_client::Client::with_auth( + &url, + bdk_bitcoind_client::Auth::CookieFile(cookie.into()), + )?; // Initialize `FilterIter` let mut spks = vec![]; diff --git a/crates/bitcoind_rpc/src/bip158.rs b/crates/bitcoind_rpc/src/bip158.rs index 4caf59fcdc..2ac0728e01 100644 --- a/crates/bitcoind_rpc/src/bip158.rs +++ b/crates/bitcoind_rpc/src/bip158.rs @@ -6,18 +6,18 @@ //! [0]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki //! [1]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki +use bdk_bitcoind_client; +use bdk_bitcoind_client::corepc_types::model::GetBlockHeaderVerbose; use bdk_core::bitcoin; use bdk_core::CheckPoint; use bitcoin::BlockHash; use bitcoin::{bip158::BlockFilter, Block, ScriptBuf}; -use bitcoincore_rpc; -use bitcoincore_rpc::{json::GetBlockHeaderResult, RpcApi}; /// Type that returns Bitcoin blocks by matching a list of script pubkeys (SPKs) against a /// [`bip158::BlockFilter`](bitcoin::bip158::BlockFilter). /// /// * `FilterIter` talks to bitcoind via JSON-RPC interface, which is handled by the -/// [`bitcoincore_rpc::Client`]. +/// [`bdk_bitcoind_client::Client`]. /// * Collect the script pubkeys (SPKs) you want to watch. These will usually correspond to wallet /// addresses that have been handed out for receiving payments. /// * Construct `FilterIter` with the RPC client, SPKs, and [`CheckPoint`]. The checkpoint tip @@ -31,19 +31,19 @@ use bitcoincore_rpc::{json::GetBlockHeaderResult, RpcApi}; #[derive(Debug)] pub struct FilterIter<'a> { /// RPC client - client: &'a bitcoincore_rpc::Client, + client: &'a bdk_bitcoind_client::Client, /// SPK inventory spks: Vec, /// checkpoint cp: CheckPoint, /// Header info, contains the prev and next hashes for each header. - header: Option, + header: Option, } impl<'a> FilterIter<'a> { /// Construct [`FilterIter`] with checkpoint, RPC client and SPKs. pub fn new( - client: &'a bitcoincore_rpc::Client, + client: &'a bdk_bitcoind_client::Client, cp: CheckPoint, spks: impl IntoIterator, ) -> Self { @@ -58,10 +58,10 @@ impl<'a> FilterIter<'a> { /// Return the agreement header with the remote node. /// /// Error if no agreement header is found. - fn find_base(&self) -> Result { + fn find_base(&self) -> Result { for cp in self.cp.iter() { - match self.client.get_block_header_info(&cp.hash()) { - Err(e) if is_not_found(&e) => continue, + match self.client.get_block_header_verbose(&cp.hash()) { + Err(e) if e.is_not_found_error() => continue, Ok(header) if header.confirmations <= 0 => continue, Ok(header) => return Ok(header), Err(e) => return Err(Error::Rpc(e)), @@ -111,7 +111,7 @@ impl Iterator for FilterIter<'_> { None => return Ok(None), }; - let mut next_header = self.client.get_block_header_info(&next_hash)?; + let mut next_header = self.client.get_block_header_verbose(&next_hash)?; // In case of a reorg, rewind by fetching headers of previous hashes until we find // one with enough confirmations. @@ -119,12 +119,12 @@ impl Iterator for FilterIter<'_> { let prev_hash = next_header .previous_block_hash .ok_or(Error::ReorgDepthExceeded)?; - let prev_header = self.client.get_block_header_info(&prev_hash)?; + let prev_header = self.client.get_block_header_verbose(&prev_hash)?; next_header = prev_header; } next_hash = next_header.hash; - let next_height: u32 = next_header.height.try_into()?; + let next_height: u32 = next_header.height; cp = cp.insert(next_height, next_hash); @@ -153,7 +153,7 @@ impl Iterator for FilterIter<'_> { #[derive(Debug)] pub enum Error { /// RPC error - Rpc(bitcoincore_rpc::Error), + Rpc(bdk_bitcoind_client::Error), /// `bitcoin::bip158` error Bip158(bitcoin::bip158::Error), /// Max reorg depth exceeded. @@ -175,8 +175,8 @@ impl core::fmt::Display for Error { impl core::error::Error for Error {} -impl From for Error { - fn from(e: bitcoincore_rpc::Error) -> Self { +impl From for Error { + fn from(e: bdk_bitcoind_client::Error) -> Self { Self::Rpc(e) } } @@ -186,12 +186,3 @@ impl From for Error { Self::TryFromInt(e) } } - -/// Whether the RPC error is a "not found" error (code: `-5`). -fn is_not_found(e: &bitcoincore_rpc::Error) -> bool { - matches!( - e, - bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(e)) - if e.code == -5 - ) -} diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index 35e6e24272..ff782b4ec3 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -2,7 +2,7 @@ //! use the wallet RPC API, so this crate can be used with wallet-disabled Bitcoin Core nodes. //! //! [`Emitter`] is the main structure which sources blockchain data from -//! [`bitcoincore_rpc::Client`]. +//! [`bdk_bitcoind_client::Client`]. //! //! To only get block updates (exclude mempool transactions), the caller can use //! [`Emitter::next_block`] until it returns `Ok(None)` (which means the chain tip is reached). A @@ -15,34 +15,33 @@ extern crate alloc; use alloc::sync::Arc; +use bdk_bitcoind_client::{corepc_types, Client}; use bdk_core::collections::{HashMap, HashSet}; use bdk_core::{BlockId, CheckPoint}; use bitcoin::{Block, BlockHash, Transaction, Txid}; -use bitcoincore_rpc::{bitcoincore_rpc_json, RpcApi}; -use core::ops::Deref; pub mod bip158; -pub use bitcoincore_rpc; +pub use bdk_bitcoind_client; -/// The [`Emitter`] is used to emit data sourced from [`bitcoincore_rpc::Client`]. +/// The [`Emitter`] is used to emit data sourced from [`bdk_bitcoind_client::Client`]. /// /// Refer to [module-level documentation] for more. /// /// [module-level documentation]: crate -pub struct Emitter { - client: C, +pub struct Emitter<'a> { + client: &'a Client, start_height: u32, /// The checkpoint of the last-emitted block that is in the best chain. If it is later found /// that the block is no longer in the best chain, it will be popped off from here. last_cp: CheckPoint, - /// The block result returned from rpc of the last-emitted block. As this result contains the - /// next block's block hash (which we use to fetch the next block), we set this to `None` - /// whenever there are no more blocks, or the next block is no longer in the best chain. This - /// gives us an opportunity to re-fetch this result. - last_block: Option, + /// The block header result returned from rpc of the last-emitted block. As this result + /// contains the next block's block hash (which we use to fetch the next block), we set + /// this to `None` whenever there are no more blocks, or the next block is no longer in the + /// best chain. This gives us an opportunity to re-fetch this result. + last_block: Option, /// The last snapshot of mempool transactions. /// @@ -62,11 +61,7 @@ pub struct Emitter { /// to start empty (i.e. with no unconfirmed transactions). pub const NO_EXPECTED_MEMPOOL_TXS: core::iter::Empty> = core::iter::empty(); -impl Emitter -where - C: Deref, - C::Target: RpcApi, -{ +impl<'a> Emitter<'a> { /// Construct a new [`Emitter`]. /// /// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter @@ -79,7 +74,7 @@ where /// wallet. This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. /// If it is known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXS`] can be used. pub fn new( - client: C, + client: &'a Client, last_cp: CheckPoint, start_height: u32, expected_mempool_txs: impl IntoIterator>>, @@ -108,7 +103,7 @@ where /// and hash. Until `next_block()` advances the checkpoint to tip, `mempool()` will always /// return an empty `evicted` set. #[cfg(feature = "std")] - pub fn mempool(&mut self) -> Result { + pub fn mempool(&mut self) -> Result { let sync_time = std::time::UNIX_EPOCH .elapsed() .expect("must get current time") @@ -121,8 +116,11 @@ where /// `sync_time` is in unix seconds. /// /// This is the no-std version of [`mempool`](Self::mempool). - pub fn mempool_at(&mut self, sync_time: u64) -> Result { - let client = &*self.client; + pub fn mempool_at( + &mut self, + sync_time: u64, + ) -> Result { + let client = &self.client; let mut rpc_tip_height; let mut rpc_tip_hash; @@ -145,10 +143,10 @@ where let mut mempool_event = MempoolEvent { update: rpc_mempool .into_iter() - .filter_map(|txid| -> Option> { + .filter_map(|txid| -> Option> { let tx = match self.mempool_snapshot.get(&txid) { Some(tx) => tx.clone(), - None => match client.get_raw_transaction(&txid, None) { + None => match client.get_raw_transaction(&txid) { Ok(tx) => { let tx = Arc::new(tx); self.mempool_snapshot.insert(txid, tx.clone()); @@ -164,8 +162,7 @@ where ..Default::default() }; - let at_tip = - rpc_tip_height == self.last_cp.height() as u64 && rpc_tip_hash == self.last_cp.hash(); + let at_tip = rpc_tip_height == self.last_cp.height() && rpc_tip_hash == self.last_cp.hash(); if at_tip { // We only emit evicted transactions when we have already emitted the RPC tip. This is @@ -199,8 +196,8 @@ where } /// Emit the next block height and block (if any). - pub fn next_block(&mut self) -> Result>, bitcoincore_rpc::Error> { - if let Some((checkpoint, block)) = poll(self, move |hash, client| client.get_block(hash))? { + pub fn next_block(&mut self) -> Result>, bdk_bitcoind_client::Error> { + if let Some((checkpoint, block)) = poll(self)? { // Stop tracking unconfirmed transactions that have been confirmed in this block. for tx in &block.txdata { self.mempool_snapshot.remove(&tx.compute_txid()); @@ -265,39 +262,38 @@ impl BlockEvent { } enum PollResponse { - Block(bitcoincore_rpc_json::GetBlockResult), + Block(corepc_types::model::GetBlockHeaderVerbose), NoMoreBlocks, /// Fetched block is not in the best chain. BlockNotInBestChain, - AgreementFound(bitcoincore_rpc_json::GetBlockResult, CheckPoint), + AgreementFound( + corepc_types::model::GetBlockHeaderVerbose, + CheckPoint, + ), /// Force the genesis checkpoint down the receiver's throat. AgreementPointNotFound(BlockHash), } -fn poll_once(emitter: &Emitter) -> Result -where - C: Deref, - C::Target: RpcApi, -{ - let client = &*emitter.client; +fn poll_once(emitter: &Emitter) -> Result { + let client = &emitter.client; if let Some(last_res) = &emitter.last_block { - let next_hash = if last_res.height + 1 < emitter.start_height as _ { + let next_hash = if last_res.height + 1 < emitter.start_height { // enforce start height - let next_hash = client.get_block_hash(emitter.start_height as _)?; + let next_hash = client.get_block_hash(emitter.start_height)?; // make sure last emission is still in best chain - if client.get_block_hash(last_res.height as _)? != last_res.hash { + if client.get_block_hash(last_res.height)? != last_res.hash { return Ok(PollResponse::BlockNotInBestChain); } next_hash } else { - match last_res.nextblockhash { + match last_res.next_block_hash { None => return Ok(PollResponse::NoMoreBlocks), Some(next_hash) => next_hash, } }; - let res = client.get_block_info(&next_hash)?; + let res = client.get_block_header_verbose(&next_hash)?; if res.confirmations < 0 { return Ok(PollResponse::BlockNotInBestChain); } @@ -306,7 +302,7 @@ where } for cp in emitter.last_cp.iter() { - let res = match client.get_block_info(&cp.hash()) { + let res = match client.get_block_header_verbose(&cp.hash()) { // block not in best chain Ok(res) if res.confirmations < 0 => continue, Ok(res) => res, @@ -328,21 +324,15 @@ where Ok(PollResponse::AgreementPointNotFound(genesis_hash)) } -fn poll( - emitter: &mut Emitter, - get_item: F, -) -> Result, V)>, bitcoincore_rpc::Error> -where - C: Deref, - C::Target: RpcApi, - F: Fn(&BlockHash, &C::Target) -> Result, -{ +fn poll( + emitter: &mut Emitter, +) -> Result, Block)>, bdk_bitcoind_client::Error> { loop { match poll_once(emitter)? { PollResponse::Block(res) => { - let height = res.height as u32; + let height = res.height; let hash = res.hash; - let item = get_item(&hash, &emitter.client)?; + let item = emitter.client.get_block(&hash)?; let new_cp = emitter .last_cp @@ -364,10 +354,8 @@ where PollResponse::AgreementFound(res, cp) => { // When a reorg happens, the agreement point drops below `last_cp`. We // override `start_height` so the emitter revisits the invalidated heights. - if (res.height as u32) < emitter.start_height - && (res.height as u32) < emitter.last_cp.height() - { - emitter.start_height = res.height as _; + if res.height < emitter.start_height && res.height < emitter.last_cp.height() { + emitter.start_height = res.height; } // get rid of evicted blocks emitter.last_cp = cp; @@ -383,26 +371,6 @@ where } } -/// Extends [`bitcoincore_rpc::Error`]. -pub trait BitcoindRpcErrorExt { - /// Returns whether the error is a "not found" error. - /// - /// This is useful since [`Emitter`] emits [`Result<_, bitcoincore_rpc::Error>`]s as - /// [`Iterator::Item`]. - fn is_not_found_error(&self) -> bool; -} - -impl BitcoindRpcErrorExt for bitcoincore_rpc::Error { - fn is_not_found_error(&self) -> bool { - if let bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(rpc_err)) = self - { - rpc_err.code == -5 - } else { - false - } - } -} - #[cfg(test)] #[cfg_attr(coverage_nightly, coverage(off))] mod test { @@ -418,9 +386,9 @@ mod test { let (chain, _) = LocalChain::from_genesis(env.genesis_hash()?); let chain_tip = chain.tip(); - let rpc_client = bitcoincore_rpc::Client::new( + let rpc_client = bdk_bitcoind_client::Client::with_auth( &env.bitcoind.rpc_url(), - bitcoincore_rpc::Auth::CookieFile(env.bitcoind.params.cookie_file.clone()), + bdk_bitcoind_client::Auth::CookieFile(env.bitcoind.params.cookie_file.clone()), )?; let mut emitter = Emitter::new(&rpc_client, chain_tip.clone(), 1, NO_EXPECTED_MEMPOOL_TXS); diff --git a/crates/bitcoind_rpc/tests/common/mod.rs b/crates/bitcoind_rpc/tests/common/mod.rs index bbd914cca7..2669298b5c 100644 --- a/crates/bitcoind_rpc/tests/common/mod.rs +++ b/crates/bitcoind_rpc/tests/common/mod.rs @@ -1,20 +1,19 @@ use bdk_testenv::anyhow; use bdk_testenv::TestEnv; -/// This trait is used for testing. It allows creating a new [`bitcoincore_rpc::Client`] connected -/// to the instance of bitcoind running in the test environment. This way the `TestEnv` and the -/// `Emitter` aren't required to share the same client. In the future when we no longer depend on -/// `bitcoincore-rpc`, this can be updated to return the production client that is used by BDK. +/// This trait is used for testing. It allows creating a new [`bdk_bitcoind_client::Client`] +/// connected to the instance of bitcoind running in the test environment. This way the `TestEnv` +/// and the `Emitter` aren't required to share the same client. pub trait ClientExt { - /// Creates a new [`bitcoincore_rpc::Client`] connected to the current node instance. - fn get_rpc_client(&self) -> anyhow::Result; + /// Creates a new [`bdk_bitcoind_client::Client`] connected to the current node instance. + fn get_rpc_client(&self) -> anyhow::Result; } impl ClientExt for TestEnv { - fn get_rpc_client(&self) -> anyhow::Result { - Ok(bitcoincore_rpc::Client::new( + fn get_rpc_client(&self) -> anyhow::Result { + Ok(bdk_bitcoind_client::Client::with_auth( &self.bitcoind.rpc_url(), - bitcoincore_rpc::Auth::CookieFile(self.bitcoind.params.cookie_file.clone()), + bdk_bitcoind_client::Auth::CookieFile(self.bitcoind.params.cookie_file.clone()), )?) } } diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index cbde85d6e7..a973cd148c 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeSet, ops::Deref}; +use std::collections::BTreeSet; use bdk_bitcoind_rpc::{Emitter, NO_EXPECTED_MEMPOOL_TXS}; use bdk_chain::{ @@ -298,15 +298,11 @@ fn process_block( Ok(()) } -fn sync_from_emitter( +fn sync_from_emitter( recv_chain: &mut LocalChain, recv_graph: &mut IndexedTxGraph>, - emitter: &mut Emitter, -) -> anyhow::Result<()> -where - C: Deref, - C::Target: bitcoincore_rpc::RpcApi, -{ + emitter: &mut Emitter, +) -> anyhow::Result<()> { while let Some(emission) = emitter.next_block()? { let height = emission.block_height(); process_block(recv_chain, recv_graph, emission.block, height)?; @@ -557,7 +553,6 @@ fn no_agreement_point() -> anyhow::Result<()> { /// 3. Insert the eviction into the graph and assert tx1 is no longer canonical. #[test] fn test_expect_tx_evicted() -> anyhow::Result<()> { - use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin; use bdk_chain::miniscript; use bdk_chain::spk_txout::SpkTxOutIndex; use bitcoin::constants::genesis_block; diff --git a/crates/bitcoind_rpc/tests/test_filter_iter.rs b/crates/bitcoind_rpc/tests/test_filter_iter.rs index e45e533110..be809a4626 100644 --- a/crates/bitcoind_rpc/tests/test_filter_iter.rs +++ b/crates/bitcoind_rpc/tests/test_filter_iter.rs @@ -2,7 +2,6 @@ use bdk_bitcoind_rpc::bip158::{Error, FilterIter}; use bdk_core::CheckPoint; use bdk_testenv::{anyhow, corepc_node, TestEnv}; use bitcoin::{Address, Amount, Network, ScriptBuf}; -use bitcoincore_rpc::RpcApi; use crate::common::ClientExt; @@ -21,9 +20,7 @@ fn testenv() -> anyhow::Result { #[test] fn filter_iter_matches_blocks() -> anyhow::Result<()> { let env = testenv()?; - let addr = ClientExt::get_rpc_client(&env)? - .get_new_address(None, None)? - .assume_checked(); + let addr = env.bitcoind.client.new_address()?; let _ = env.mine_blocks(100, Some(addr.clone()))?; assert_eq!(ClientExt::get_rpc_client(&env)?.get_block_count()?, 101); @@ -78,7 +75,7 @@ fn filter_iter_detects_reorgs() -> anyhow::Result<()> { let env = testenv()?; let rpc = ClientExt::get_rpc_client(&env)?; - while rpc.get_block_count()? < MINE_TO as u64 { + while rpc.get_block_count()? < MINE_TO { let _ = env.mine_blocks(1, None)?; } diff --git a/examples/example_bitcoind_rpc_polling/src/main.rs b/examples/example_bitcoind_rpc_polling/src/main.rs index 0263c5b0bd..0ca516ab7f 100644 --- a/examples/example_bitcoind_rpc_polling/src/main.rs +++ b/examples/example_bitcoind_rpc_polling/src/main.rs @@ -8,10 +8,13 @@ use std::{ }; use bdk_bitcoind_rpc::{ - bitcoincore_rpc::{Auth, Client, RpcApi}, + bdk_bitcoind_client::{jsonrpc::serde_json::json, Auth, Client}, Emitter, }; -use bdk_chain::{bitcoin::Block, local_chain, CanonicalizationParams, Merge}; +use bdk_chain::{ + bitcoin::{consensus::encode::serialize_hex, Block, Txid}, + local_chain, CanonicalizationParams, Merge, +}; use example_cli::{ anyhow, clap::{self, Args, Subcommand}, @@ -59,7 +62,7 @@ struct RpcArgs { impl From for Auth { fn from(args: RpcArgs) -> Self { match (args.rpc_cookie, args.rpc_user, args.rpc_password) { - (None, None, None) => Self::None, + (None, None, None) => panic!("rpc auth: missing auth method"), (Some(path), _, _) => Self::CookieFile(path), (_, Some(user), Some(pass)) => Self::UserPass(user, pass), (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"), @@ -70,16 +73,14 @@ impl From for Auth { impl RpcArgs { fn new_client(&self) -> anyhow::Result { - Ok(Client::new( - &self.url, - match (&self.rpc_cookie, &self.rpc_user, &self.rpc_password) { - (None, None, None) => Auth::None, - (Some(path), _, _) => Auth::CookieFile(path.clone()), - (_, Some(user), Some(pass)) => Auth::UserPass(user.clone(), pass.clone()), - (_, Some(_), None) => panic!("rpc auth: missing rpc_pass"), - (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"), - }, - )?) + let auth = match (&self.rpc_cookie, &self.rpc_user, &self.rpc_password) { + (None, None, None) => panic!("rpc auth: missing auth method"), + (Some(path), _, _) => Auth::CookieFile(path.clone()), + (_, Some(user), Some(pass)) => Auth::UserPass(user.clone(), pass.clone()), + (_, None, Some(_)) => panic!("rpc auth: missing rpc_user"), + (_, Some(_), None) => panic!("rpc auth: missing rpc_pass "), + }; + Ok(Client::with_auth(&self.url, auth)?) } } @@ -97,7 +98,6 @@ enum RpcCommands { rpc_args: RpcArgs, }, } - fn main() -> anyhow::Result<()> { let start = Instant::now(); @@ -122,7 +122,8 @@ fn main() -> anyhow::Result<()> { network, |rpc_args, tx| { let client = rpc_args.new_client()?; - client.send_raw_transaction(tx)?; + let _txid: Txid = + client.call("sendrawtransaction", &[json!(serialize_hex(tx))])?; Ok(()) }, general_cmd, @@ -241,12 +242,12 @@ fn main() -> anyhow::Result<()> { } = rpc_args; let sigterm_flag = start_ctrlc_handler(); - let rpc_client = Arc::new(rpc_args.new_client()?); + let rpc_client = rpc_args.new_client()?; let mut emitter = { let chain = chain.lock().unwrap(); let graph = graph.lock().unwrap(); Emitter::new( - rpc_client.clone(), + &rpc_client, chain.tip(), fallback_height, graph @@ -266,120 +267,131 @@ fn main() -> anyhow::Result<()> { start.elapsed().as_secs_f32() ); let (tx, rx) = std::sync::mpsc::sync_channel::(CHANNEL_BOUND); - let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> { - let mut block_count = rpc_client.get_block_count()? as u32; - tx.send(Emission::Tip(block_count))?; - loop { - match emitter.next_block()? { - Some(block_emission) => { - let height = block_emission.block_height(); - if sigterm_flag.load(Ordering::Acquire) { - break; + let _ = std::thread::scope(|s| -> anyhow::Result<()> { + let emission_jh = s.spawn(|| -> anyhow::Result<()> { + let mut block_count = rpc_client.get_block_count()?; + tx.send(Emission::Tip(block_count))?; + + loop { + match emitter.next_block()? { + Some(block_emission) => { + let height = block_emission.block_height(); + if sigterm_flag.load(Ordering::Acquire) { + break; + } + if height > block_count { + block_count = rpc_client.get_block_count()?; + tx.send(Emission::Tip(block_count))?; + } + tx.send(Emission::Block(block_emission))?; } - if height > block_count { - block_count = rpc_client.get_block_count()? as u32; - tx.send(Emission::Tip(block_count))?; + None => { + if await_flag(&sigterm_flag, MEMPOOL_EMIT_DELAY) { + break; + } + println!("preparing mempool emission..."); + let now = Instant::now(); + tx.send(Emission::Mempool(emitter.mempool()?))?; + println!( + "mempool emission prepared in {}s", + now.elapsed().as_secs() + ); + continue; } - tx.send(Emission::Block(block_emission))?; + }; + } + + println!("emitter thread shutting down..."); + Ok(()) + }); + + let mut tip_height = 0_u32; + let mut last_db_commit = Instant::now(); + let mut last_print = Option::::None; + let mut db_stage = ChangeSet::default(); + + for emission in rx { + let mut graph = graph.lock().unwrap(); + let mut chain = chain.lock().unwrap(); + + let (chain_changeset, graph_changeset) = match emission { + Emission::Block(block_emission) => { + let height = block_emission.block_height(); + let chain_changeset = + chain.apply_update(block_emission.checkpoint).expect( + "must always apply as we receive blocks in order from emitter", + ); + let graph_changeset = + graph.apply_block_relevant(&block_emission.block, height); + (chain_changeset, graph_changeset) } - None => { - if await_flag(&sigterm_flag, MEMPOOL_EMIT_DELAY) { - break; - } - println!("preparing mempool emission..."); - let now = Instant::now(); - tx.send(Emission::Mempool(emitter.mempool()?))?; - println!("mempool emission prepared in {}s", now.elapsed().as_secs()); + Emission::Mempool(mempool_txs) => { + let mut graph_changeset = + graph.batch_insert_relevant_unconfirmed(mempool_txs.update.clone()); + graph_changeset + .merge(graph.batch_insert_relevant_evicted_at(mempool_txs.evicted)); + (local_chain::ChangeSet::default(), graph_changeset) + } + Emission::Tip(h) => { + tip_height = h; continue; } }; - } - - println!("emitter thread shutting down..."); - Ok(()) - }); - - let mut tip_height = 0_u32; - let mut last_db_commit = Instant::now(); - let mut last_print = Option::::None; - let mut db_stage = ChangeSet::default(); - for emission in rx { - let mut graph = graph.lock().unwrap(); - let mut chain = chain.lock().unwrap(); - - let (chain_changeset, graph_changeset) = match emission { - Emission::Block(block_emission) => { - let height = block_emission.block_height(); - let chain_changeset = chain - .apply_update(block_emission.checkpoint) - .expect("must always apply as we receive blocks in order from emitter"); - let graph_changeset = - graph.apply_block_relevant(&block_emission.block, height); - (chain_changeset, graph_changeset) - } - Emission::Mempool(mempool_txs) => { - let mut graph_changeset = - graph.batch_insert_relevant_unconfirmed(mempool_txs.update.clone()); - graph_changeset - .merge(graph.batch_insert_relevant_evicted_at(mempool_txs.evicted)); - (local_chain::ChangeSet::default(), graph_changeset) - } - Emission::Tip(h) => { - tip_height = h; - continue; + db_stage.merge(ChangeSet { + local_chain: chain_changeset, + tx_graph: graph_changeset.tx_graph, + indexer: graph_changeset.indexer, + ..Default::default() + }); + + if last_db_commit.elapsed() >= DB_COMMIT_DELAY { + let db = &mut *db.lock().unwrap(); + last_db_commit = Instant::now(); + if let Some(changeset) = db_stage.take() { + db.append(&changeset)?; + } + println!( + "[{:>10}s] committed to db (took {}s)", + start.elapsed().as_secs_f32(), + last_db_commit.elapsed().as_secs_f32() + ); } - }; - - db_stage.merge(ChangeSet { - local_chain: chain_changeset, - tx_graph: graph_changeset.tx_graph, - indexer: graph_changeset.indexer, - ..Default::default() - }); - if last_db_commit.elapsed() >= DB_COMMIT_DELAY { - let db = &mut *db.lock().unwrap(); - last_db_commit = Instant::now(); - if let Some(changeset) = db_stage.take() { - db.append(&changeset)?; + if last_print.map_or(Duration::MAX, |i| i.elapsed()) >= STDOUT_PRINT_DELAY { + last_print = Some(Instant::now()); + let synced_to = chain.tip(); + let balance = { + graph + .canonical_view( + &*chain, + synced_to.block_id(), + CanonicalizationParams::default(), + ) + .balance( + graph.index.outpoints().iter().cloned(), + |(k, _), _| k == &Keychain::Internal, + 1, + ) + }; + println!( + "[{:>10}s] synced to {} @ {} / {} | total: {}", + start.elapsed().as_secs_f32(), + synced_to.hash(), + synced_to.height(), + tip_height, + balance.total() + ); } - println!( - "[{:>10}s] committed to db (took {}s)", - start.elapsed().as_secs_f32(), - last_db_commit.elapsed().as_secs_f32() - ); } - if last_print.map_or(Duration::MAX, |i| i.elapsed()) >= STDOUT_PRINT_DELAY { - last_print = Some(Instant::now()); - let synced_to = chain.tip(); - let balance = { - graph - .canonical_view( - &*chain, - synced_to.block_id(), - CanonicalizationParams::default(), - ) - .balance( - graph.index.outpoints().iter().cloned(), - |(k, _), _| k == &Keychain::Internal, - 1, - ) - }; - println!( - "[{:>10}s] synced to {} @ {} / {} | total: {}", - start.elapsed().as_secs_f32(), - synced_to.hash(), - synced_to.height(), - tip_height, - balance.total() - ); - } - } - - emission_jh.join().expect("must join emitter thread")?; + emission_jh + .join() + .unwrap() + .expect("must join emitter thread"); + Ok(()) + }); } }