Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions mempool/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ use rpc::RpcResult;

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, rpc::description::HasValueHint)]
pub struct GetTxResponse {
id: Id<Transaction>,
status: TxStatus,
transaction: HexEncoded<SignedTransaction>,
pub id: Id<Transaction>,
pub status: TxStatus,
pub transaction: HexEncoded<SignedTransaction>,
}

#[rpc::describe]
Expand Down
1 change: 1 addition & 0 deletions test/functional/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class UnicodeOnWindowsError(ValueError):
'wallet_sweep_address.py',
'wallet_sweep_delegation.py',
'wallet_recover_accounts.py',
'wallet_mempool_events.py',
'wallet_tokens.py',
'wallet_tokens_freeze.py',
'wallet_tokens_transfer_from_multisig_addr.py',
Expand Down
9 changes: 6 additions & 3 deletions test/functional/wallet_conflict.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,20 +155,23 @@ async def async_test(self):
transactions.remove(transfer_tx)
freeze_tx = transactions[0]

assert_equal(1, len(await wallet.list_pending_transactions()))
assert_equal(2, len(await wallet.list_pending_transactions()))


# try to send tokens again should fail as the tokens are already sent
assert_in("Success", await wallet.select_account(1))
assert_in("Coin selection error: No available UTXOs", await wallet.send_tokens_to_address(token_id, address, tokens_to_mint))
# check that the mempool still has the transfer tx
assert node.mempool_contains_tx(transfer_tx_id)
# abandon it from the wallet side so it is not rebroadcasted
assert_in("The transaction was marked as abandoned successfully", await wallet.abandon_transaction(transfer_tx_id))
assert_in("Cannot change a transaction's state from InMempool", await wallet.abandon_transaction(transfer_tx_id))

# create a block with the freeze token transaction
self.generate_block([freeze_tx])
assert_in("Success", await wallet.sync())
self.log.info(f"transfer_tx_id = {transfer_tx_id}")

# abandon it from the wallet side so it is not rebroadcasted
assert_in("The transaction was marked as abandoned successfully", await wallet.abandon_transaction(transfer_tx_id))

# after the token is frozen the transfer token tx should be evicted by the mempool as conflicting
# wait until mempool evicts the conflicting tx
Expand Down
212 changes: 212 additions & 0 deletions test/functional/wallet_mempool_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
#!/usr/bin/env python3
# Copyright (c) 2026 RBB S.r.l
# Copyright (c) 2017-2021 The Bitcoin Core developers
# opensource@mintlayer.org
# SPDX-License-Identifier: MIT
# Licensed under the MIT License;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Wallet mempool events test

Check that:
* We can create 2 wallets with same mnemonic,
* get an address
* send coins to the wallet's address
* sync the wallet with the node
* check balance in both wallets
* send coins from Acc 0 to Acc 1 without creating a block
Comment on lines +22 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plz put more effort into writing comments. You have 2 wallets here, it's not clear which one of them you're talking about.

* the second wallet should get the new Tx from mempool events
* second wallet can create a new unconfirmed Tx on top of the on in mempool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"on top of the on in mempool" - broken phrase

