diff --git a/Cargo.lock b/Cargo.lock index 0b550060af..f64f0ded18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4911,6 +4911,7 @@ dependencies = [ "common", "consensus", "crypto", + "futures", "logging", "mempool", "mockall", @@ -9000,6 +9001,7 @@ dependencies = [ "test-utils", "thiserror 1.0.69", "tokio", + "tokio-stream", ] [[package]] diff --git a/mempool/src/rpc.rs b/mempool/src/rpc.rs index b6b043f45f..41adfb59c1 100644 --- a/mempool/src/rpc.rs +++ b/mempool/src/rpc.rs @@ -31,9 +31,9 @@ use rpc::RpcResult; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize, rpc::description::HasValueHint)] pub struct GetTxResponse { - id: Id, - status: TxStatus, - transaction: HexEncoded, + pub id: Id, + pub status: TxStatus, + pub transaction: HexEncoded, } #[rpc::describe] diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index 7a4d409ebc..6cccb1cd43 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -154,6 +154,7 @@ class UnicodeOnWindowsError(ValueError): 'wallet_sweep_address.py', 'wallet_sweep_delegation.py', 'wallet_recover_accounts.py', + 'wallet_mempool_events.py', 'wallet_tokens.py', 'wallet_tokens_freeze.py', 'wallet_tokens_transfer_from_multisig_addr.py', diff --git a/test/functional/wallet_conflict.py b/test/functional/wallet_conflict.py index 097350b9cc..35db3b544a 100644 --- a/test/functional/wallet_conflict.py +++ b/test/functional/wallet_conflict.py @@ -155,7 +155,7 @@ async def async_test(self): transactions.remove(transfer_tx) freeze_tx = transactions[0] - assert_equal(1, len(await wallet.list_pending_transactions())) + assert_equal(2, len(await wallet.list_pending_transactions())) # try to send tokens again should fail as the tokens are already sent @@ -163,12 +163,15 @@ async def async_test(self): assert_in("Coin selection error: No available UTXOs", await wallet.send_tokens_to_address(token_id, address, tokens_to_mint)) # check that the mempool still has the transfer tx assert node.mempool_contains_tx(transfer_tx_id) - # abandon it from the wallet side so it is not rebroadcasted - assert_in("The transaction was marked as abandoned successfully", await wallet.abandon_transaction(transfer_tx_id)) + assert_in("Cannot change a transaction's state from InMempool", await wallet.abandon_transaction(transfer_tx_id)) # create a block with the freeze token transaction self.generate_block([freeze_tx]) assert_in("Success", await wallet.sync()) + self.log.info(f"transfer_tx_id = {transfer_tx_id}") + + # abandon it from the wallet side so it is not rebroadcasted + assert_in("The transaction was marked as abandoned successfully", await wallet.abandon_transaction(transfer_tx_id)) # after the token is frozen the transfer token tx should be evicted by the mempool as conflicting # wait until mempool evicts the conflicting tx diff --git a/test/functional/wallet_mempool_events.py b/test/functional/wallet_mempool_events.py new file mode 100644 index 0000000000..b13f3ffdfc --- /dev/null +++ b/test/functional/wallet_mempool_events.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python3 +# Copyright (c) 2026 RBB S.r.l +# Copyright (c) 2017-2021 The Bitcoin Core developers +# opensource@mintlayer.org +# SPDX-License-Identifier: MIT +# Licensed under the MIT License; +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Wallet mempool events test + +Check that: +* We can create 2 wallets with same mnemonic, +* get an address +* send coins to the wallet's address +* sync the wallet with the node +* check balance in both wallets +* send coins from Acc 0 to Acc 1 without creating a block +* the second wallet should get the new Tx from mempool events +* second wallet can create a new unconfirmed Tx on top of the on in mempool +""" + +import asyncio + +from test_framework.mintlayer import (ATOMS_PER_COIN, block_input_data_obj, + make_tx, reward_input) +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import assert_equal, assert_in +from test_framework.wallet_cli_controller import (DEFAULT_ACCOUNT_INDEX, + WalletCliController) + + +class WalletMempoolEvents(BitcoinTestFramework): + + def set_test_params(self): + self.setup_clean_chain = True + self.num_nodes = 1 + self.extra_args = [ + [ + "--blockprod-min-peers-to-produce-blocks=0", + ] + ] + + def setup_network(self): + self.setup_nodes() + self.sync_all(self.nodes[0:1]) + + def generate_block(self, transactions=[]): + node = self.nodes[0] + + block_input_data = {"PoW": {"reward_destination": "AnyoneCanSpend"}} + block_input_data = block_input_data_obj.encode(block_input_data).to_hex()[2:] + + # create a new block, taking transactions from mempool + block = node.blockprod_generate_block( + block_input_data, transactions, [], "FillSpaceFromMempool" + ) + node.chainstate_submit_block(block) + block_id = node.chainstate_best_block_id() + + # Wait for mempool to sync + self.wait_until( + lambda: node.mempool_local_best_block_id() == block_id, timeout=5 + ) + + return block_id + + def run_test(self): + asyncio.run(self.async_test()) + + async def async_test(self): + node = self.nodes[0] + async with WalletCliController(node, self.config, self.log) as wallet, \ + WalletCliController(node, self.config, self.log) as wallet2: + # new wallet + await wallet.create_wallet() + # create wallet2 with the same mnemonic + mnemonic = await wallet.show_seed_phrase() + assert mnemonic is not None + assert_in("Wallet recovered successfully", await wallet2.recover_wallet(mnemonic)) + + # check it is on genesis + best_block_height = await wallet.get_best_block_height() + self.log.info(f"best block height = {best_block_height}") + assert_equal(best_block_height, "0") + best_block_height = await wallet2.get_best_block_height() + assert_equal(best_block_height, "0") + + # new address + pub_key_bytes = await wallet.new_public_key() + assert_equal(len(pub_key_bytes), 33) + + # Get chain tip + tip_id = node.chainstate_best_block_id() + + # Submit a valid transaction + token_fee = 1000 + coins_to_send = 1 + token_fee_output = { + "Transfer": [ + {"Coin": token_fee * ATOMS_PER_COIN}, + { + "PublicKey": { + "key": {"Secp256k1Schnorr": {"pubkey_data": pub_key_bytes}} + } + }, + ], + } + tx_fee_output = { + "Transfer": [ + {"Coin": coins_to_send * ATOMS_PER_COIN}, + { + "PublicKey": { + "key": {"Secp256k1Schnorr": {"pubkey_data": pub_key_bytes}} + } + }, + ], + } + encoded_tx, tx_id = make_tx( + [reward_input(tip_id)], [token_fee_output] + [tx_fee_output] * 2, 0 + ) + + self.log.debug(f"Encoded transaction {tx_id}: {encoded_tx}") + + assert_in("No transaction found", await wallet.get_transaction(tx_id)) + + node.mempool_submit_transaction(encoded_tx, {}) + assert node.mempool_contains_tx(tx_id) + + self.generate_block() + assert not node.mempool_contains_tx(tx_id) + + # sync the wallet + assert_in("Success", await wallet.sync()) + assert_in("Success", await wallet2.sync()) + + acc0_address = await wallet.new_address() + + # both wallets have the same balances after syncing the new block + assert_in( + f"Coins amount: {coins_to_send * 2 + token_fee}", + await wallet.get_balance(), + ) + assert_in( + f"Coins amount: {coins_to_send * 2 + token_fee}", + await wallet2.get_balance(), + ) + + # create new account and get an address + assert_in("Success", await wallet.create_new_account()) + assert_in("Success", await wallet2.create_new_account()) + assert_in("Success", await wallet.select_account(1)) + acc1_address = await wallet.new_address() + + # go back to Acc 0 and send 1 coin to Acc 1 + coins_to_send = 2 + assert_in("Success", await wallet.select_account(DEFAULT_ACCOUNT_INDEX)) + assert_in( + "The transaction was submitted successfully", + await wallet.send_to_address(acc1_address, coins_to_send), + ) + + # check mempool has 1 transaction now + transactions = node.mempool_transactions() + assert_equal(len(transactions), 1) + transfer_tx = transactions[0] + + # check wallet 1 has it as pending + pending_txs = await wallet.list_pending_transactions() + assert_equal(1, len(pending_txs)) + transfer_tx_id = pending_txs[0] + + # check wallet 2 also received it from mempool events + pending_txs = await wallet2.list_pending_transactions() + assert_equal(1, len(pending_txs)) + assert_equal(transfer_tx_id, pending_txs[0]) + + assert_in("Success", await wallet.select_account(1)) + # wallet 2 should automatically recover Acc 1 + assert_in("Success", await wallet2.select_account(1)) + + # check both balances have `coins_to_send` coins in-mempool state + assert_in( + f"Coins amount: {coins_to_send}", + await wallet.get_balance(utxo_states=['in-mempool']), + ) + assert_in( + f"Coins amount: {coins_to_send}", + await wallet2.get_balance(utxo_states=['in-mempool']), + ) + + # check wallet2 can send 1 coin back to Acc0 from the not yet confirmed tx in mempool + assert_in( + "The transaction was submitted successfully", + await wallet2.send_to_address(acc0_address, 1), + ) + + self.generate_block() + + assert_in("Success", await wallet.sync()) + assert_in("Success", await wallet2.sync()) + + +if __name__ == "__main__": + WalletMempoolEvents().main() diff --git a/test/functional/wallet_tokens_change_supply.py b/test/functional/wallet_tokens_change_supply.py index 9aa13e7139..33fd0459ae 100644 --- a/test/functional/wallet_tokens_change_supply.py +++ b/test/functional/wallet_tokens_change_supply.py @@ -195,10 +195,10 @@ async def async_test(self): if total_tokens_supply > 0: assert_in( f"{token_id} ({valid_ticker}), amount: {total_tokens_supply}", - await wallet.get_balance(utxo_states=['confirmed', 'inactive']) + await wallet.get_balance(utxo_states=['confirmed', 'inactive', 'in-mempool']) ) else: - assert_not_in(f"{token_id}", await wallet.get_balance(utxo_states=['confirmed', 'inactive'])) + assert_not_in(f"{token_id}", await wallet.get_balance(utxo_states=['confirmed', 'inactive', 'in-mempool'])) # lock token supply diff --git a/utils/networking/Cargo.toml b/utils/networking/Cargo.toml index 9420bb9897..3bae6c38dd 100644 --- a/utils/networking/Cargo.toml +++ b/utils/networking/Cargo.toml @@ -14,6 +14,7 @@ itertools.workspace = true serde_with.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["sync"] } +tokio-stream.workspace = true [dev-dependencies] serde_test.workspace = true diff --git a/utils/networking/src/broadcaster.rs b/utils/networking/src/broadcaster.rs index c3ab301294..1ce728b2af 100644 --- a/utils/networking/src/broadcaster.rs +++ b/utils/networking/src/broadcaster.rs @@ -16,6 +16,7 @@ //! Broadcaster is a reliable version of [tokio::sync::broadcast]. use tokio::sync::mpsc; +use tokio_stream::{wrappers::UnboundedReceiverStream, Stream}; /// A reliable version of [tokio::sync::broadcast], sender part. /// @@ -97,6 +98,10 @@ impl Receiver { pub fn blocking_recv(&mut self) -> Option { self.0.blocking_recv() } + + pub fn into_stream(self) -> impl Stream { + UnboundedReceiverStream::new(self.0) + } } #[cfg(test)] diff --git a/wallet/src/account/output_cache/mod.rs b/wallet/src/account/output_cache/mod.rs index da4328b680..15247acd1f 100644 --- a/wallet/src/account/output_cache/mod.rs +++ b/wallet/src/account/output_cache/mod.rs @@ -971,7 +971,23 @@ impl OutputCache { tx_id: OutPointSourceId, tx: WalletTx, ) -> WalletResult<()> { - let already_present = self.txs.get(&tx_id).is_some_and(|tx| match tx.state() { + let existing_tx = self.txs.get(&tx_id); + let existing_tx_already_confirmed_or_same = existing_tx.is_some_and(|existing_tx| { + matches!( + (existing_tx.state(), tx.state()), + (TxState::Confirmed(_, _, _), _) + | (TxState::Inactive(_), TxState::Inactive(_)) + | (TxState::Abandoned, TxState::Abandoned) + | (TxState::Conflicted(_), TxState::Conflicted(_)) + | (TxState::InMempool(_), TxState::InMempool(_)) + ) + }); + + if existing_tx_already_confirmed_or_same { + return Ok(()); + } + + let already_present = existing_tx.is_some_and(|tx| match tx.state() { TxState::Abandoned | TxState::Conflicted(_) => false, TxState::Confirmed(_, _, _) | TxState::InMempool(_) | TxState::Inactive(_) => true, }); @@ -1270,19 +1286,19 @@ impl OutputCache { fn update_token_issuance_state( unconfirmed_descendants: &mut BTreeMap>, data: &mut TokenIssuanceData, - delegation_id: &TokenId, + token_id: &TokenId, token_nonce: AccountNonce, tx_id: &OutPointSourceId, ) -> Result<(), WalletError> { let next_nonce = data .last_nonce .map_or(Some(AccountNonce::new(0)), |nonce| nonce.increment()) - .ok_or(WalletError::TokenIssuanceNonceOverflow(*delegation_id))?; + .ok_or(WalletError::TokenIssuanceNonceOverflow(*token_id))?; ensure!( token_nonce == next_nonce, OutputCacheInconsistencyError::InconsistentTokenIssuanceDuplicateNonce( - *delegation_id, + *token_id, token_nonce ) ); @@ -1490,12 +1506,10 @@ impl OutputCache { .filter_map(|tx| match tx { WalletTx::Block(_) => None, WalletTx::Tx(tx) => match tx.state() { - TxState::Inactive(_) | TxState::Conflicted(_) => { + TxState::Inactive(_) | TxState::Conflicted(_) | TxState::InMempool(_) => { Some(tx.get_transaction_with_id()) } - TxState::Confirmed(_, _, _) | TxState::InMempool(_) | TxState::Abandoned => { - None - } + TxState::Confirmed(_, _, _) | TxState::Abandoned => None, }, }) .collect() @@ -1720,6 +1734,18 @@ impl OutputCache { chain_config: &ChainConfig, tx_id: Id, ) -> WalletResult, WalletTx)>> { + if let Some(tx) = self.txs.get(&tx_id.into()) { + let cannot_abandone = match tx.state() { + TxState::Confirmed(_, _, _) | TxState::InMempool(_) | TxState::Abandoned => true, + TxState::Inactive(_) | TxState::Conflicted(_) => false, + }; + if cannot_abandone { + return Err(WalletError::CannotChangeTransactionState( + tx.state(), + TxState::Abandoned, + )); + } + } let all_abandoned = self.remove_from_unconfirmed_descendants(tx_id); let mut txs_to_rollback = vec![]; diff --git a/wallet/src/wallet/mod.rs b/wallet/src/wallet/mod.rs index 6f45246eb5..0cb5d0ac04 100644 --- a/wallet/src/wallet/mod.rs +++ b/wallet/src/wallet/mod.rs @@ -2558,8 +2558,6 @@ where } /// Rescan mempool for unconfirmed transactions and UTXOs - /// TODO: Currently we don't sync with the mempool - #[cfg(test)] pub fn scan_mempool( &mut self, transactions: &[SignedTransaction], diff --git a/wallet/wallet-controller/src/lib.rs b/wallet/wallet-controller/src/lib.rs index 62f96fe543..3bc5c113d6 100644 --- a/wallet/wallet-controller/src/lib.rs +++ b/wallet/wallet-controller/src/lib.rs @@ -32,6 +32,7 @@ use blockprod::BlockProductionError; use chainstate::tx_verifier::{ self, error::ScriptError, input_check::signature_only_check::SignatureOnlyVerifiable, }; +use futures::StreamExt; use futures::{never::Never, stream::FuturesOrdered, TryStreamExt}; use helpers::{ fetch_input_infos, fetch_token_info, fetch_utxo, fetch_utxo_extra_info, into_balances, @@ -80,7 +81,9 @@ use consensus::{GenerateBlockInputData, PoSTimestampSearchInputData}; use crypto::{ephemeral_e2e::EndToEndPrivateKey, key::hdkd::u31::U31}; use logging::log; use mempool::tx_accumulator::PackingStrategy; -pub use node_comm::node_traits::{ConnectedPeer, NodeInterface, PeerId}; +pub use node_comm::node_traits::{ + ConnectedPeer, MempoolEvents, MempoolNotification, NodeInterface, PeerId, +}; pub use node_comm::{ handles_client::WalletHandlesClient, make_cold_wallet_rpc_client, make_rpc_client, rpc_client::NodeRpcClient, @@ -202,6 +205,8 @@ pub struct Controller { staking_started: BTreeSet, wallet_events: W, + + mempool_events: MempoolEvents, } impl std::fmt::Debug for Controller { @@ -227,12 +232,17 @@ where wallet: RuntimeWallet, wallet_events: W, ) -> Result> { + let mempool_events = rpc_client + .mempool_subscribe_to_events() + .await + .map_err(ControllerError::NodeCallError)?; let mut controller = Self { chain_config, rpc_client, wallet, staking_started: BTreeSet::new(), wallet_events, + mempool_events, }; log::info!("Syncing the wallet..."); @@ -241,19 +251,24 @@ where Ok(controller) } - pub fn new_unsynced( + pub async fn new_unsynced( chain_config: Arc, rpc_client: N, wallet: RuntimeWallet, wallet_events: W, - ) -> Self { - Self { + ) -> Result> { + let mempool_events = rpc_client + .mempool_subscribe_to_events() + .await + .map_err(ControllerError::NodeCallError)?; + Ok(Self { chain_config, rpc_client, wallet, staking_started: BTreeSet::new(), wallet_events, - } + mempool_events, + }) } pub fn create_wallet( @@ -1345,8 +1360,51 @@ where } } - tokio::time::sleep(NORMAL_DELAY).await; + let mut delay = Box::pin(tokio::time::sleep(NORMAL_DELAY)); + + loop { + tokio::select! { + _ = &mut delay => { + break; + } + + maybe_event = self.mempool_events.next() => { + let event = match maybe_event { + Some(e) => e, + None => { + log::error!("Mempool notifications channel closed."); + tokio::time::sleep(ERROR_DELAY).await; + + self.mempool_events = self.rpc_client + .mempool_subscribe_to_events() + .await + .map_err(ControllerError::NodeCallError)?; + break + } + }; + + match event { + MempoolNotification::NewTransaction { tx_id } => { + let transaction = self.rpc_client + .mempool_get_transaction(tx_id) + .await + .map_err(ControllerError::NodeCallError)?; + + if let Some(transaction) = transaction { + let txs = [transaction]; + self.wallet.add_mempool_transactions(&txs, &self.wallet_events)?; + } else { + log::warn!( + "Transaction ID {} from mempool notification but not found", + tx_id + ); + } + } + } + } + } + } self.rebroadcast_txs(&mut rebroadcast_txs_timer).await; } } diff --git a/wallet/wallet-controller/src/runtime_wallet.rs b/wallet/wallet-controller/src/runtime_wallet.rs index 271fda0c10..4a22f810cb 100644 --- a/wallet/wallet-controller/src/runtime_wallet.rs +++ b/wallet/wallet-controller/src/runtime_wallet.rs @@ -1568,6 +1568,18 @@ where } } + pub fn add_mempool_transactions( + &mut self, + txs: &[SignedTransaction], + wallet_events: &impl WalletEvents, + ) -> WalletResult<()> { + match self { + RuntimeWallet::Software(w) => w.scan_mempool(txs, wallet_events), + #[cfg(feature = "trezor")] + RuntimeWallet::Trezor(w) => w.scan_mempool(txs, wallet_events), + } + } + pub fn get_delegations( &self, account_index: U31, diff --git a/wallet/wallet-controller/src/sync/tests/mod.rs b/wallet/wallet-controller/src/sync/tests/mod.rs index 7a17df8e6b..8082f580c4 100644 --- a/wallet/wallet-controller/src/sync/tests/mod.rs +++ b/wallet/wallet-controller/src/sync/tests/mod.rs @@ -38,7 +38,7 @@ use logging::log; use mempool::{tx_accumulator::PackingStrategy, FeeRate}; use mempool_types::tx_options::TxOptionsOverrides; use node_comm::{ - node_traits::{ConnectedPeer, PeerId}, + node_traits::{ConnectedPeer, MempoolEvents, PeerId}, rpc_client::NodeRpcError, }; use p2p_types::{bannable_address::BannableAddress, socket_address::SocketAddress}; @@ -438,6 +438,17 @@ impl NodeInterface for MockNode { FeeRate::from_amount_per_kb(Amount::from_atoms(1)), )]) } + + async fn mempool_get_transaction( + &self, + _tx_id: Id, + ) -> Result, Self::Error> { + unreachable!() + } + + async fn mempool_subscribe_to_events(&self) -> Result { + unreachable!() + } } fn create_chain(node: &MockNode, rng: &mut (impl Rng + CryptoRng), parent: u64, count: usize) { diff --git a/wallet/wallet-controller/src/synced_controller.rs b/wallet/wallet-controller/src/synced_controller.rs index e37f597ec5..4ca31c5dca 100644 --- a/wallet/wallet-controller/src/synced_controller.rs +++ b/wallet/wallet-controller/src/synced_controller.rs @@ -619,7 +619,7 @@ where let selected_utxos = self.wallet.get_utxos( self.account_index, UtxoType::Transfer | UtxoType::LockThenTransfer | UtxoType::IssueNft, - UtxoState::Confirmed | UtxoState::Inactive, + UtxoState::Confirmed | UtxoState::Inactive | UtxoState::InMempool, WithLocked::Unlocked, )?; diff --git a/wallet/wallet-controller/src/tests/compose_transaction_tests.rs b/wallet/wallet-controller/src/tests/compose_transaction_tests.rs index 3dfe5fa7de..2a69a270c8 100644 --- a/wallet/wallet-controller/src/tests/compose_transaction_tests.rs +++ b/wallet/wallet-controller/src/tests/compose_transaction_tests.rs @@ -182,6 +182,10 @@ async fn general_test(#[case] seed: Seed, #[case] use_htlc_secret: bool) { .expect_chainstate_info() .returning(move || Ok(chain_info_to_return.clone())); + node_mock + .expect_mempool_subscribe_to_events() + .returning(|| Ok(Box::new(futures::stream::empty()))); + node_mock }; diff --git a/wallet/wallet-node-client/Cargo.toml b/wallet/wallet-node-client/Cargo.toml index 2bf1366b8f..2f6c4d33e7 100644 --- a/wallet/wallet-node-client/Cargo.toml +++ b/wallet/wallet-node-client/Cargo.toml @@ -26,6 +26,7 @@ wallet-types = { path = "../types" } anyhow.workspace = true async-trait.workspace = true base64.workspace = true +futures.workspace = true mockall.workspace = true serde_json.workspace = true thiserror.workspace = true diff --git a/wallet/wallet-node-client/src/handles_client/mod.rs b/wallet/wallet-node-client/src/handles_client/mod.rs index 8ef524c473..e66b66f143 100644 --- a/wallet/wallet-node-client/src/handles_client/mod.rs +++ b/wallet/wallet-node-client/src/handles_client/mod.rs @@ -19,6 +19,8 @@ use std::{ time::Duration, }; +use futures::StreamExt; + use blockprod::{BlockProductionError, BlockProductionHandle, TimestampSearchData}; use chainstate::{BlockSource, ChainInfo, ChainstateError, ChainstateHandle}; use common::{ @@ -32,7 +34,8 @@ use common::{ use consensus::GenerateBlockInputData; use crypto::ephemeral_e2e::EndToEndPublicKey; use mempool::{ - tx_accumulator::PackingStrategy, tx_options::TxOptionsOverrides, FeeRate, MempoolHandle, + event::MempoolEvent, tx_accumulator::PackingStrategy, tx_options::TxOptionsOverrides, FeeRate, + MempoolHandle, }; use p2p::{ error::P2pError, @@ -44,7 +47,7 @@ use serialization::hex::HexError; use utils_networking::IpOrSocketAddress; use wallet_types::wallet_type::WalletControllerMode; -use crate::node_traits::NodeInterface; +use crate::node_traits::{MempoolEvents, MempoolNotification, NodeInterface}; #[derive(Clone)] pub struct WalletHandlesClient { @@ -452,4 +455,27 @@ impl NodeInterface for WalletHandlesClient { let res = self.mempool.call(move |this| this.get_fee_rate_points(NUM_POINTS)).await??; Ok(res) } + + async fn mempool_subscribe_to_events(&self) -> Result { + let res = self.mempool.call_mut(move |this| this.subscribe_to_rpc_events()).await?; + + let subscription = res.into_stream().filter_map(|event| { + futures::future::ready(match event { + MempoolEvent::NewTip { .. } => None, + + MempoolEvent::TransactionProcessed(tx) => tx + .was_accepted() + .then_some(MempoolNotification::NewTransaction { tx_id: *tx.tx_id() }), + }) + }); + Ok(Box::new(subscription)) + } + + async fn mempool_get_transaction( + &self, + tx_id: Id, + ) -> Result, Self::Error> { + let res = self.mempool.call(move |this| this.transaction(&tx_id)).await?; + Ok(res) + } } diff --git a/wallet/wallet-node-client/src/mock.rs b/wallet/wallet-node-client/src/mock.rs index c234cce180..6a448f4083 100644 --- a/wallet/wallet-node-client/src/mock.rs +++ b/wallet/wallet-node-client/src/mock.rs @@ -41,7 +41,7 @@ use p2p::{ use utils_networking::IpOrSocketAddress; use wallet_types::wallet_type::WalletControllerMode; -use crate::node_traits::{MockNodeInterface, NodeInterface}; +use crate::node_traits::{MempoolEvents, MockNodeInterface, NodeInterface}; /// `Controller` requires the provided `impl NodeInterface` to also implement `Clone`. /// There is no way to make `MockNodeInterface` itself clonable, since `mockall::automock` doesn't @@ -315,6 +315,17 @@ impl NodeInterface for ClonableMockNodeInterface { self.lock().await.mempool_get_fee_rate_points().await } + async fn mempool_get_transaction( + &self, + tx_id: Id, + ) -> Result, Self::Error> { + self.lock().await.mempool_get_transaction(tx_id).await + } + + async fn mempool_subscribe_to_events(&self) -> Result { + self.lock().await.mempool_subscribe_to_events().await + } + async fn get_utxo(&self, outpoint: UtxoOutPoint) -> Result, Self::Error> { self.lock().await.get_utxo(outpoint).await } diff --git a/wallet/wallet-node-client/src/node_traits.rs b/wallet/wallet-node-client/src/node_traits.rs index d6bceb4735..c36f077b06 100644 --- a/wallet/wallet-node-client/src/node_traits.rs +++ b/wallet/wallet-node-client/src/node_traits.rs @@ -19,6 +19,8 @@ use std::{ time::Duration, }; +use futures::stream::Stream; + use chainstate::ChainInfo; use common::{ chain::{ @@ -37,6 +39,8 @@ use wallet_types::wallet_type::WalletControllerMode; pub use p2p::{interface::types::ConnectedPeer, types::peer_id::PeerId}; +pub type MempoolEvents = Box + Sync + Send + Unpin>; + #[mockall::automock(type Error = anyhow::Error;)] #[async_trait::async_trait] pub trait NodeInterface { @@ -146,6 +150,15 @@ pub trait NodeInterface { async fn mempool_get_fee_rate(&self, in_top_x_mb: usize) -> Result; async fn mempool_get_fee_rate_points(&self) -> Result, Self::Error>; + async fn mempool_get_transaction( + &self, + tx_id: Id, + ) -> Result, Self::Error>; + async fn mempool_subscribe_to_events(&self) -> Result; async fn get_utxo(&self, outpoint: UtxoOutPoint) -> Result, Self::Error>; } + +pub enum MempoolNotification { + NewTransaction { tx_id: Id }, +} diff --git a/wallet/wallet-node-client/src/rpc_client/client_impl.rs b/wallet/wallet-node-client/src/rpc_client/client_impl.rs index d0e78120f7..4c6e74aad8 100644 --- a/wallet/wallet-node-client/src/rpc_client/client_impl.rs +++ b/wallet/wallet-node-client/src/rpc_client/client_impl.rs @@ -19,6 +19,8 @@ use std::{ time::Duration, }; +use futures::StreamExt; + use blockprod::{rpc::BlockProductionRpcClient, TimestampSearchData}; use chainstate::{rpc::ChainstateRpcClient, ChainInfo}; use common::{ @@ -33,7 +35,8 @@ use common::{ use consensus::GenerateBlockInputData; use crypto::ephemeral_e2e::EndToEndPublicKey; use mempool::{ - rpc::MempoolRpcClient, tx_accumulator::PackingStrategy, tx_options::TxOptionsOverrides, FeeRate, + rpc::MempoolRpcClient, rpc_event::RpcEvent, tx_accumulator::PackingStrategy, + tx_options::TxOptionsOverrides, FeeRate, }; use p2p::{ interface::types::ConnectedPeer, @@ -44,7 +47,7 @@ use serialization::hex_encoded::HexEncoded; use utils_networking::IpOrSocketAddress; use wallet_types::wallet_type::WalletControllerMode; -use crate::node_traits::NodeInterface; +use crate::node_traits::{MempoolEvents, MempoolNotification, NodeInterface}; use super::{NodeRpcClient, NodeRpcError}; @@ -57,13 +60,13 @@ impl NodeInterface for NodeRpcClient { } async fn chainstate_info(&self) -> Result { - ChainstateRpcClient::info(&self.http_client) + ChainstateRpcClient::info(&*self.rpc_client.lock().await) .await .map_err(NodeRpcError::ResponseError) } async fn get_block(&self, block_id: Id) -> Result, Self::Error> { - ChainstateRpcClient::get_block(&self.http_client, block_id) + ChainstateRpcClient::get_block(&*self.rpc_client.lock().await, block_id) .await .map_err(NodeRpcError::ResponseError) .map(|block_opt| block_opt.map(HexEncoded::take)) @@ -74,7 +77,7 @@ impl NodeInterface for NodeRpcClient { from: BlockHeight, max_count: usize, ) -> Result, Self::Error> { - ChainstateRpcClient::get_mainchain_blocks(&self.http_client, from, max_count) + ChainstateRpcClient::get_mainchain_blocks(&*self.rpc_client.lock().await, from, max_count) .await .map_err(NodeRpcError::ResponseError) .map(|blocks| blocks.into_iter().map(HexEncoded::take).collect()) @@ -87,7 +90,7 @@ impl NodeInterface for NodeRpcClient { step: NonZeroUsize, ) -> Result)>, Self::Error> { ChainstateRpcClient::get_block_ids_as_checkpoints( - &self.http_client, + &*self.rpc_client.lock().await, start_height, end_height, step, @@ -97,13 +100,13 @@ impl NodeInterface for NodeRpcClient { } async fn get_best_block_id(&self) -> Result, Self::Error> { - ChainstateRpcClient::best_block_id(&self.http_client) + ChainstateRpcClient::best_block_id(&*self.rpc_client.lock().await) .await .map_err(NodeRpcError::ResponseError) } async fn get_best_block_height(&self) -> Result { - ChainstateRpcClient::best_block_height(&self.http_client) + ChainstateRpcClient::best_block_height(&*self.rpc_client.lock().await) .await .map_err(NodeRpcError::ResponseError) } @@ -112,7 +115,7 @@ impl NodeInterface for NodeRpcClient { &self, height: BlockHeight, ) -> Result>, Self::Error> { - ChainstateRpcClient::block_id_at_height(&self.http_client, height) + ChainstateRpcClient::block_id_at_height(&*self.rpc_client.lock().await, height) .await .map_err(NodeRpcError::ResponseError) } @@ -123,7 +126,7 @@ impl NodeInterface for NodeRpcClient { second_block: Id, ) -> Result, BlockHeight)>, Self::Error> { ChainstateRpcClient::last_common_ancestor_by_id( - &self.http_client, + &*self.rpc_client.lock().await, first_block, second_block, ) @@ -133,14 +136,14 @@ impl NodeInterface for NodeRpcClient { async fn get_stake_pool_balance(&self, pool_id: PoolId) -> Result, Self::Error> { let pool_address = Address::new(&self.chain_config, pool_id)?; - ChainstateRpcClient::stake_pool_balance(&self.http_client, pool_address.into()) + ChainstateRpcClient::stake_pool_balance(&*self.rpc_client.lock().await, pool_address.into()) .await .map_err(NodeRpcError::ResponseError) } async fn get_staker_balance(&self, pool_id: PoolId) -> Result, Self::Error> { let pool_address = Address::new(&self.chain_config, pool_id)?; - ChainstateRpcClient::staker_balance(&self.http_client, pool_address.into()) + ChainstateRpcClient::staker_balance(&*self.rpc_client.lock().await, pool_address.into()) .await .map_err(NodeRpcError::ResponseError) } @@ -151,7 +154,7 @@ impl NodeInterface for NodeRpcClient { ) -> Result, Self::Error> { let pool_address = Address::new(&self.chain_config, pool_id)?; let dest_as_address = ChainstateRpcClient::pool_decommission_destination( - &self.http_client, + &*self.rpc_client.lock().await, pool_address.into(), ) .await @@ -167,14 +170,18 @@ impl NodeInterface for NodeRpcClient { ) -> Result, Self::Error> { let pool_address = Address::new(&self.chain_config, pool_id)?.into(); let delegation_address = Address::new(&self.chain_config, delegation_id)?.into(); - ChainstateRpcClient::delegation_share(&self.http_client, pool_address, delegation_address) - .await - .map_err(NodeRpcError::ResponseError) + ChainstateRpcClient::delegation_share( + &*self.rpc_client.lock().await, + pool_address, + delegation_address, + ) + .await + .map_err(NodeRpcError::ResponseError) } async fn get_token_info(&self, token_id: TokenId) -> Result, Self::Error> { let token_id = Address::new(&self.chain_config, token_id)?.into(); - ChainstateRpcClient::token_info(&self.http_client, token_id) + ChainstateRpcClient::token_info(&*self.rpc_client.lock().await, token_id) .await .map_err(NodeRpcError::ResponseError) } @@ -189,14 +196,14 @@ impl NodeInterface for NodeRpcClient { Ok::<_, Self::Error>(Address::new(&self.chain_config, token_id)?.into()) }) .collect::>()?; - ChainstateRpcClient::tokens_info(&self.http_client, token_ids) + ChainstateRpcClient::tokens_info(&*self.rpc_client.lock().await, token_ids) .await .map_err(NodeRpcError::ResponseError) } async fn get_order_info(&self, order_id: OrderId) -> Result, Self::Error> { let order_id = Address::new(&self.chain_config, order_id)?.into(); - ChainstateRpcClient::order_info(&self.http_client, order_id) + ChainstateRpcClient::order_info(&*self.rpc_client.lock().await, order_id) .await .map_err(NodeRpcError::ResponseError) } @@ -207,7 +214,7 @@ impl NodeInterface for NodeRpcClient { give_currency: Option, ) -> Result, Self::Error> { ChainstateRpcClient::orders_info_by_currencies( - &self.http_client, + &*self.rpc_client.lock().await, ask_currency .map(|currency| currency.to_rpc_currency(&self.chain_config)) .transpose()?, @@ -220,7 +227,7 @@ impl NodeInterface for NodeRpcClient { } async fn blockprod_e2e_public_key(&self) -> Result { - BlockProductionRpcClient::e2e_public_key(&self.http_client) + BlockProductionRpcClient::e2e_public_key(&*self.rpc_client.lock().await) .await .map(HexEncoded::take) .map_err(NodeRpcError::ResponseError) @@ -236,7 +243,7 @@ impl NodeInterface for NodeRpcClient { ) -> Result { let transactions = transactions.into_iter().map(HexEncoded::new).collect::>(); BlockProductionRpcClient::generate_block_e2e( - &self.http_client, + &*self.rpc_client.lock().await, encrypted_input_data, public_key.into(), transactions, @@ -257,7 +264,7 @@ impl NodeInterface for NodeRpcClient { all_timestamps_between_blocks: bool, ) -> Result { BlockProductionRpcClient::collect_timestamp_search_data( - &self.http_client, + &*self.rpc_client.lock().await, pool_id, min_height, max_height, @@ -278,7 +285,7 @@ impl NodeInterface for NodeRpcClient { ) -> Result { let transactions = transactions.into_iter().map(HexEncoded::new).collect::>(); BlockProductionRpcClient::generate_block( - &self.http_client, + &*self.rpc_client.lock().await, input_data.into(), transactions, transaction_ids, @@ -290,7 +297,7 @@ impl NodeInterface for NodeRpcClient { } async fn submit_block(&self, block: Block) -> Result<(), Self::Error> { - ChainstateRpcClient::submit_block(&self.http_client, block.into()) + ChainstateRpcClient::submit_block(&*self.rpc_client.lock().await, block.into()) .await .map_err(NodeRpcError::ResponseError) } @@ -300,41 +307,42 @@ impl NodeInterface for NodeRpcClient { tx: SignedTransaction, options: TxOptionsOverrides, ) -> Result<(), Self::Error> { - let status = P2pRpcClient::submit_transaction(&self.http_client, tx.into(), options) - .await - .map_err(NodeRpcError::ResponseError)?; + let status = + P2pRpcClient::submit_transaction(&*self.rpc_client.lock().await, tx.into(), options) + .await + .map_err(NodeRpcError::ResponseError)?; Ok(status) } async fn node_shutdown(&self) -> Result<(), Self::Error> { - node_lib::rpc::NodeRpcClient::shutdown(&self.http_client) + node_lib::rpc::NodeRpcClient::shutdown(&*self.rpc_client.lock().await) .await .map_err(NodeRpcError::ResponseError) } async fn node_enable_networking(&self, enable: bool) -> Result<(), Self::Error> { - P2pRpcClient::enable_networking(&self.http_client, enable) + P2pRpcClient::enable_networking(&*self.rpc_client.lock().await, enable) .await .map_err(NodeRpcError::ResponseError) } async fn node_version(&self) -> Result { - node_lib::rpc::NodeRpcClient::version(&self.http_client) + node_lib::rpc::NodeRpcClient::version(&*self.rpc_client.lock().await) .await .map_err(NodeRpcError::ResponseError) } async fn p2p_connect(&self, address: IpOrSocketAddress) -> Result<(), Self::Error> { - P2pRpcClient::connect(&self.http_client, address) + P2pRpcClient::connect(&*self.rpc_client.lock().await, address) .await .map_err(NodeRpcError::ResponseError) } async fn p2p_disconnect(&self, peer_id: PeerId) -> Result<(), Self::Error> { - P2pRpcClient::disconnect(&self.http_client, peer_id) + P2pRpcClient::disconnect(&*self.rpc_client.lock().await, peer_id) .await .map_err(NodeRpcError::ResponseError) } async fn p2p_list_banned(&self) -> Result, Self::Error> { - P2pRpcClient::list_banned(&self.http_client) + P2pRpcClient::list_banned(&*self.rpc_client.lock().await) .await .map_err(NodeRpcError::ResponseError) } @@ -343,45 +351,45 @@ impl NodeInterface for NodeRpcClient { address: BannableAddress, duration: Duration, ) -> Result<(), Self::Error> { - P2pRpcClient::ban(&self.http_client, address, duration) + P2pRpcClient::ban(&*self.rpc_client.lock().await, address, duration) .await .map_err(NodeRpcError::ResponseError) } async fn p2p_unban(&self, address: BannableAddress) -> Result<(), Self::Error> { - P2pRpcClient::unban(&self.http_client, address) + P2pRpcClient::unban(&*self.rpc_client.lock().await, address) .await .map_err(NodeRpcError::ResponseError) } async fn p2p_list_discouraged(&self) -> Result, Self::Error> { - P2pRpcClient::list_discouraged(&self.http_client) + P2pRpcClient::list_discouraged(&*self.rpc_client.lock().await) .await .map_err(NodeRpcError::ResponseError) } async fn p2p_undiscourage(&self, address: BannableAddress) -> Result<(), Self::Error> { - P2pRpcClient::undiscourage(&self.http_client, address) + P2pRpcClient::undiscourage(&*self.rpc_client.lock().await, address) .await .map_err(NodeRpcError::ResponseError) } async fn p2p_get_peer_count(&self) -> Result { - P2pRpcClient::get_peer_count(&self.http_client) + P2pRpcClient::get_peer_count(&*self.rpc_client.lock().await) .await .map_err(NodeRpcError::ResponseError) } async fn p2p_get_connected_peers(&self) -> Result, Self::Error> { - P2pRpcClient::get_connected_peers(&self.http_client) + P2pRpcClient::get_connected_peers(&*self.rpc_client.lock().await) .await .map_err(NodeRpcError::ResponseError) } async fn p2p_get_reserved_nodes(&self) -> Result, Self::Error> { - P2pRpcClient::get_reserved_nodes(&self.http_client) + P2pRpcClient::get_reserved_nodes(&*self.rpc_client.lock().await) .await .map_err(NodeRpcError::ResponseError) } async fn p2p_add_reserved_node(&self, address: IpOrSocketAddress) -> Result<(), Self::Error> { - P2pRpcClient::add_reserved_node(&self.http_client, address) + P2pRpcClient::add_reserved_node(&*self.rpc_client.lock().await, address) .await .map_err(NodeRpcError::ResponseError) } @@ -389,25 +397,52 @@ impl NodeInterface for NodeRpcClient { &self, address: IpOrSocketAddress, ) -> Result<(), Self::Error> { - P2pRpcClient::remove_reserved_node(&self.http_client, address) + P2pRpcClient::remove_reserved_node(&*self.rpc_client.lock().await, address) .await .map_err(NodeRpcError::ResponseError) } async fn mempool_get_fee_rate(&self, in_top_x_mb: usize) -> Result { - MempoolRpcClient::get_fee_rate(&self.http_client, in_top_x_mb) + MempoolRpcClient::get_fee_rate(&*self.rpc_client.lock().await, in_top_x_mb) .await .map_err(NodeRpcError::ResponseError) } async fn mempool_get_fee_rate_points(&self) -> Result, Self::Error> { - MempoolRpcClient::get_fee_rate_points(&self.http_client) + MempoolRpcClient::get_fee_rate_points(&*self.rpc_client.lock().await) .await .map_err(NodeRpcError::ResponseError) } + async fn mempool_get_transaction( + &self, + tx_id: Id, + ) -> Result, Self::Error> { + MempoolRpcClient::get_transaction(&*self.rpc_client.lock().await, tx_id) + .await + .map_err(NodeRpcError::ResponseError) + .map(|opt| opt.map(|resp| resp.transaction.take())) + } + + async fn mempool_subscribe_to_events(&self) -> Result { + let subscription = MempoolRpcClient::subscribe_to_events(&*self.rpc_client.lock().await) + .await + .map_err(NodeRpcError::ResponseError)?; + + let subscription = subscription.filter_map(|item| { + futures::future::ready(item.ok().and_then(|event| match event { + RpcEvent::NewTip { .. } => None, + + RpcEvent::TransactionProcessed { + tx_id, successful, .. + } => successful.then_some(MempoolNotification::NewTransaction { tx_id }), + })) + }); + Ok(Box::new(subscription)) + } + async fn get_utxo(&self, outpoint: UtxoOutPoint) -> Result, Self::Error> { - ChainstateRpcClient::get_utxo(&self.http_client, outpoint.into()) + ChainstateRpcClient::get_utxo(&*self.rpc_client.lock().await, outpoint.into()) .await .map_err(NodeRpcError::ResponseError) } diff --git a/wallet/wallet-node-client/src/rpc_client/cold_wallet_client.rs b/wallet/wallet-node-client/src/rpc_client/cold_wallet_client.rs index d7dacaf3db..4c9eb1825e 100644 --- a/wallet/wallet-node-client/src/rpc_client/cold_wallet_client.rs +++ b/wallet/wallet-node-client/src/rpc_client/cold_wallet_client.rs @@ -39,7 +39,7 @@ use p2p::{ use utils_networking::IpOrSocketAddress; use wallet_types::wallet_type::WalletControllerMode; -use crate::node_traits::NodeInterface; +use crate::node_traits::{MempoolEvents, NodeInterface}; use super::ColdWalletClient; @@ -290,6 +290,17 @@ impl NodeInterface for ColdWalletClient { Err(ColdWalletRpcError::NotAvailable) } + async fn mempool_subscribe_to_events(&self) -> Result { + Ok(Box::new(futures::stream::empty())) + } + + async fn mempool_get_transaction( + &self, + _tx_id: Id, + ) -> Result, Self::Error> { + Err(ColdWalletRpcError::NotAvailable) + } + async fn get_utxo( &self, _outpoint: common::chain::UtxoOutPoint, diff --git a/wallet/wallet-node-client/src/rpc_client/mod.rs b/wallet/wallet-node-client/src/rpc_client/mod.rs index c24021e9b5..a52c3f5eb8 100644 --- a/wallet/wallet-node-client/src/rpc_client/mod.rs +++ b/wallet/wallet-node-client/src/rpc_client/mod.rs @@ -18,13 +18,15 @@ pub mod cold_wallet_client; use std::sync::Arc; +use tokio::sync::Mutex; + use common::address::AddressError; use common::chain::ChainConfig; use common::primitives::per_thousand::PerThousandParseError; -use rpc::new_http_client; +use rpc::new_ws_client; use rpc::ClientError; use rpc::RpcAuthData; -use rpc::RpcHttpClient; +use rpc::RpcWsClient; use crate::node_traits::NodeInterface; @@ -57,7 +59,7 @@ impl ColdWalletClient { #[derive(Clone, Debug)] pub struct NodeRpcClient { - http_client: RpcHttpClient, + rpc_client: Arc>, chain_config: Arc, } @@ -67,13 +69,13 @@ impl NodeRpcClient { remote_socket_address: String, rpc_auth: RpcAuthData, ) -> Result { - let host = format!("http://{remote_socket_address}"); + let host = format!("ws://{remote_socket_address}"); - let http_client = - new_http_client(host, rpc_auth).map_err(NodeRpcError::ClientCreationError)?; + let rpc_client = + new_ws_client(host, rpc_auth).await.map_err(NodeRpcError::ClientCreationError)?; let client = Self { - http_client, + rpc_client: Arc::new(Mutex::new(rpc_client)), chain_config, }; diff --git a/wallet/wallet-rpc-lib/src/service/worker.rs b/wallet/wallet-rpc-lib/src/service/worker.rs index ba7f3c09cf..5183dc30ba 100644 --- a/wallet/wallet-rpc-lib/src/service/worker.rs +++ b/wallet/wallet-rpc-lib/src/service/worker.rs @@ -197,6 +197,7 @@ where wallet, self.wallet_events.clone(), ) + .await? }; self.controller.replace(controller); @@ -261,6 +262,8 @@ where wallet, self.wallet_events.clone(), ) + .await + .map_err(RpcError::Controller)? }; self.controller.replace(controller);