From c04cef645a4e175426cc51748909c158910253fd Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 3 Jun 2026 16:59:18 +0200 Subject: [PATCH] fuzz: model chanmon mempool mining Route chanmon broadcasts through an explicit harness mempool so relay, mining, wallet updates, and chain delivery share one path. This lets splice, anchor, and claim transactions enter the mempool before mining. On restart, sync loaded monitors and managers from their own persisted best blocks so raw monitors catch up without rewinding ChannelManager state. Cap modeled mining before unresolved HTLC timeout deadlines and use the LDK anti-reorg depth for setup confirmations. Accept the splice-reserve disconnect a node emits when a splice would drop a balance below the channel reserve, exiting quiescence as with other splice-negotiation disconnects. AI tools were used in preparing this commit. --- fuzz/src/chanmon_consistency.rs | 683 +++++++++++++++++++++++++------- lightning/src/ln/channel.rs | 22 + 2 files changed, 560 insertions(+), 145 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 519ae515e7f..113d52f7576 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -14,9 +14,10 @@ //! To test this we stand up a network of three nodes and read bytes from the fuzz input to denote //! actions such as sending payments, handling events, or changing monitor update return values on //! a per-node basis. This should allow it to find any cases where the ordering of actions results -//! in us getting out of sync with ourselves, and, assuming at least one of our recieve- or -//! send-side handling is correct, other peers. We consider it a failure if any action results in a -//! channel being force-closed. +//! in us getting out of sync with ourselves, and, assuming at least one of our receive- or +//! send-side handling is correct, other peers. We consider it a failure if any action results in +//! a channel being force-closed. The fuzzer also models transaction relay through a harness +//! mempool, making splice confirmation and block delivery closer to normal node behavior. use bitcoin::amount::Amount; use bitcoin::constants::genesis_block; @@ -27,6 +28,7 @@ use bitcoin::script::{Builder, ScriptBuf}; use bitcoin::transaction::Version; use bitcoin::transaction::{Transaction, TxOut}; use bitcoin::FeeRate; +use bitcoin::OutPoint as BitcoinOutPoint; use bitcoin::block::Header; use bitcoin::hash_types::Txid; @@ -41,7 +43,7 @@ use lightning::chain; use lightning::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, TransactionType, }; -use lightning::chain::channelmonitor::ChannelMonitor; +use lightning::chain::channelmonitor::{ChannelMonitor, ANTI_REORG_DELAY}; use lightning::chain::{ chainmonitor, channelmonitor, BlockLocator, ChannelMonitorUpdateStatus, Confirm, Watch, }; @@ -103,6 +105,17 @@ use std::sync::{Arc, Mutex}; const MAX_FEE: u32 = 10_000; const MAX_SETTLE_ITERATIONS: usize = 256; +// Each wallet is seeded with enough confirmed UTXOs that repeated splice +// transactions don't run out of inputs mid-run. +const NUM_WALLET_UTXOS: u32 = 50; +// A single fuzz byte can mine more than one block so a corpus entry does not +// need long runs of identical "mine one block" commands to reach CSV or CLTV +// boundaries. Mining commands are capped in `safe_mine_block_count` if +// unresolved HTLCs are near expiry. +const MINE_BLOCK_COUNTS: [u32; 8] = [1, 2, 3, 6, 12, 24, 48, 144]; +// Finish-time relay/mining rounds are capped so cleanup cannot spin forever. +const MAX_FINISH_RELAY_MINE_ROUNDS: usize = 32; + struct FuzzEstimator { ret_val: atomic::AtomicU32, } @@ -184,9 +197,14 @@ impl BroadcasterInterface for TestBroadcaster { struct ChainState { blocks: Vec<(Header, Vec)>, confirmed_txids: HashSet, - /// Unconfirmed transactions (e.g., splice txs). Conflicting RBF candidates may coexist; - /// `confirm_pending_txs` determines which one confirms. + /// Unconfirmed transactions admitted to the mempool, in valid block order: + /// every input is either confirmed already or created by an earlier + /// transaction in this vector. pending_txs: Vec<(Txid, Transaction)>, + /// Unspent outputs created by confirmed transactions. Mempool admission + /// checks inputs against this set, adjusted for outputs created and spent + /// by the transactions already in `pending_txs`. + utxos: HashSet, } impl ChainState { @@ -197,6 +215,7 @@ impl ChainState { blocks: vec![(genesis_header, Vec::new())], confirmed_txids: HashSet::new(), pending_txs: Vec::new(), + utxos: HashSet::new(), } } @@ -204,81 +223,223 @@ impl ChainState { (self.blocks.len() - 1) as u32 } - fn is_outpoint_spent(&self, outpoint: &bitcoin::OutPoint) -> bool { - self.blocks.iter().any(|(_, txs)| { - txs.iter().any(|tx| tx.input.iter().any(|input| input.previous_output == *outpoint)) + fn is_unspent(&self, outpoint: &BitcoinOutPoint) -> bool { + self.utxos.contains(outpoint) + } + + fn confirmed_output(&self, outpoint: &BitcoinOutPoint) -> Option<&TxOut> { + if !self.confirmed_txids.contains(&outpoint.txid) { + return None; + } + self.blocks.iter().find_map(|(_, txs)| { + txs.iter().find_map(|tx| { + if tx.compute_txid() == outpoint.txid { + tx.output.get(outpoint.vout as usize) + } else { + None + } + }) }) } - fn confirm_tx(&mut self, tx: Transaction) -> bool { - let txid = tx.compute_txid(); - if self.confirmed_txids.contains(&txid) { - return false; + // Initial channel funding is represented by a no-input transaction. It is + // not a valid Bitcoin transaction, but it gives LDK a stable funding + // outpoint without modeling coin selection during channel setup. + fn is_synthetic_funding_tx(tx: &Transaction) -> bool { + !tx.is_coinbase() && tx.input.is_empty() + } + + // Checks whether a transaction spends an input twice or spends an output + // not present in `utxos`. + fn has_invalid_inputs(tx: &Transaction, utxos: &HashSet) -> bool { + let mut spent_inputs = HashSet::new(); + for input in &tx.input { + if !spent_inputs.insert(input.previous_output) { + return true; + } + if !utxos.contains(&input.previous_output) { + return true; + } + } + false + } + + fn apply_tx_to_utxos(&mut self, txid: Txid, tx: &Transaction) { + for input in &tx.input { + self.utxos.remove(&input.previous_output); } - if tx.input.iter().any(|input| self.is_outpoint_spent(&input.previous_output)) { - return false; + for idx in 0..tx.output.len() { + self.utxos.insert(BitcoinOutPoint { txid, vout: idx as u32 }); } - self.confirmed_txids.insert(txid); + } + fn mine_block(&mut self, txs: Vec) { let prev_hash = self.blocks.last().unwrap().0.block_hash(); let header = create_dummy_header(prev_hash, 42); - self.blocks.push((header, vec![tx])); + self.blocks.push((header, txs)); + } - for _ in 0..5 { - let prev_hash = self.blocks.last().unwrap().0.block_hash(); - let header = create_dummy_header(prev_hash, 42); - self.blocks.push((header, Vec::new())); + fn mine_empty_blocks(&mut self, count: u32) { + for _ in 0..count { + self.mine_block(Vec::new()); } - true } - /// Add a transaction to the pending pool (mempool). Multiple conflicting transactions (RBF - /// candidates) may coexist; `confirm_pending_txs` selects which one to confirm. - fn add_pending_tx(&mut self, tx: Transaction) { - self.pending_txs.push((tx.compute_txid(), tx)); + // Mines a setup transaction directly into a block, bypassing the mempool, + // and buries it to `depth`. Wallet seeding and synthetic funding + // transactions are not relayable, so they cannot go through normal + // admission. + fn mine_setup_tx_to_depth(&mut self, tx: Transaction, depth: u32) { + assert!( + tx.is_coinbase() || Self::is_synthetic_funding_tx(&tx), + "direct setup mining is only for coinbase and synthetic funding transactions: {:?}", + tx, + ); + let txid = tx.compute_txid(); + assert!( + self.confirmed_txids.insert(txid), + "direct setup transaction was already confirmed: {:?}", + tx, + ); + self.apply_tx_to_utxos(txid, &tx); + + self.mine_block(vec![tx]); + self.mine_empty_blocks(depth.saturating_sub(1)); } - /// Confirm pending transactions in a single block, selecting deterministically among - /// conflicting RBF candidates. Sorting by txid ensures the winner is determined by fuzz input - /// content. Transactions that double-spend an already-confirmed outpoint are skipped. - fn confirm_pending_txs(&mut self) { - let mut txs = std::mem::take(&mut self.pending_txs); - txs.sort_by_key(|(txid, _)| *txid); + // Attempts to admit a broadcast transaction to the mempool, enforcing + // locktime, input, and RBF rules. Mining later confirms the whole mempool + // without further selection. + fn admit_tx_to_mempool(&mut self, tx: Transaction) { + let txid = tx.compute_txid(); + let lock_time = tx.lock_time.to_consensus_u32(); + let locktime_enabled = + tx.input.iter().any(|input| input.sequence.enables_absolute_lock_time()); - let mut confirmed = Vec::new(); - let mut spent_outpoints = Vec::new(); - for (txid, tx) in txs { - if self.confirmed_txids.contains(&txid) { - continue; - } - if tx.input.iter().any(|input| { - self.is_outpoint_spent(&input.previous_output) - || spent_outpoints.contains(&input.previous_output) - }) { - continue; + let is_ldk_commitment_obscured_locktime = + tx.input.len() == 1 && tx.input[0].sequence.0 >> 24 == 0x80 && lock_time >> 24 == 0x20; + + let immature_absolute_locktime = + locktime_enabled && tx.lock_time.is_block_height() && self.tip_height() < lock_time; + assert!( + !immature_absolute_locktime, + "broadcast immature locktime transaction into chanmon harness mempool: {:?}", + tx, + ); + + let unmodeled_time_locktime = locktime_enabled + && tx.lock_time.is_block_time() + && !is_ldk_commitment_obscured_locktime; + assert!( + !unmodeled_time_locktime, + "broadcast time-locked transaction into chanmon harness mempool: {:?}", + tx, + ); + + assert!( + !tx.is_coinbase() && !Self::is_synthetic_funding_tx(&tx), + "setup-only transaction entered chanmon harness mempool: {:?}", + tx, + ); + + if self.confirmed_txids.contains(&txid) { + return; + } + if self.pending_txs.iter().any(|(pending_txid, _)| *pending_txid == txid) { + return; + } + + // Fee-rate policy is not modeled, so among conflicting RBF candidates + // the last one relayed wins. + let mut conflicting_pending_txids = HashSet::new(); + for (pending_txid, pending_tx) in &self.pending_txs { + let signals_rbf = pending_tx.input.iter().any(|input| input.sequence.is_rbf()); + let conflicts_with_new_tx = pending_tx.input.iter().any(|pending_input| { + tx.input.iter().any(|input| input.previous_output == pending_input.previous_output) + }); + if conflicts_with_new_tx { + if !signals_rbf { + return; + } + conflicting_pending_txids.insert(*pending_txid); } - self.confirmed_txids.insert(txid); - for input in &tx.input { - spent_outpoints.push(input.previous_output); + } + if !conflicting_pending_txids.is_empty() { + let mut removed_outputs = HashSet::new(); + let mut retained_txs = Vec::new(); + for (pending_txid, pending_tx) in self.pending_txs.drain(..) { + let direct_conflict = conflicting_pending_txids.contains(&pending_txid); + let spends_removed_tx = pending_tx + .input + .iter() + .any(|input| removed_outputs.contains(&input.previous_output)); + if direct_conflict || spends_removed_tx { + for idx in 0..pending_tx.output.len() { + removed_outputs + .insert(BitcoinOutPoint { txid: pending_txid, vout: idx as u32 }); + } + } else { + retained_txs.push((pending_txid, pending_tx)); + } } - confirmed.push(tx); + self.pending_txs = retained_txs; } - if confirmed.is_empty() { + // Build the UTXO set this transaction would see if the current mempool + // confirmed. + let mut available_utxos = self.utxos.clone(); + for (pending_txid, pending_tx) in &self.pending_txs { + for input in &pending_tx.input { + available_utxos.remove(&input.previous_output); + } + for idx in 0..pending_tx.output.len() { + available_utxos.insert(BitcoinOutPoint { txid: *pending_txid, vout: idx as u32 }); + } + } + if Self::has_invalid_inputs(&tx, &available_utxos) { return; } + self.pending_txs.push((txid, tx)); + } - let prev_hash = self.blocks.last().unwrap().0.block_hash(); - let header = create_dummy_header(prev_hash, 42); - self.blocks.push((header, confirmed)); - - for _ in 0..5 { - let prev_hash = self.blocks.last().unwrap().0.block_hash(); - let header = create_dummy_header(prev_hash, 42); - self.blocks.push((header, Vec::new())); + fn relay_transactions(&mut self, txs: Vec) { + for tx in txs { + self.admit_tx_to_mempool(tx); } } + // Mines `count` blocks, confirming the current mempool in the first block. + fn mine_blocks(&mut self, count: u32) -> Vec { + assert!(count > 0, "mining zero blocks should not be requested"); + + let mempool_txs = std::mem::take(&mut self.pending_txs); + let confirmed_txs = if mempool_txs.is_empty() { + self.mine_empty_blocks(1); + Vec::new() + } else { + let mut confirmed = Vec::new(); + for (txid, tx) in mempool_txs { + assert!( + !Self::has_invalid_inputs(&tx, &self.utxos), + "mempool transaction was no longer valid at mining time: {:?}", + tx, + ); + assert!( + self.confirmed_txids.insert(txid), + "mempool transaction was already confirmed at mining time: {:?}", + tx, + ); + self.apply_tx_to_utxos(txid, &tx); + confirmed.push(tx); + } + let confirmed_txs = confirmed.clone(); + self.mine_block(confirmed); + confirmed_txs + }; + self.mine_empty_blocks(count - 1); + confirmed_txs + } + fn block_at(&self, height: u32) -> &(Header, Vec) { &self.blocks[height as usize] } @@ -775,7 +936,13 @@ fn assert_disconnect_action(action: &msgs::ErrorAction) -> (&msgs::WarningMessag // Since sending/receiving messages may be delayed, `timer_tick_occurred` may cause a node to // disconnect their counterparty if they're expecting a timely response. if let msgs::ErrorAction::DisconnectPeerWithWarning { ref msg } = action { - let is_quiescent_msg = msg.data.contains("already sent splice_locked, cannot RBF"); + // A splice can be refused mid-negotiation when the post-splice balance would + // breach the channel reserve. Like the RBF case, this leaves the channel + // quiescent, so both sides must exit quiescence afterwards. + let is_quiescent_msg = msg.data.contains("already sent splice_locked, cannot RBF") + || msg.data.contains("has a confirmed pending splice, cannot RBF") + || msg.data.contains("smaller than their selected v2 reserve") + || msg.data.contains("smaller than our selected v2 reserve"); if !msg.data.contains("Disconnecting due to timeout awaiting response") && !is_quiescent_msg { panic!("Unexpected disconnect case: {}", msg.data); @@ -817,12 +984,12 @@ struct HarnessNode<'a> { logger: Arc, broadcaster: Arc, fee_estimator: Arc, - wallet: TestWalletSource, + wallet: Arc, + wallet_sync: WalletSync, Arc>, persistence_style: ChannelMonitorUpdateStatus, deferred: bool, serialized_manager: Vec, serialized_manager_generation: u64, - height: u32, last_htlc_clear_fee: u32, } @@ -866,7 +1033,7 @@ impl<'a> HarnessNode<'a> { } fn new( - node_id: u8, wallet: TestWalletSource, fee_estimator: Arc, + node_id: u8, wallet: Arc, fee_estimator: Arc, broadcaster: Arc, persistence_style: ChannelMonitorUpdateStatus, deferred: bool, out: &Out, router: &'a FuzzRouter, chan_type: ChanType, ) -> Self { @@ -890,6 +1057,7 @@ impl<'a> HarnessNode<'a> { &persister, deferred, ); + let wallet_sync = WalletSync::new(Arc::clone(&wallet), Arc::clone(&logger)); let network = Network::Bitcoin; let best_block_timestamp = genesis_block(network).header.time; let params = ChainParameters { network, best_block: BlockLocator::from_network(network) }; @@ -917,11 +1085,11 @@ impl<'a> HarnessNode<'a> { broadcaster, fee_estimator, wallet, + wallet_sync, persistence_style, deferred, serialized_manager: Vec::new(), serialized_manager_generation: 0, - height: 0, last_htlc_clear_fee: 253, } } @@ -958,22 +1126,77 @@ impl<'a> HarnessNode<'a> { } } + fn manager_height(&self) -> u32 { + self.node.current_best_block().height + } + + // Connects a block range to ChainMonitor and ChannelManager. The start + // heights are independent because reload may pair monitors and a manager + // persisted at different chain tips. + fn connect_chain_range( + &mut self, chain_state: &ChainState, monitor_start_height: u32, manager_start_height: u32, + target_height: u32, + ) { + assert!( + target_height >= monitor_start_height, + "connect_chain_range cannot move monitor height backward ({} -> {})", + monitor_start_height, + target_height + ); + assert!( + target_height >= manager_start_height, + "connect_chain_range cannot move manager height backward ({} -> {})", + manager_start_height, + target_height + ); + let start_height = cmp::min(monitor_start_height, manager_start_height); + let mut height = start_height; + while height < target_height { + let mut next_height = height + 1; + while next_height <= target_height && chain_state.block_at(next_height).1.is_empty() { + next_height += 1; + } + if next_height > target_height { + // The rest of the range is empty. One best-block update to the + // final height is enough because LDK's Confirm API explicitly + // allows best_block_updated to skip intermediary blocks. + height = target_height; + let (header, _) = chain_state.block_at(height); + if height > monitor_start_height { + self.monitor.best_block_updated(header, height); + } + if height > manager_start_height { + self.node.best_block_updated(header, height); + } + break; + } + height = next_height; + let (header, txn) = chain_state.block_at(height); + let txdata: Vec<_> = txn.iter().enumerate().map(|(i, tx)| (i + 1, tx)).collect(); + if height > monitor_start_height { + self.monitor.transactions_confirmed(header, &txdata, height); + } + if height > manager_start_height { + self.node.transactions_confirmed(header, &txdata, height); + } + if height > monitor_start_height { + self.monitor.best_block_updated(header, height); + } + if height > manager_start_height { + self.node.best_block_updated(header, height); + } + } + } + fn sync_with_chain_state(&mut self, chain_state: &ChainState, num_blocks: Option) { let target_height = if let Some(num_blocks) = num_blocks { - std::cmp::min(self.height + num_blocks, chain_state.tip_height()) + std::cmp::min(self.manager_height() + num_blocks, chain_state.tip_height()) } else { chain_state.tip_height() }; - while self.height < target_height { - self.height += 1; - let (header, txn) = chain_state.block_at(self.height); - let txdata: Vec<_> = txn.iter().enumerate().map(|(i, tx)| (i + 1, tx)).collect(); - if !txdata.is_empty() { - self.node.transactions_confirmed(header, &txdata, self.height); - } - self.node.best_block_updated(header, self.height); - } + let start_height = self.manager_height(); + self.connect_chain_range(chain_state, start_height, start_height, target_height); } fn checkpoint_manager_persistence(&mut self) -> bool { @@ -1034,7 +1257,6 @@ impl<'a> HarnessNode<'a> { } fn splice_in(&self, counterparty_node_id: &PublicKey, channel_id: &ChannelId) { - let wallet = WalletSync::new(&self.wallet, Arc::clone(&self.logger)); match self.node.splice_channel(channel_id, counterparty_node_id) { Ok(funding_template) => { let feerate = @@ -1043,7 +1265,7 @@ impl<'a> HarnessNode<'a> { Amount::from_sat(10_000), feerate, FeeRate::MAX, - &wallet, + &self.wallet_sync, ) { let _ = self.node.funding_contributed( channel_id, @@ -2125,7 +2347,7 @@ fn make_channel( tx.clone(), ) .unwrap(); - chain_state.confirm_tx(tx); + chain_state.mine_setup_tx_to_depth(tx, ANTI_REORG_DELAY); } else { panic!("Wrong event type"); } @@ -2242,24 +2464,27 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { config_byte & 0b1000_0000 != 0, ]; - let wallet_a = TestWalletSource::new(SecretKey::from_slice(&[1; 32]).unwrap()); - let wallet_b = TestWalletSource::new(SecretKey::from_slice(&[2; 32]).unwrap()); - let wallet_c = TestWalletSource::new(SecretKey::from_slice(&[3; 32]).unwrap()); - let wallets = [&wallet_a, &wallet_b, &wallet_c]; - let coinbase_tx = bitcoin::Transaction { - version: bitcoin::transaction::Version::TWO, - lock_time: bitcoin::absolute::LockTime::ZERO, - input: vec![bitcoin::TxIn { ..Default::default() }], - output: wallets - .iter() - .map(|wallet| TxOut { - value: Amount::from_sat(100_000), - script_pubkey: wallet.get_change_script().unwrap(), - }) - .collect(), - }; - for (idx, wallet) in wallets.iter().enumerate() { - wallet.add_utxo(coinbase_tx.clone(), idx as u32); + let wallet_a = Arc::new(TestWalletSource::new(SecretKey::from_slice(&[1; 32]).unwrap())); + let wallet_b = Arc::new(TestWalletSource::new(SecretKey::from_slice(&[2; 32]).unwrap())); + let wallet_c = Arc::new(TestWalletSource::new(SecretKey::from_slice(&[3; 32]).unwrap())); + let wallets = [wallet_a.as_ref(), wallet_b.as_ref(), wallet_c.as_ref()]; + let mut chain_state = ChainState::new(); + for wallet in wallets { + let coinbase_tx = bitcoin::Transaction { + version: bitcoin::transaction::Version::TWO, + lock_time: bitcoin::absolute::LockTime::ZERO, + input: vec![bitcoin::TxIn { ..Default::default() }], + output: (0..NUM_WALLET_UTXOS) + .map(|_| TxOut { + value: Amount::from_sat(100_000), + script_pubkey: wallet.get_change_script().unwrap(), + }) + .collect(), + }; + for vout in 0..NUM_WALLET_UTXOS { + wallet.add_utxo(coinbase_tx.clone(), vout); + } + chain_state.mine_setup_tx_to_depth(coinbase_tx, ANTI_REORG_DELAY); } let fee_est_a = Arc::new(FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }); @@ -2274,7 +2499,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { let mut nodes = [ HarnessNode::new( 0, - wallet_a, + Arc::clone(&wallet_a), Arc::clone(&fee_est_a), Arc::clone(&broadcast_a), persistence_styles[0], @@ -2285,7 +2510,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { ), HarnessNode::new( 1, - wallet_b, + Arc::clone(&wallet_b), Arc::clone(&fee_est_b), Arc::clone(&broadcast_b), persistence_styles[1], @@ -2296,7 +2521,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { ), HarnessNode::new( 2, - wallet_c, + Arc::clone(&wallet_c), Arc::clone(&fee_est_c), Arc::clone(&broadcast_c), persistence_styles[2], @@ -2306,8 +2531,6 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { chan_type, ), ]; - let mut chain_state = ChainState::new(); - // Connect peers first, then create channels. connect_peers(&nodes[0], &nodes[1]); connect_peers(&nodes[1], &nodes[2]); @@ -2376,7 +2599,34 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { self.bc_link.first_channel_id() } - fn finish(&self) { + // Runs end-of-input cleanup by relaying and mining remaining broadcasts. + // Final invariants should not depend on the input ending with explicit relay + // and mining bytes. + fn finish(&mut self) { + for _ in 0..MAX_FINISH_RELAY_MINE_ROUNDS { + let mut txs = Vec::new(); + for node in &self.nodes { + txs.extend(node.broadcaster.txn_broadcasted.borrow_mut().drain(..)); + } + self.chain_state.relay_transactions(txs); + if self.chain_state.pending_txs.is_empty() { + assert_test_invariants(&self.nodes); + return; + } + if self.mine_blocks(ANTI_REORG_DELAY) == 0 { + // The input ended with pending mempool transactions but no safe + // block left before an HTLC fail-back window. Leave them + // unconfirmed rather than forcing finish cleanup to advance + // the chain past that boundary. + assert_test_invariants(&self.nodes); + return; + } + } + assert!( + !self.nodes.iter().any(|node| !node.broadcaster.txn_broadcasted.borrow().is_empty()) + && self.chain_state.pending_txs.is_empty(), + "finish tx mining loop failed to quiesce", + ); assert_test_invariants(&self.nodes); } @@ -2775,8 +3025,8 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { fn process_events(&mut self, node_idx: usize, fail: bool) -> bool { let nodes = &self.nodes; - let chain_state = &mut self.chain_state; let payments = &mut self.payments; + let chain_state = &self.chain_state; // Multiple HTLCs can resolve for the same payment hash, so deduplicate // claim/fail handling per event batch. let mut claim_set = new_hash_map(); @@ -2814,29 +3064,53 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { unsigned_transaction, .. } => { - let signed_tx = nodes[node_idx].wallet.sign_tx(unsigned_transaction).unwrap(); - match nodes[node_idx].funding_transaction_signed( - &channel_id, - &counterparty_node_id, - signed_tx, - ) { - Ok(()) => {}, - Err(APIError::APIMisuseError { ref err }) - if err.contains("not expecting funding signatures") => + let wallet_script = nodes[node_idx].wallet.get_change_script().unwrap(); + let has_unknown_spent_input = unsigned_transaction.input.iter().any(|input| { + !chain_state.is_unspent(&input.previous_output) + && chain_state.confirmed_output(&input.previous_output).is_none() + }); + assert!( + !has_unknown_spent_input, + "funding transaction referenced an unmodeled input: {:?}", + unsigned_transaction, + ); + let has_spent_wallet_input = unsigned_transaction.input.iter().any(|input| { + !chain_state.is_unspent(&input.previous_output) + && chain_state + .confirmed_output(&input.previous_output) + .map_or(false, |output| output.script_pubkey == wallet_script) + }); + if has_spent_wallet_input { + // A queued RBF signing request can lose the race against a + // transaction confirming with one of its wallet inputs. + match nodes[node_idx] + .cancel_funding_contributed(&channel_id, &counterparty_node_id) { - // A queued signing event can be invalidated by a later `tx_abort` - // before the application handles it. - }, - Err(e) => panic!("{e:?}"), + Ok(()) => {}, + Err(APIError::APIMisuseError { ref err }) + if err.contains("does not have a pending splice negotiation") => {}, + Err(e) => panic!("{e:?}"), + } + } else { + let signed_tx = + nodes[node_idx].wallet.sign_tx(unsigned_transaction).unwrap(); + match nodes[node_idx].funding_transaction_signed( + &channel_id, + &counterparty_node_id, + signed_tx, + ) { + Ok(()) => {}, + Err(APIError::APIMisuseError { ref err }) + if err.contains("not expecting funding signatures") => + { + // A queued signing event can be invalidated by a later `tx_abort` + // before the application handles it. + }, + Err(e) => panic!("{e:?}"), + } } }, - events::Event::SpliceNegotiated { new_funding_txo, .. } => { - let mut txs = nodes[node_idx].broadcaster.txn_broadcasted.borrow_mut(); - assert!(txs.len() >= 1); - let splice_tx = txs.remove(0); - assert_eq!(new_funding_txo.txid, splice_tx.compute_txid()); - chain_state.add_pending_tx(splice_tx); - }, + events::Event::SpliceNegotiated { .. } => {}, events::Event::SpliceNegotiationFailed { .. } => {}, events::Event::DiscardFunding { funding_info: @@ -2935,6 +3209,20 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { self.bc_link.reconnect(&self.nodes); } + // Finds the earliest loaded monitor height for a node. Startup sync uses it + // as ChainMonitor's start height so raw monitors loaded below the manager's + // best block still see every block and transaction they missed. + fn oldest_monitor_height_for_node(&self, node_idx: usize) -> u32 { + let node = &self.nodes[node_idx]; + let mut min_monitor_height = node.manager_height(); + for chan_id in node.monitor.list_monitors() { + if let Ok(mon) = node.monitor.get_monitor(chan_id) { + min_monitor_height = cmp::min(min_monitor_height, mon.current_best_block().height); + } + } + min_monitor_height + } + fn restart_node(&mut self, node_idx: usize, v: u8, router: &'a FuzzRouter) { if !self.nodes[node_idx].deferred { self.nodes[node_idx].checkpoint_manager_persistence(); @@ -2954,6 +3242,21 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { } let loaded_manager_generation = self.nodes[node_idx].reload(v, &self.out, router, self.chan_type); + let monitor_start_height = self.oldest_monitor_height_for_node(node_idx); + let manager_start_height = self.nodes[node_idx].manager_height(); + // Startup sync is part of LDK's deserialization contract. + self.nodes[node_idx].connect_chain_range( + &self.chain_state, + monitor_start_height, + manager_start_height, + self.chain_state.tip_height(), + ); + assert_eq!( + self.nodes[node_idx].manager_height(), + self.chain_state.tip_height(), + "reloaded node {} must sync to the harness tip before normal operation resumes", + node_idx + ); let rolled_back_payment_hashes = self.payments.nodes[node_idx] .sync_pending_with_manager_generation(loaded_manager_generation); for payment_hash in rolled_back_payment_hashes { @@ -2962,6 +3265,11 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { } fn settle_all(&mut self) { + let chain_state = &self.chain_state; + for node in &mut self.nodes { + node.sync_with_chain_state(chain_state, None); + } + // First, make sure peers are all connected to each other self.reconnect_ab(); self.reconnect_bc(); @@ -3036,6 +3344,102 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { } made_progress } + + // Relays one node's broadcasts into the mempool. Per-node relay lets fuzz + // inputs model partial propagation before a block is mined. + fn relay_broadcasts_for_node(&mut self, node_idx: usize) { + let txs = self.nodes[node_idx] + .broadcaster + .txn_broadcasted + .borrow_mut() + .drain(..) + .collect::>(); + self.chain_state.relay_transactions(txs); + } + + fn earliest_pending_htlc_expiry(&self) -> Option { + let mut earliest_expiry: Option = None; + for node in &self.nodes { + for chan in node.list_channels() { + for htlc in &chan.pending_inbound_htlcs { + earliest_expiry = Some( + earliest_expiry + .map_or(htlc.cltv_expiry, |expiry| expiry.min(htlc.cltv_expiry)), + ); + } + for htlc in &chan.pending_outbound_htlcs { + earliest_expiry = Some( + earliest_expiry + .map_or(htlc.cltv_expiry, |expiry| expiry.min(htlc.cltv_expiry)), + ); + } + } + } + earliest_expiry + } + + fn safe_mine_block_count(&self, count: u32) -> u32 { + if let Some(expiry) = self.earliest_pending_htlc_expiry() { + let current_tip = self.chain_state.tip_height(); + // LDK may close to protect a pending HTLC before its raw CLTV + // expiry. Keep mining outside that fail-back window so fuzzed block + // production does not force an on-chain timeout path. + let timeout_deadline = expiry.saturating_sub(channelmonitor::HTLC_FAIL_BACK_BUFFER); + assert!( + current_tip < timeout_deadline, + "pending HTLC with expiry {} and timeout deadline {} is already unsafe at tip {}", + expiry, + timeout_deadline, + current_tip + ); + // Stop before the deadline block itself, since connecting it is + // enough for ChannelMonitor timeout handling to run. + count.min(timeout_deadline - current_tip - 1) + } else { + count + } + } + + // Mines blocks through ChainState, then applies confirmed transactions to + // the wallets and syncs node chain listeners. + fn mine_blocks(&mut self, count: u32) -> u32 { + assert!(count > 0, "mining zero blocks should not be requested"); + + let count = self.safe_mine_block_count(count); + if count == 0 { + return 0; + } + let confirmed_txs = self.chain_state.mine_blocks(count); + let wallets = [ + self.nodes[0].wallet.as_ref(), + self.nodes[1].wallet.as_ref(), + self.nodes[2].wallet.as_ref(), + ]; + for tx in &confirmed_txs { + for wallet in wallets.iter().copied() { + let change_script = wallet.get_change_script().unwrap(); + for input in &tx.input { + // The test wallet is a simple UTXO source. When one of its + // outputs is spent by a confirmed transaction, remove it so + // later funding attempts cannot double-spend it. + wallet.remove_utxo(input.previous_output); + } + for (vout, output) in tx.output.iter().enumerate() { + if output.script_pubkey == change_script { + // Add outputs to whichever test wallet owns the script. + // This lets splice flows recycle wallet change through + // later fuzz commands. + wallet.add_utxo(tx.clone(), vout as u32); + } + } + } + } + let chain_state = &self.chain_state; + for node in &mut self.nodes { + node.sync_with_chain_state(chain_state, None); + } + count + } } #[inline] @@ -3245,32 +3649,14 @@ pub fn do_test(data: &[u8], out: Out) { harness.nodes[2].splice_out(&cp_node_id, &harness.chan_b_id()); }, - // Sync node by 1 block to cover confirmation of a transaction. - 0xa8 => { - harness.chain_state.confirm_pending_txs(); - harness.nodes[0].sync_with_chain_state(&harness.chain_state, Some(1)); - }, - 0xa9 => { - harness.chain_state.confirm_pending_txs(); - harness.nodes[1].sync_with_chain_state(&harness.chain_state, Some(1)); - }, - 0xaa => { - harness.chain_state.confirm_pending_txs(); - harness.nodes[2].sync_with_chain_state(&harness.chain_state, Some(1)); - }, - // Sync node to chain tip to cover confirmation of a transaction post-reorg-risk. - 0xab => { - harness.chain_state.confirm_pending_txs(); - harness.nodes[0].sync_with_chain_state(&harness.chain_state, None); - }, - 0xac => { - harness.chain_state.confirm_pending_txs(); - harness.nodes[1].sync_with_chain_state(&harness.chain_state, None); - }, - 0xad => { - harness.chain_state.confirm_pending_txs(); - harness.nodes[2].sync_with_chain_state(&harness.chain_state, None); - }, + // Sync node by 1 block. + 0xa8 => harness.nodes[0].sync_with_chain_state(&harness.chain_state, Some(1)), + 0xa9 => harness.nodes[1].sync_with_chain_state(&harness.chain_state, Some(1)), + 0xaa => harness.nodes[2].sync_with_chain_state(&harness.chain_state, Some(1)), + // Sync node to chain tip. + 0xab => harness.nodes[0].sync_with_chain_state(&harness.chain_state, None), + 0xac => harness.nodes[1].sync_with_chain_state(&harness.chain_state, None), + 0xad => harness.nodes[2].sync_with_chain_state(&harness.chain_state, None), 0xb0 | 0xb1 | 0xb2 => { // Restart node A, picking among persisted and in-flight `ChannelMonitor` @@ -3395,6 +3781,13 @@ pub fn do_test(data: &[u8], out: Out) { .enable_op_for_all_signers(SignerOp::SignSpliceSharedInput); harness.nodes[2].signer_unblocked(None); }, + 0xd6 => harness.relay_broadcasts_for_node(0), + 0xd7 => harness.relay_broadcasts_for_node(1), + 0xd8 => harness.relay_broadcasts_for_node(2), + 0xd9..=0xe0 => { + let count = MINE_BLOCK_COUNTS[(v - 0xd9) as usize]; + harness.mine_blocks(count); + }, 0xf0 => harness.ab_link.complete_monitor_updates_for_node( 0, diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 3df6f5fc436..463c2e8e8f6 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -12786,6 +12786,17 @@ where )); } + if pending_splice + .negotiated_candidates + .iter() + .any(|funding| funding.funding_tx_confirmation_height != 0) + { + return Err(format!( + "Channel {} has a confirmed pending splice, cannot RBF", + self.context.channel_id(), + )); + } + if pending_splice.negotiated_candidates.is_empty() { return Err(format!( "Channel {} has no negotiated splice candidates to RBF", @@ -13438,6 +13449,17 @@ where ))); } + if pending_splice + .negotiated_candidates + .iter() + .any(|funding| funding.funding_tx_confirmation_height != 0) + { + return Err(ChannelError::WarnAndDisconnect(format!( + "Channel {} has a confirmed pending splice, cannot RBF", + self.context.channel_id(), + ))); + } + let last_candidate = match pending_splice.negotiated_candidates.last() { Some(candidate) => candidate, None => {