"""

import asyncio

from test_framework.mintlayer import (ATOMS_PER_COIN, block_input_data_obj,
make_tx, reward_input)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, assert_in
from test_framework.wallet_cli_controller import (DEFAULT_ACCOUNT_INDEX,
WalletCliController)


class WalletMempoolEvents(BitcoinTestFramework):

def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
self.extra_args = [
[
"--blockprod-min-peers-to-produce-blocks=0",
]
]

def setup_network(self):
self.setup_nodes()
self.sync_all(self.nodes[0:1])

def generate_block(self, transactions=[]):
node = self.nodes[0]

block_input_data = {"PoW": {"reward_destination": "AnyoneCanSpend"}}
block_input_data = block_input_data_obj.encode(block_input_data).to_hex()[2:]

# create a new block, taking transactions from mempool
block = node.blockprod_generate_block(
block_input_data, transactions, [], "FillSpaceFromMempool"
)
node.chainstate_submit_block(block)
block_id = node.chainstate_best_block_id()

# Wait for mempool to sync
self.wait_until(
lambda: node.mempool_local_best_block_id() == block_id, timeout=5
)

return block_id

def run_test(self):
asyncio.run(self.async_test())

async def async_test(self):
node = self.nodes[0]
async with WalletCliController(node, self.config, self.log) as wallet, \
WalletCliController(node, self.config, self.log) as wallet2:
# new wallet
await wallet.create_wallet()
# create wallet2 with the same mnemonic
mnemonic = await wallet.show_seed_phrase()
assert mnemonic is not None
assert_in("Wallet recovered successfully", await wallet2.recover_wallet(mnemonic))

# check it is on genesis
best_block_height = await wallet.get_best_block_height()
self.log.info(f"best block height = {best_block_height}")
assert_equal(best_block_height, "0")
best_block_height = await wallet2.get_best_block_height()
assert_equal(best_block_height, "0")

# new address
pub_key_bytes = await wallet.new_public_key()
assert_equal(len(pub_key_bytes), 33)

# Get chain tip
tip_id = node.chainstate_best_block_id()

# Submit a valid transaction
token_fee = 1000
coins_to_send = 1
token_fee_output = {
"Transfer": [
{"Coin": token_fee * ATOMS_PER_COIN},
{
"PublicKey": {
"key": {"Secp256k1Schnorr": {"pubkey_data": pub_key_bytes}}
}
},
],
}
tx_fee_output = {
"Transfer": [
{"Coin": coins_to_send * ATOMS_PER_COIN},
{
"PublicKey": {
"key": {"Secp256k1Schnorr": {"pubkey_data": pub_key_bytes}}
}
},
],
}
encoded_tx, tx_id = make_tx(
[reward_input(tip_id)], [token_fee_output] + [tx_fee_output] * 2, 0
)

self.log.debug(f"Encoded transaction {tx_id}: {encoded_tx}")

assert_in("No transaction found", await wallet.get_transaction(tx_id))

node.mempool_submit_transaction(encoded_tx, {})
assert node.mempool_contains_tx(tx_id)

self.generate_block()
assert not node.mempool_contains_tx(tx_id)

# sync the wallet
assert_in("Success", await wallet.sync())
assert_in("Success", await wallet2.sync())

acc0_address = await wallet.new_address()

# both wallets have the same balances after syncing the new block
assert_in(
f"Coins amount: {coins_to_send * 2 + token_fee}",
await wallet.get_balance(),
)
assert_in(
f"Coins amount: {coins_to_send * 2 + token_fee}",
await wallet2.get_balance(),
)

# create new account and get an address
assert_in("Success", await wallet.create_new_account())
assert_in("Success", await wallet2.create_new_account())
assert_in("Success", await wallet.select_account(1))
acc1_address = await wallet.new_address()

# go back to Acc 0 and send 1 coin to Acc 1
coins_to_send = 2
assert_in("Success", await wallet.select_account(DEFAULT_ACCOUNT_INDEX))
assert_in(
"The transaction was submitted successfully",
await wallet.send_to_address(acc1_address, coins_to_send),
)

# check mempool has 1 transaction now
transactions = node.mempool_transactions()
assert_equal(len(transactions), 1)
transfer_tx = transactions[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused


# check wallet 1 has it as pending
pending_txs = await wallet.list_pending_transactions()
assert_equal(1, len(pending_txs))
transfer_tx_id = pending_txs[0]

# check wallet 2 also received it from mempool events
pending_txs = await wallet2.list_pending_transactions()
assert_equal(1, len(pending_txs))
assert_equal(transfer_tx_id, pending_txs[0])

assert_in("Success", await wallet.select_account(1))
# wallet 2 should automatically recover Acc 1
assert_in("Success", await wallet2.select_account(1))

# check both balances have `coins_to_send` coins in-mempool state
assert_in(
f"Coins amount: {coins_to_send}",
await wallet.get_balance(utxo_states=['in-mempool']),
)
assert_in(
f"Coins amount: {coins_to_send}",
await wallet2.get_balance(utxo_states=['in-mempool']),
)

# check wallet2 can send 1 coin back to Acc0 from the not yet confirmed tx in mempool
assert_in(
"The transaction was submitted successfully",
await wallet2.send_to_address(acc0_address, 1),
)

self.generate_block()

assert_in("Success", await wallet.sync())
assert_in("Success", await wallet2.sync())


if __name__ == "__main__":
WalletMempoolEvents().main()
4 changes: 2 additions & 2 deletions test/functional/wallet_tokens_change_supply.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,10 @@ async def async_test(self):
if total_tokens_supply > 0:
assert_in(
f"{token_id} ({valid_ticker}), amount: {total_tokens_supply}",
await wallet.get_balance(utxo_states=['confirmed', 'inactive'])
await wallet.get_balance(utxo_states=['confirmed', 'inactive', 'in-mempool'])
)
else:
assert_not_in(f"{token_id}", await wallet.get_balance(utxo_states=['confirmed', 'inactive']))
assert_not_in(f"{token_id}", await wallet.get_balance(utxo_states=['confirmed', 'inactive', 'in-mempool']))


# lock token supply
Expand Down
1 change: 1 addition & 0 deletions utils/networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ itertools.workspace = true
serde_with.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true

[dev-dependencies]
serde_test.workspace = true
Expand Down
5 changes: 5 additions & 0 deletions utils/networking/src/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
//! Broadcaster is a reliable version of [tokio::sync::broadcast].

use tokio::sync::mpsc;
use tokio_stream::{wrappers::UnboundedReceiverStream, Stream};

/// A reliable version of [tokio::sync::broadcast], sender part.
///
Expand Down Expand Up @@ -97,6 +98,10 @@ impl<T> Receiver<T> {
pub fn blocking_recv(&mut self) -> Option<T> {
self.0.blocking_recv()
}

pub fn into_stream(self) -> impl Stream<Item = T> {
UnboundedReceiverStream::new(self.0)
}
}

#[cfg(test)]
Expand Down
42 changes: 34 additions & 8 deletions wallet/src/account/output_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,23 @@ impl OutputCache {
tx_id: OutPointSourceId,
tx: WalletTx,
) -> WalletResult<()> {
let already_present = self.txs.get(&tx_id).is_some_and(|tx| match tx.state() {
let existing_tx = self.txs.get(&tx_id);
let existing_tx_already_confirmed_or_same = existing_tx.is_some_and(|existing_tx| {
matches!(
(existing_tx.state(), tx.state()),
(TxState::Confirmed(_, _, _), _)
| (TxState::Inactive(_), TxState::Inactive(_))
| (TxState::Abandoned, TxState::Abandoned)
| (TxState::Conflicted(_), TxState::Conflicted(_))
| (TxState::InMempool(_), TxState::InMempool(_))
)
});
Comment on lines +974 to +984
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a unit test for this?


if existing_tx_already_confirmed_or_same {
return Ok(());
}

let already_present = existing_tx.is_some_and(|tx| match tx.state() {
TxState::Abandoned | TxState::Conflicted(_) => false,
TxState::Confirmed(_, _, _) | TxState::InMempool(_) | TxState::Inactive(_) => true,
});
Expand Down Expand Up @@ -1270,19 +1286,19 @@ impl OutputCache {
fn update_token_issuance_state(
unconfirmed_descendants: &mut BTreeMap<OutPointSourceId, BTreeSet<OutPointSourceId>>,
data: &mut TokenIssuanceData,
delegation_id: &TokenId,
token_id: &TokenId,
token_nonce: AccountNonce,
tx_id: &OutPointSourceId,
) -> Result<(), WalletError> {
let next_nonce = data
.last_nonce
.map_or(Some(AccountNonce::new(0)), |nonce| nonce.increment())
.ok_or(WalletError::TokenIssuanceNonceOverflow(*delegation_id))?;
.ok_or(WalletError::TokenIssuanceNonceOverflow(*token_id))?;

ensure!(
token_nonce == next_nonce,
OutputCacheInconsistencyError::InconsistentTokenIssuanceDuplicateNonce(
*delegation_id,
*token_id,
token_nonce
)
);
Expand Down Expand Up @@ -1490,12 +1506,10 @@ impl OutputCache {
.filter_map(|tx| match tx {
WalletTx::Block(_) => None,
WalletTx::Tx(tx) => match tx.state() {
TxState::Inactive(_) | TxState::Conflicted(_) => {
TxState::Inactive(_) | TxState::Conflicted(_) | TxState::InMempool(_) => {
Some(tx.get_transaction_with_id())
}
TxState::Confirmed(_, _, _) | TxState::InMempool(_) | TxState::Abandoned => {
None
}
TxState::Confirmed(_, _, _) | TxState::Abandoned => None,
},
})
.collect()
Expand Down Expand Up @@ -1720,6 +1734,18 @@ impl OutputCache {
chain_config: &ChainConfig,
tx_id: Id<Transaction>,
) -> WalletResult<Vec<(Id<Transaction>, WalletTx)>> {
if let Some(tx) = self.txs.get(&tx_id.into()) {
let cannot_abandone = match tx.state() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. typo - "abandone".
  2. OutputCache has its own unit tests, so it's better to add a test for this change there.

TxState::Confirmed(_, _, _) | TxState::InMempool(_) | TxState::Abandoned => true,
TxState::Inactive(_) | TxState::Conflicted(_) => false,
};
if cannot_abandone {
return Err(WalletError::CannotChangeTransactionState(
tx.state(),
TxState::Abandoned,
));
}
}
let all_abandoned = self.remove_from_unconfirmed_descendants(tx_id);
let mut txs_to_rollback = vec![];

Expand Down
Loading