diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 519ae515e7f..113d52f7576 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -14,9 +14,10 @@ //! To test this we stand up a network of three nodes and read bytes from the fuzz input to denote //! actions such as sending payments, handling events, or changing monitor update return values on //! a per-node basis. This should allow it to find any cases where the ordering of actions results -//! in us getting out of sync with ourselves, and, assuming at least one of our recieve- or -//! send-side handling is correct, other peers. We consider it a failure if any action results in a -//! channel being force-closed. +//! in us getting out of sync with ourselves, and, assuming at least one of our receive- or +//! send-side handling is correct, other peers. We consider it a failure if any action results in +//! a channel being force-closed. The fuzzer also models transaction relay through a harness +//! mempool, making splice confirmation and block delivery closer to normal node behavior. use bitcoin::amount::Amount; use bitcoin::constants::genesis_block; @@ -27,6 +28,7 @@ use bitcoin::script::{Builder, ScriptBuf}; use bitcoin::transaction::Version; use bitcoin::transaction::{Transaction, TxOut}; use bitcoin::FeeRate; +use bitcoin::OutPoint as BitcoinOutPoint; use bitcoin::block::Header; use bitcoin::hash_types::Txid; @@ -41,7 +43,7 @@ use lightning::chain; use lightning::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, TransactionType, }; -use lightning::chain::channelmonitor::ChannelMonitor; +use lightning::chain::channelmonitor::{ChannelMonitor, ANTI_REORG_DELAY}; use lightning::chain::{ chainmonitor, channelmonitor, BlockLocator, ChannelMonitorUpdateStatus, Confirm, Watch, }; @@ -103,6 +105,17 @@ use std::sync::{Arc, Mutex}; const MAX_FEE: u32 = 10_000; const MAX_SETTLE_ITERATIONS: usize = 256; +// Each wallet is seeded with enough confirmed UTXOs that repeated splice +// transactions don't run out of inputs mid-run. +const NUM_WALLET_UTXOS: u32 = 50; +// A single fuzz byte can mine more than one block so a corpus entry does not +// need long runs of identical "mine one block" commands to reach CSV or CLTV +// boundaries. Mining commands are capped in `safe_mine_block_count` if +// unresolved HTLCs are near expiry. +const MINE_BLOCK_COUNTS: [u32; 8] = [1, 2, 3, 6, 12, 24, 48, 144]; +// Finish-time relay/mining rounds are capped so cleanup cannot spin forever. +const MAX_FINISH_RELAY_MINE_ROUNDS: usize = 32; + struct FuzzEstimator { ret_val: atomic::AtomicU32, } @@ -184,9 +197,14 @@ impl BroadcasterInterface for TestBroadcaster { struct ChainState { blocks: Vec<(Header, Vec)>, confirmed_txids: HashSet, - /// Unconfirmed transactions (e.g., splice txs). Conflicting RBF candidates may coexist; - /// `confirm_pending_txs` determines which one confirms. + /// Unconfirmed transactions admitted to the mempool, in valid block order: + /// every input is either confirmed already or created by an earlier + /// transaction in this vector. pending_txs: Vec<(Txid, Transaction)>, + /// Unspent outputs created by confirmed transactions. Mempool admission + /// checks inputs against this set, adjusted for outputs created and spent + /// by the transactions already in `pending_txs`. + utxos: HashSet, } impl ChainState { @@ -197,6 +215,7 @@ impl ChainState { blocks: vec![(genesis_header, Vec::new())], confirmed_txids: HashSet::new(), pending_txs: Vec::new(), + utxos: HashSet::new(), } } @@ -204,81 +223,223 @@ impl ChainState { (self.blocks.len() - 1) as u32 } - fn is_outpoint_spent(&self, outpoint: &bitcoin::OutPoint) -> bool { - self.blocks.iter().any(|(_, txs)| { - txs.iter().any(|tx| tx.input.iter().any(|input| input.previous_output == *outpoint)) + fn is_unspent(&self, outpoint: &BitcoinOutPoint) -> bool { + self.utxos.contains(outpoint) + } + + fn confirmed_output(&self, outpoint: &BitcoinOutPoint) -> Option<&TxOut> { + if !self.confirmed_txids.contains(&outpoint.txid) { + return None; + } + self.blocks.iter().find_map(|(_, txs)| { + txs.iter().find_map(|tx| { + if tx.compute_txid() == outpoint.txid { + tx.output.get(outpoint.vout as usize) + } else { + None + } + }) }) } - fn confirm_tx(&mut self, tx: Transaction) -> bool { - let txid = tx.compute_txid(); - if self.confirmed_txids.contains(&txid) { - return false; + // Initial channel funding is represented by a no-input transaction. It is + // not a valid Bitcoin transaction, but it gives LDK a stable funding + // outpoint without modeling coin selection during channel setup. + fn is_synthetic_funding_tx(tx: &Transaction) -> bool { + !tx.is_coinbase() && tx.input.is_empty() + } + + // Checks whether a transaction spends an input twice or spends an output + // not present in `utxos`. + fn has_invalid_inputs(tx: &Transaction, utxos: &HashSet) -> bool { + let mut spent_inputs = HashSet::new(); + for input in &tx.input { + if !spent_inputs.insert(input.previous_output) { + return true; + } + if !utxos.contains(&input.previous_output) { + return true; + } + } + false + } + + fn apply_tx_to_utxos(&mut self, txid: Txid, tx: &Transaction) { + for input in &tx.input { + self.utxos.remove(&input.previous_output); } - if tx.input.iter().any(|input| self.is_outpoint_spent(&input.previous_output)) { - return false; + for idx in 0..tx.output.len() { + self.utxos.insert(BitcoinOutPoint { txid, vout: idx as u32 }); } - self.confirmed_txids.insert(txid); + } + fn mine_block(&mut self, txs: Vec) { let prev_hash = self.blocks.last().unwrap().0.block_hash(); let header = create_dummy_header(prev_hash, 42); - self.blocks.push((header, vec![tx])); + self.blocks.push((header, txs)); + } - for _ in 0..5 { - let prev_hash = self.blocks.last().unwrap().0.block_hash(); - let header = create_dummy_header(prev_hash, 42); - self.blocks.push((header, Vec::new())); + fn mine_empty_blocks(&mut self, count: u32) { + for _ in 0..count { + self.mine_block(Vec::new()); } - true } - /// Add a transaction to the pending pool (mempool). Multiple conflicting transactions (RBF - /// candidates) may coexist; `confirm_pending_txs` selects which one to confirm. - fn add_pending_tx(&mut self, tx: Transaction) { - self.pending_txs.push((tx.compute_txid(), tx)); + // Mines a setup transaction directly into a block, bypassing the mempool, + // and buries it to `depth`. Wallet seeding and synthetic funding + // transactions are not relayable, so they cannot go through normal + // admission. + fn mine_setup_tx_to_depth(&mut self, tx: Transaction, depth: u32) { + assert!( + tx.is_coinbase() || Self::is_synthetic_funding_tx(&tx), + "direct setup mining is only for coinbase and synthetic funding transactions: {:?}", + tx, + ); + let txid = tx.compute_txid(); + assert!( + self.confirmed_txids.insert(txid), + "direct setup transaction was already confirmed: {:?}", + tx, + ); + self.apply_tx_to_utxos(txid, &tx); + + self.mine_block(vec![tx]); + self.mine_empty_blocks(depth.saturating_sub(1)); } - /// Confirm pending transactions in a single block, selecting deterministically among - /// conflicting RBF candidates. Sorting by txid ensures the winner is determined by fuzz input - /// content. Transactions that double-spend an already-confirmed outpoint are skipped. - fn confirm_pending_txs(&mut self) { - let mut txs = std::mem::take(&mut self.pending_txs); - txs.sort_by_key(|(txid, _)| *txid); + // Attempts to admit a broadcast transaction to the mempool, enforcing + // locktime, input, and RBF rules. Mining later confirms the whole mempool + // without further selection. + fn admit_tx_to_mempool(&mut self, tx: Transaction) { + let txid = tx.compute_txid(); + let lock_time = tx.lock_time.to_consensus_u32(); + let locktime_enabled = + tx.input.iter().any(|input| input.sequence.enables_absolute_lock_time()); - let mut confirmed = Vec::new(); - let mut spent_outpoints = Vec::new(); - for (txid, tx) in txs { - if self.confirmed_txids.contains(&txid) { - continue; - } - if tx.input.iter().any(|input| { - self.is_outpoint_spent(&input.previous_output) - || spent_outpoints.contains(&input.previous_output) - }) { - continue; + let is_ldk_commitment_obscured_locktime = + tx.input.len() == 1 && tx.input[0].sequence.0 >> 24 == 0x80 && lock_time >> 24 == 0x20; + + let immature_absolute_locktime = + locktime_enabled && tx.lock_time.is_block_height() && self.tip_height() < lock_time; + assert!( + !immature_absolute_locktime, + "broadcast immature locktime transaction into chanmon harness mempool: {:?}", + tx, + ); + + let unmodeled_time_locktime = locktime_enabled + && tx.lock_time.is_block_time() + && !is_ldk_commitment_obscured_locktime; + assert!( + !unmodeled_time_locktime, + "broadcast time-locked transaction into chanmon harness mempool: {:?}", + tx, + ); + + assert!( + !tx.is_coinbase() && !Self::is_synthetic_funding_tx(&tx), + "setup-only transaction entered chanmon harness mempool: {:?}", + tx, + ); + + if self.confirmed_txids.contains(&txid) { + return; + } + if self.pending_txs.iter().any(|(pending_txid, _)| *pending_txid == txid) { + return; + } + + // Fee-rate policy is not modeled, so among conflicting RBF candidates + // the last one relayed wins. + let mut conflicting_pending_txids = HashSet::new(); + for (pending_txid, pending_tx) in &self.pending_txs { + let signals_rbf = pending_tx.input.iter().any(|input| input.sequence.is_rbf()); + let conflicts_with_new_tx = pending_tx.input.iter().any(|pending_input| { + tx.input.iter().any(|input| input.previous_output == pending_input.previous_output) + }); + if conflicts_with_new_tx { + if !signals_rbf { + return; + } + conflicting_pending_txids.insert(*pending_txid); } - self.confirmed_txids.insert(txid); - for input in &tx.input { - spent_outpoints.push(input.previous_output); + } + if !conflicting_pending_txids.is_empty() { + let mut removed_outputs = HashSet::new(); + let mut retained_txs = Vec::new(); + for (pending_txid, pending_tx) in self.pending_txs.drain(..) { + let direct_conflict = conflicting_pending_txids.contains(&pending_txid); + let spends_removed_tx = pending_tx + .input + .iter() + .any(|input| removed_outputs.contains(&input.previous_output)); + if direct_conflict || spends_removed_tx { + for idx in 0..pending_tx.output.len() { + removed_outputs + .insert(BitcoinOutPoint { txid: pending_txid, vout: idx as u32 }); + } + } else { + retained_txs.push((pending_txid, pending_tx)); + } } - confirmed.push(tx); + self.pending_txs = retained_txs; } - if confirmed.is_empty() { + // Build the UTXO set this transaction would see if the current mempool + // confirmed. + let mut available_utxos = self.utxos.clone(); + for (pending_txid, pending_tx) in &self.pending_txs { + for input in &pending_tx.input { + available_utxos.remove(&input.previous_output); + } + for idx in 0..pending_tx.output.len() { + available_utxos.insert(BitcoinOutPoint { txid: *pending_txid, vout: idx as u32 }); + } + } + if Self::has_invalid_inputs(&tx, &available_utxos) { return; } + self.pending_txs.push((txid, tx)); + } - let prev_hash = self.blocks.last().unwrap().0.block_hash(); - let header = create_dummy_header(prev_hash, 42); - self.blocks.push((header, confirmed)); - - for _ in 0..5 { - let prev_hash = self.blocks.last().unwrap().0.block_hash(); - let header = create_dummy_header(prev_hash, 42); - self.blocks.push((header, Vec::new())); + fn relay_transactions(&mut self, txs: Vec) { + for tx in txs { + self.admit_tx_to_mempool(tx); } } + // Mines `count` blocks, confirming the current mempool in the first block. + fn mine_blocks(&mut self, count: u32) -> Vec { + assert!(count > 0, "mining zero blocks should not be requested"); + + let mempool_txs = std::mem::take(&mut self.pending_txs); + let confirmed_txs = if mempool_txs.is_empty() { + self.mine_empty_blocks(1); + Vec::new() + } else { + let mut confirmed = Vec::new(); + for (txid, tx) in mempool_txs { + assert!( + !Self::has_invalid_inputs(&tx, &self.utxos), + "mempool transaction was no longer valid at mining time: {:?}", + tx, + ); + assert!( + self.confirmed_txids.insert(txid), + "mempool transaction was already confirmed at mining time: {:?}", + tx, + ); + self.apply_tx_to_utxos(txid, &tx); + confirmed.push(tx); + } + let confirmed_txs = confirmed.clone(); + self.mine_block(confirmed); + confirmed_txs + }; + self.mine_empty_blocks(count - 1); + confirmed_txs + } + fn block_at(&self, height: u32) -> &(Header, Vec) { &self.blocks[height as usize] } @@ -775,7 +936,13 @@ fn assert_disconnect_action(action: &msgs::ErrorAction) -> (&msgs::WarningMessag // Since sending/receiving messages may be delayed, `timer_tick_occurred` may cause a node to // disconnect their counterparty if they're expecting a timely response. if let msgs::ErrorAction::DisconnectPeerWithWarning { ref msg } = action { - let is_quiescent_msg = msg.data.contains("already sent splice_locked, cannot RBF"); + // A splice can be refused mid-negotiation when the post-splice balance would + // breach the channel reserve. Like the RBF case, this leaves the channel + // quiescent, so both sides must exit quiescence afterwards. + let is_quiescent_msg = msg.data.contains("already sent splice_locked, cannot RBF") + || msg.data.contains("has a confirmed pending splice, cannot RBF") + || msg.data.contains("smaller than their selected v2 reserve") + || msg.data.contains("smaller than our selected v2 reserve"); if !msg.data.contains("Disconnecting due to timeout awaiting response") && !is_quiescent_msg { panic!("Unexpected disconnect case: {}", msg.data); @@ -817,12 +984,12 @@ struct HarnessNode<'a> { logger: Arc, broadcaster: Arc, fee_estimator: Arc, - wallet: TestWalletSource, + wallet: Arc, + wallet_sync: WalletSync, Arc>, persistence_style: ChannelMonitorUpdateStatus, deferred: bool, serialized_manager: Vec, serialized_manager_generation: u64, - height: u32, last_htlc_clear_fee: u32, } @@ -866,7 +1033,7 @@ impl<'a> HarnessNode<'a> { } fn new( - node_id: u8, wallet: TestWalletSource, fee_estimator: Arc, + node_id: u8, wallet: Arc, fee_estimator: Arc, broadcaster: Arc, persistence_style: ChannelMonitorUpdateStatus, deferred: bool, out: &Out, router: &'a FuzzRouter, chan_type: ChanType, ) -> Self { @@ -890,6 +1057,7 @@ impl<'a> HarnessNode<'a> { &persister, deferred, ); + let wallet_sync = WalletSync::new(Arc::clone(&wallet), Arc::clone(&logger)); let network = Network::Bitcoin; let best_block_timestamp = genesis_block(network).header.time; let params = ChainParameters { network, best_block: BlockLocator::from_network(network) }; @@ -917,11 +1085,11 @@ impl<'a> HarnessNode<'a> { broadcaster, fee_estimator, wallet, + wallet_sync, persistence_style, deferred, serialized_manager: Vec::new(), serialized_manager_generation: 0, - height: 0, last_htlc_clear_fee: 253, } } @@ -958,22 +1126,77 @@ impl<'a> HarnessNode<'a> { } } + fn manager_height(&self) -> u32 { + self.node.current_best_block().height + } + + // Connects a block range to ChainMonitor and ChannelManager. The start + // heights are independent because reload may pair monitors and a manager + // persisted at different chain tips. + fn connect_chain_range( + &mut self, chain_state: &ChainState, monitor_start_height: u32, manager_start_height: u32, + target_height: u32, + ) { + assert!( + target_height >= monitor_start_height, + "connect_chain_range cannot move monitor height backward ({} -> {})", + monitor_start_height, + target_height + ); + assert!( + target_height >= manager_start_height, + "connect_chain_range cannot move manager height backward ({} -> {})", + manager_start_height, + target_height + ); + let start_height = cmp::min(monitor_start_height, manager_start_height); + let mut height = start_height; + while height < target_height { + let mut next_height = height + 1; + while next_height <= target_height && chain_state.block_at(next_height).1.is_empty() { + next_height += 1; + } + if next_height > target_height { + // The rest of the range is empty. One best-block update to the + // final height is enough because LDK's Confirm API explicitly + // allows best_block_updated to skip intermediary blocks. + height = target_height; + let (header, _) = chain_state.block_at(height); + if height > monitor_start_height { + self.monitor.best_block_updated(header, height); + } + if height > manager_start_height { + self.node.best_block_updated(header, height); + } + break; + } + height = next_height; + let (header, txn) = chain_state.block_at(height); + let txdata: Vec<_> = txn.iter().enumerate().map(|(i, tx)| (i + 1, tx)).collect(); + if height > monitor_start_height { + self.monitor.transactions_confirmed(header, &txdata, height); + } + if height > manager_start_height { + self.node.transactions_confirmed(header, &txdata, height); + } + if height > monitor_start_height { + self.monitor.best_block_updated(header, height); + } + if height > manager_start_height { + self.node.best_block_updated(header, height); + } + } + } + fn sync_with_chain_state(&mut self, chain_state: &ChainState, num_blocks: Option) { let target_height = if let Some(num_blocks) = num_blocks { - std::cmp::min(self.height + num_blocks, chain_state.tip_height()) + std::cmp::min(self.manager_height() + num_blocks, chain_state.tip_height()) } else { chain_state.tip_height() }; - while self.height < target_height { - self.height += 1; - let (header, txn) = chain_state.block_at(self.height); - let txdata: Vec<_> = txn.iter().enumerate().map(|(i, tx)| (i + 1, tx)).collect(); - if !txdata.is_empty() { - self.node.transactions_confirmed(header, &txdata, self.height); - } - self.node.best_block_updated(header, self.height); - } + let start_height = self.manager_height(); + self.connect_chain_range(chain_state, start_height, start_height, target_height); } fn checkpoint_manager_persistence(&mut self) -> bool { @@ -1034,7 +1257,6 @@ impl<'a> HarnessNode<'a> { } fn splice_in(&self, counterparty_node_id: &PublicKey, channel_id: &ChannelId) { - let wallet = WalletSync::new(&self.wallet, Arc::clone(&self.logger)); match self.node.splice_channel(channel_id, counterparty_node_id) { Ok(funding_template) => { let feerate = @@ -1043,7 +1265,7 @@ impl<'a> HarnessNode<'a> { Amount::from_sat(10_000), feerate, FeeRate::MAX, - &wallet, + &self.wallet_sync, ) { let _ = self.node.funding_contributed( channel_id, @@ -2125,7 +2347,7 @@ fn make_channel( tx.clone(), ) .unwrap(); - chain_state.confirm_tx(tx); + chain_state.mine_setup_tx_to_depth(tx, ANTI_REORG_DELAY); } else { panic!("Wrong event type"); } @@ -2242,24 +2464,27 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { config_byte & 0b1000_0000 != 0, ]; - let wallet_a = TestWalletSource::new(SecretKey::from_slice(&[1; 32]).unwrap()); - let wallet_b = TestWalletSource::new(SecretKey::from_slice(&[2; 32]).unwrap()); - let wallet_c = TestWalletSource::new(SecretKey::from_slice(&[3; 32]).unwrap()); - let wallets = [&wallet_a, &wallet_b, &wallet_c]; - let coinbase_tx = bitcoin::Transaction { - version: bitcoin::transaction::Version::TWO, - lock_time: bitcoin::absolute::LockTime::ZERO, - input: vec![bitcoin::TxIn { ..Default::default() }], - output: wallets - .iter() - .map(|wallet| TxOut { - value: Amount::from_sat(100_000), - script_pubkey: wallet.get_change_script().unwrap(), - }) - .collect(), - }; - for (idx, wallet) in wallets.iter().enumerate() { - wallet.add_utxo(coinbase_tx.clone(), idx as u32); + let wallet_a = Arc::new(TestWalletSource::new(SecretKey::from_slice(&[1; 32]).unwrap())); + let wallet_b = Arc::new(TestWalletSource::new(SecretKey::from_slice(&[2; 32]).unwrap())); + let wallet_c = Arc::new(TestWalletSource::new(SecretKey::from_slice(&[3; 32]).unwrap())); + let wallets = [wallet_a.as_ref(), wallet_b.as_ref(), wallet_c.as_ref()]; + let mut chain_state = ChainState::new(); + for wallet in wallets { + let coinbase_tx = bitcoin::Transaction { + version: bitcoin::transaction::Version::TWO, + lock_time: bitcoin::absolute::LockTime::ZERO, + input: vec![bitcoin::TxIn { ..Default::default() }], + output: (0..NUM_WALLET_UTXOS) + .map(|_| TxOut { + value: Amount::from_sat(100_000), + script_pubkey: wallet.get_change_script().unwrap(), + }) + .collect(), + }; + for vout in 0..NUM_WALLET_UTXOS { + wallet.add_utxo(coinbase_tx.clone(), vout); + } + chain_state.mine_setup_tx_to_depth(coinbase_tx, ANTI_REORG_DELAY); } let fee_est_a = Arc::new(FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }); @@ -2274,7 +2499,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { let mut nodes = [ HarnessNode::new( 0, - wallet_a, + Arc::clone(&wallet_a), Arc::clone(&fee_est_a), Arc::clone(&broadcast_a), persistence_styles[0], @@ -2285,7 +2510,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { ), HarnessNode::new( 1, - wallet_b, + Arc::clone(&wallet_b), Arc::clone(&fee_est_b), Arc::clone(&broadcast_b), persistence_styles[1], @@ -2296,7 +2521,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { ), HarnessNode::new( 2, - wallet_c, + Arc::clone(&wallet_c), Arc::clone(&fee_est_c), Arc::clone(&broadcast_c), persistence_styles[2], @@ -2306,8 +2531,6 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { chan_type, ), ]; - let mut chain_state = ChainState::new(); - // Connect peers first, then create channels. connect_peers(&nodes[0], &nodes[1]); connect_peers(&nodes[1], &nodes[2]); @@ -2376,7 +2599,34 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { self.bc_link.first_channel_id() } - fn finish(&self) { + // Runs end-of-input cleanup by relaying and mining remaining broadcasts. + // Final invariants should not depend on the input ending with explicit relay + // and mining bytes. + fn finish(&mut self) { + for _ in 0..MAX_FINISH_RELAY_MINE_ROUNDS { + let mut txs = Vec::new(); + for node in &self.nodes { + txs.extend(node.broadcaster.txn_broadcasted.borrow_mut().drain(..)); + } + self.chain_state.relay_transactions(txs); + if self.chain_state.pending_txs.is_empty() { + assert_test_invariants(&self.nodes); + return; + } + if self.mine_blocks(ANTI_REORG_DELAY) == 0 { + // The input ended with pending mempool transactions but no safe + // block left before an HTLC fail-back window. Leave them + // unconfirmed rather than forcing finish cleanup to advance + // the chain past that boundary. + assert_test_invariants(&self.nodes); + return; + } + } + assert!( + !self.nodes.iter().any(|node| !node.broadcaster.txn_broadcasted.borrow().is_empty()) + && self.chain_state.pending_txs.is_empty(), + "finish tx mining loop failed to quiesce", + ); assert_test_invariants(&self.nodes); } @@ -2775,8 +3025,8 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { fn process_events(&mut self, node_idx: usize, fail: bool) -> bool { let nodes = &self.nodes; - let chain_state = &mut self.chain_state; let payments = &mut self.payments; + let chain_state = &self.chain_state; // Multiple HTLCs can resolve for the same payment hash, so deduplicate // claim/fail handling per event batch. let mut claim_set = new_hash_map(); @@ -2814,29 +3064,53 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { unsigned_transaction, .. } => { - let signed_tx = nodes[node_idx].wallet.sign_tx(unsigned_transaction).unwrap(); - match nodes[node_idx].funding_transaction_signed( - &channel_id, - &counterparty_node_id, - signed_tx, - ) { - Ok(()) => {}, - Err(APIError::APIMisuseError { ref err }) - if err.contains("not expecting funding signatures") => + let wallet_script = nodes[node_idx].wallet.get_change_script().unwrap(); + let has_unknown_spent_input = unsigned_transaction.input.iter().any(|input| { + !chain_state.is_unspent(&input.previous_output) + && chain_state.confirmed_output(&input.previous_output).is_none() + }); + assert!( + !has_unknown_spent_input, + "funding transaction referenced an unmodeled input: {:?}", + unsigned_transaction, + ); + let has_spent_wallet_input = unsigned_transaction.input.iter().any(|input| { + !chain_state.is_unspent(&input.previous_output) + && chain_state + .confirmed_output(&input.previous_output) + .map_or(false, |output| output.script_pubkey == wallet_script) + }); + if has_spent_wallet_input { + // A queued RBF signing request can lose the race against a + // transaction confirming with one of its wallet inputs. + match nodes[node_idx] + .cancel_funding_contributed(&channel_id, &counterparty_node_id) { - // A queued signing event can be invalidated by a later `tx_abort` - // before the application handles it. - }, - Err(e) => panic!("{e:?}"), + Ok(()) => {}, + Err(APIError::APIMisuseError { ref err }) + if err.contains("does not have a pending splice negotiation") => {}, + Err(e) => panic!("{e:?}"), + } + } else { + let signed_tx = + nodes[node_idx].wallet.sign_tx(unsigned_transaction).unwrap(); + match nodes[node_idx].funding_transaction_signed( + &channel_id, + &counterparty_node_id, + signed_tx, + ) { + Ok(()) => {}, + Err(APIError::APIMisuseError { ref err }) + if err.contains("not expecting funding signatures") => + { + // A queued signing event can be invalidated by a later `tx_abort` + // before the application handles it. + }, + Err(e) => panic!("{e:?}"), + } } }, - events::Event::SpliceNegotiated { new_funding_txo, .. } => { - let mut txs = nodes[node_idx].broadcaster.txn_broadcasted.borrow_mut(); - assert!(txs.len() >= 1); - let splice_tx = txs.remove(0); - assert_eq!(new_funding_txo.txid, splice_tx.compute_txid()); - chain_state.add_pending_tx(splice_tx); - }, + events::Event::SpliceNegotiated { .. } => {}, events::Event::SpliceNegotiationFailed { .. } => {}, events::Event::DiscardFunding { funding_info: @@ -2935,6 +3209,20 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { self.bc_link.reconnect(&self.nodes); } + // Finds the earliest loaded monitor height for a node. Startup sync uses it + // as ChainMonitor's start height so raw monitors loaded below the manager's + // best block still see every block and transaction they missed. + fn oldest_monitor_height_for_node(&self, node_idx: usize) -> u32 { + let node = &self.nodes[node_idx]; + let mut min_monitor_height = node.manager_height(); + for chan_id in node.monitor.list_monitors() { + if let Ok(mon) = node.monitor.get_monitor(chan_id) { + min_monitor_height = cmp::min(min_monitor_height, mon.current_best_block().height); + } + } + min_monitor_height + } + fn restart_node(&mut self, node_idx: usize, v: u8, router: &'a FuzzRouter) { if !self.nodes[node_idx].deferred { self.nodes[node_idx].checkpoint_manager_persistence(); @@ -2954,6 +3242,21 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { } let loaded_manager_generation = self.nodes[node_idx].reload(v, &self.out, router, self.chan_type); + let monitor_start_height = self.oldest_monitor_height_for_node(node_idx); + let manager_start_height = self.nodes[node_idx].manager_height(); + // Startup sync is part of LDK's deserialization contract. + self.nodes[node_idx].connect_chain_range( + &self.chain_state, + monitor_start_height, + manager_start_height, + self.chain_state.tip_height(), + ); + assert_eq!( + self.nodes[node_idx].manager_height(), + self.chain_state.tip_height(), + "reloaded node {} must sync to the harness tip before normal operation resumes", + node_idx + ); let rolled_back_payment_hashes = self.payments.nodes[node_idx] .sync_pending_with_manager_generation(loaded_manager_generation); for payment_hash in rolled_back_payment_hashes { @@ -2962,6 +3265,11 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { } fn settle_all(&mut self) { + let chain_state = &self.chain_state; + for node in &mut self.nodes { + node.sync_with_chain_state(chain_state, None); + } + // First, make sure peers are all connected to each other self.reconnect_ab(); self.reconnect_bc(); @@ -3036,6 +3344,102 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { } made_progress } + + // Relays one node's broadcasts into the mempool. Per-node relay lets fuzz + // inputs model partial propagation before a block is mined. + fn relay_broadcasts_for_node(&mut self, node_idx: usize) { + let txs = self.nodes[node_idx] + .broadcaster + .txn_broadcasted + .borrow_mut() + .drain(..) + .collect::>(); + self.chain_state.relay_transactions(txs); + } + + fn earliest_pending_htlc_expiry(&self) -> Option { + let mut earliest_expiry: Option = None; + for node in &self.nodes { + for chan in node.list_channels() { + for htlc in &chan.pending_inbound_htlcs { + earliest_expiry = Some( + earliest_expiry + .map_or(htlc.cltv_expiry, |expiry| expiry.min(htlc.cltv_expiry)), + ); + } + for htlc in &chan.pending_outbound_htlcs { + earliest_expiry = Some( + earliest_expiry + .map_or(htlc.cltv_expiry, |expiry| expiry.min(htlc.cltv_expiry)), + ); + } + } + } + earliest_expiry + } + + fn safe_mine_block_count(&self, count: u32) -> u32 { + if let Some(expiry) = self.earliest_pending_htlc_expiry() { + let current_tip = self.chain_state.tip_height(); + // LDK may close to protect a pending HTLC before its raw CLTV + // expiry. Keep mining outside that fail-back window so fuzzed block + // production does not force an on-chain timeout path. + let timeout_deadline = expiry.saturating_sub(channelmonitor::HTLC_FAIL_BACK_BUFFER); + assert!( + current_tip < timeout_deadline, + "pending HTLC with expiry {} and timeout deadline {} is already unsafe at tip {}", + expiry, + timeout_deadline, + current_tip + ); + // Stop before the deadline block itself, since connecting it is + // enough for ChannelMonitor timeout handling to run. + count.min(timeout_deadline - current_tip - 1) + } else { + count + } + } + + // Mines blocks through ChainState, then applies confirmed transactions to + // the wallets and syncs node chain listeners. + fn mine_blocks(&mut self, count: u32) -> u32 { + assert!(count > 0, "mining zero blocks should not be requested"); + + let count = self.safe_mine_block_count(count); + if count == 0 { + return 0; + } + let confirmed_txs = self.chain_state.mine_blocks(count); + let wallets = [ + self.nodes[0].wallet.as_ref(), + self.nodes[1].wallet.as_ref(), + self.nodes[2].wallet.as_ref(), + ]; + for tx in &confirmed_txs { + for wallet in wallets.iter().copied() { + let change_script = wallet.get_change_script().unwrap(); + for input in &tx.input { + // The test wallet is a simple UTXO source. When one of its + // outputs is spent by a confirmed transaction, remove it so + // later funding attempts cannot double-spend it. + wallet.remove_utxo(input.previous_output); + } + for (vout, output) in tx.output.iter().enumerate() { + if output.script_pubkey == change_script { + // Add outputs to whichever test wallet owns the script. + // This lets splice flows recycle wallet change through + // later fuzz commands. + wallet.add_utxo(tx.clone(), vout as u32); + } + } + } + } + let chain_state = &self.chain_state; + for node in &mut self.nodes { + node.sync_with_chain_state(chain_state, None); + } + count + } } #[inline] @@ -3245,32 +3649,14 @@ pub fn do_test(data: &[u8], out: Out) { harness.nodes[2].splice_out(&cp_node_id, &harness.chan_b_id()); }, - // Sync node by 1 block to cover confirmation of a transaction. - 0xa8 => { - harness.chain_state.confirm_pending_txs(); - harness.nodes[0].sync_with_chain_state(&harness.chain_state, Some(1)); - }, - 0xa9 => { - harness.chain_state.confirm_pending_txs(); - harness.nodes[1].sync_with_chain_state(&harness.chain_state, Some(1)); - }, - 0xaa => { - harness.chain_state.confirm_pending_txs(); - harness.nodes[2].sync_with_chain_state(&harness.chain_state, Some(1)); - }, - // Sync node to chain tip to cover confirmation of a transaction post-reorg-risk. - 0xab => { - harness.chain_state.confirm_pending_txs(); - harness.nodes[0].sync_with_chain_state(&harness.chain_state, None); - }, - 0xac => { - harness.chain_state.confirm_pending_txs(); - harness.nodes[1].sync_with_chain_state(&harness.chain_state, None); - }, - 0xad => { - harness.chain_state.confirm_pending_txs(); - harness.nodes[2].sync_with_chain_state(&harness.chain_state, None); - }, + // Sync node by 1 block. + 0xa8 => harness.nodes[0].sync_with_chain_state(&harness.chain_state, Some(1)), + 0xa9 => harness.nodes[1].sync_with_chain_state(&harness.chain_state, Some(1)), + 0xaa => harness.nodes[2].sync_with_chain_state(&harness.chain_state, Some(1)), + // Sync node to chain tip. + 0xab => harness.nodes[0].sync_with_chain_state(&harness.chain_state, None), + 0xac => harness.nodes[1].sync_with_chain_state(&harness.chain_state, None), + 0xad => harness.nodes[2].sync_with_chain_state(&harness.chain_state, None), 0xb0 | 0xb1 | 0xb2 => { // Restart node A, picking among persisted and in-flight `ChannelMonitor` @@ -3395,6 +3781,13 @@ pub fn do_test(data: &[u8], out: Out) { .enable_op_for_all_signers(SignerOp::SignSpliceSharedInput); harness.nodes[2].signer_unblocked(None); }, + 0xd6 => harness.relay_broadcasts_for_node(0), + 0xd7 => harness.relay_broadcasts_for_node(1), + 0xd8 => harness.relay_broadcasts_for_node(2), + 0xd9..=0xe0 => { + let count = MINE_BLOCK_COUNTS[(v - 0xd9) as usize]; + harness.mine_blocks(count); + }, 0xf0 => harness.ab_link.complete_monitor_updates_for_node( 0, diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 3df6f5fc436..463c2e8e8f6 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -12786,6 +12786,17 @@ where )); } + if pending_splice + .negotiated_candidates + .iter() + .any(|funding| funding.funding_tx_confirmation_height != 0) + { + return Err(format!( + "Channel {} has a confirmed pending splice, cannot RBF", + self.context.channel_id(), + )); + } + if pending_splice.negotiated_candidates.is_empty() { return Err(format!( "Channel {} has no negotiated splice candidates to RBF", @@ -13438,6 +13449,17 @@ where ))); } + if pending_splice + .negotiated_candidates + .iter() + .any(|funding| funding.funding_tx_confirmation_height != 0) + { + return Err(ChannelError::WarnAndDisconnect(format!( + "Channel {} has a confirmed pending splice, cannot RBF", + self.context.channel_id(), + ))); + } + let last_candidate = match pending_splice.negotiated_candidates.last() { Some(candidate) => candidate, None => {