Skip to content

Subscribe wallet to mempool events#2005

Open
OBorce wants to merge 1 commit intomasterfrom
feature/wallet-mempool-events
Open

Subscribe wallet to mempool events#2005
OBorce wants to merge 1 commit intomasterfrom
feature/wallet-mempool-events

Conversation

@OBorce
Copy link
Contributor

@OBorce OBorce commented Jan 28, 2026

The wallet now subscribes to the mempool events when opened.

Fixed one bug with abandon tx in the cache where it would remove the tx from unconfirmed dependencies before checking if it can be abandoned, resulting in an invalid state.

@OBorce OBorce force-pushed the feature/wallet-mempool-events branch from e30e627 to 8ff637a Compare January 28, 2026 10:50
@OBorce OBorce force-pushed the feature/wallet-mempool-events branch from 8ff637a to 36cae64 Compare January 30, 2026 08:48
Copy link
Contributor

@ImplOfAnImpl ImplOfAnImpl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plz mention the bugfix in the changelog

tx_id: Id<Transaction>,
) -> WalletResult<Vec<(Id<Transaction>, WalletTx)>> {
if let Some(tx) = self.txs.get(&tx_id.into()) {
let cannot_abandone = match tx.state() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. typo - "abandone".
  2. OutputCache has its own unit tests, so it's better to add a test for this change there.

Comment on lines +974 to +984
let existing_tx = self.txs.get(&tx_id);
let existing_tx_already_confirmed_or_same = existing_tx.is_some_and(|existing_tx| {
matches!(
(existing_tx.state(), tx.state()),
(TxState::Confirmed(_, _, _), _)
| (TxState::Inactive(_), TxState::Inactive(_))
| (TxState::Abandoned, TxState::Abandoned)
| (TxState::Conflicted(_), TxState::Conflicted(_))
| (TxState::InMempool(_), TxState::InMempool(_))
)
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a unit test for this?

Comment on lines +1369 to +1372
}


maybe_event = self.mempool_events.next() => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra empty line

self.wallet.add_mempool_transactions(&txs, &self.wallet_events)?;
} else {
log::warn!(
"Transaction ID {} from mempool notification but not found",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Broken sentence

Comment on lines +1379 to +1383
self.mempool_events = self.rpc_client
.mempool_subscribe_to_events()
.await
.map_err(ControllerError::NodeCallError)?;
break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Previously, the run implementation avoided ?, so that after an error the wallet's background task would stay alive. Now you've added a bunch of ? and it's possible for it to stop working.
    If you need ?, encapsulate the whole thing in a function and then check its result as a whole.

  2. the comment

    /// Synchronize the wallet in the background from the node's blockchain.
    /// Try staking new blocks if staking was started.
    

    no longer mentions all the things this function does


async fn chainstate_info(&self) -> Result<ChainInfo, Self::Error> {
ChainstateRpcClient::info(&self.http_client)
ChainstateRpcClient::info(&*self.rpc_client.lock().await)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I can see, you always re-borrow the reference from MutexGuard as immutable. Why do you need the mutex then?


pub use p2p::{interface::types::ConnectedPeer, types::peer_id::PeerId};

pub type MempoolEvents = Box<dyn Stream<Item = MempoolNotification> + Sync + Send + Unpin>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the item is called MempoolNotification, then this is MempoolNotificationStream.
(If you prefer Event, then use it in both names)

P.S. also, the item and the stream type definitions should probably be close to each other.

* check balance in both wallets
* send coins from Acc 0 to Acc 1 without creating a block
* the second wallet should get the new Tx from mempool events
* second wallet can create a new unconfirmed Tx on top of the on in mempool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"on top of the on in mempool" - broken phrase

# check mempool has 1 transaction now
transactions = node.mempool_transactions()
assert_equal(len(transactions), 1)
transfer_tx = transactions[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused

Comment on lines +22 to +25
* send coins to the wallet's address
* sync the wallet with the node
* check balance in both wallets
* send coins from Acc 0 to Acc 1 without creating a block
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plz put more effort into writing comments. You have 2 wallets here, it's not clear which one of them you're talking about.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants