Skip to content

Commit c6da16e

Browse files
joostjagerclaude
andcommitted
Add deferred monitor write support to chanmon_consistency fuzz target
Add a `deferred` flag to `TestChainMonitor` that controls whether the underlying `ChainMonitor` queues operations instead of executing them immediately. The flag is derived from the fuzz input alongside the existing monitor style bits, so each of the three nodes can independently run in deferred or immediate mode. In deferred mode, `watch_channel` and `update_channel` always return `InProgress`. A new `flush_and_update_latest_monitors` method drains the queued operations and, when the persister reports `Completed`, promotes the pending shadow monitor snapshots to persisted state. This method is called before `release_pending_monitor_events` and at each point where the fuzzer completes pending monitor updates. On node reload, deferred monitors are flushed immediately after `watch_channel` so the node starts with a consistent state. AI tools were used in preparing this commit. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 423c1dc commit c6da16e

1 file changed

Lines changed: 62 additions & 3 deletions

File tree

fuzz/src/chanmon_consistency.rs

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,12 +267,13 @@ struct TestChainMonitor {
267267
Arc<KeyProvider>,
268268
>,
269269
>,
270+
pub deferred: bool,
270271
pub latest_monitors: Mutex<HashMap<ChannelId, LatestMonitorState>>,
271272
}
272273
impl TestChainMonitor {
273274
pub fn new(
274275
broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>,
275-
persister: Arc<TestPersister>, keys: Arc<KeyProvider>,
276+
persister: Arc<TestPersister>, keys: Arc<KeyProvider>, deferred: bool,
276277
) -> Self {
277278
Self {
278279
chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(
@@ -283,14 +284,44 @@ impl TestChainMonitor {
283284
Arc::clone(&persister),
284285
Arc::clone(&keys),
285286
keys.get_peer_storage_key(),
286-
false,
287+
deferred,
287288
)),
288289
logger,
289290
keys,
290291
persister,
292+
deferred,
291293
latest_monitors: Mutex::new(new_hash_map()),
292294
}
293295
}
296+
297+
/// Flushes all deferred monitor operations and, if the persister reports success, promotes
298+
/// pending monitor states to persisted in our shadow records. `TestChainMonitor` maintains
299+
/// its own `latest_monitors` map that tracks serialized monitor snapshots independently of
300+
/// `ChainMonitor`, so that the fuzzer can simulate node restarts by deserializing from these
301+
/// snapshots rather than relying on the persister's storage.
302+
///
303+
/// This simulates the pattern of snapshotting the pending count, persisting the
304+
/// `ChannelManager`, then flushing the queued monitor writes.
305+
fn flush_and_update_latest_monitors(&self) {
306+
let count = self.chain_monitor.pending_operation_count();
307+
if count == 0 {
308+
return;
309+
}
310+
// Execute all queued watch_channel/update_channel operations inside the ChainMonitor.
311+
self.chain_monitor.flush(count, &self.logger);
312+
let persister_res = *self.persister.update_ret.lock().unwrap();
313+
// Only update our local tracking state when the persister signals completion. When
314+
// persistence is still in-progress, the monitors stay in the pending set so that a
315+
// simulated restart can still reload from the last fully-persisted snapshot.
316+
if persister_res == chain::ChannelMonitorUpdateStatus::Completed {
317+
for (_channel_id, state) in self.latest_monitors.lock().unwrap().iter_mut() {
318+
if let Some((id, data)) = state.pending_monitors.drain(..).last() {
319+
state.persisted_monitor_id = id;
320+
state.persisted_monitor = data;
321+
}
322+
}
323+
}
324+
}
294325
}
295326
impl chain::Watch<TestChannelSigner> for TestChainMonitor {
296327
fn watch_channel(
@@ -300,6 +331,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
300331
monitor.write(&mut ser).unwrap();
301332
let monitor_id = monitor.get_latest_update_id();
302333
let res = self.chain_monitor.watch_channel(channel_id, monitor);
334+
if self.deferred {
335+
assert_eq!(res, Ok(chain::ChannelMonitorUpdateStatus::InProgress));
336+
}
303337
let state = match res {
304338
Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
305339
persisted_monitor_id: monitor_id,
@@ -349,6 +383,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
349383
let mut ser = VecWriter(Vec::new());
350384
deserialized_monitor.write(&mut ser).unwrap();
351385
let res = self.chain_monitor.update_channel(channel_id, update);
386+
if self.deferred {
387+
assert_eq!(res, chain::ChannelMonitorUpdateStatus::InProgress);
388+
}
352389
match res {
353390
chain::ChannelMonitorUpdateStatus::Completed => {
354391
map_entry.persisted_monitor_id = update.update_id;
@@ -365,6 +402,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
365402
fn release_pending_monitor_events(
366403
&self,
367404
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
405+
if self.deferred {
406+
self.flush_and_update_latest_monitors();
407+
}
368408
return self.chain_monitor.release_pending_monitor_events();
369409
}
370410
}
@@ -902,6 +942,11 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
902942
ChannelMonitorUpdateStatus::Completed
903943
}),
904944
];
945+
let deferred = [
946+
initial_mon_styles & 0b001_000 != 0,
947+
initial_mon_styles & 0b010_000 != 0,
948+
initial_mon_styles & 0b100_000 != 0,
949+
];
905950

906951
let mut chain_state = ChainState::new();
907952
let mut node_height_a: u32 = 0;
@@ -930,6 +975,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
930975
update_ret: Mutex::new(mon_style[$node_id as usize].borrow().clone()),
931976
}),
932977
Arc::clone(&keys_manager),
978+
deferred[$node_id as usize],
933979
));
934980

935981
let mut config = UserConfig::default();
@@ -993,6 +1039,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
9931039
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
9941040
}),
9951041
Arc::clone(keys),
1042+
deferred[node_id as usize],
9961043
));
9971044

9981045
let mut config = UserConfig::default();
@@ -1070,18 +1117,28 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
10701117
let manager =
10711118
<(BestBlock, ChanMan)>::read(&mut &ser[..], read_args).expect("Failed to read manager");
10721119
let res = (manager.1, chain_monitor.clone());
1120+
let expected_status = if deferred[node_id as usize] {
1121+
ChannelMonitorUpdateStatus::InProgress
1122+
} else {
1123+
ChannelMonitorUpdateStatus::Completed
1124+
};
10731125
for (channel_id, mon) in monitors.drain() {
10741126
assert_eq!(
10751127
chain_monitor.chain_monitor.watch_channel(channel_id, mon),
1076-
Ok(ChannelMonitorUpdateStatus::Completed)
1128+
Ok(expected_status)
10771129
);
10781130
}
1131+
if deferred[node_id as usize] {
1132+
let count = chain_monitor.chain_monitor.pending_operation_count();
1133+
chain_monitor.chain_monitor.flush(count, &chain_monitor.logger);
1134+
}
10791135
*chain_monitor.persister.update_ret.lock().unwrap() = *mon_style[node_id as usize].borrow();
10801136
res
10811137
};
10821138

10831139
macro_rules! complete_all_pending_monitor_updates {
10841140
($monitor: expr) => {{
1141+
$monitor.flush_and_update_latest_monitors();
10851142
for (channel_id, state) in $monitor.latest_monitors.lock().unwrap().iter_mut() {
10861143
for (id, data) in state.pending_monitors.drain(..) {
10871144
$monitor.chain_monitor.channel_monitor_updated(*channel_id, id).unwrap();
@@ -2060,6 +2117,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
20602117
|monitor: &Arc<TestChainMonitor>,
20612118
chan_funding,
20622119
compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>| {
2120+
monitor.flush_and_update_latest_monitors();
20632121
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
20642122
assert!(
20652123
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
@@ -2076,6 +2134,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(data: &[u8], out: Out) {
20762134
};
20772135

20782136
let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_id| {
2137+
monitor.flush_and_update_latest_monitors();
20792138
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_id) {
20802139
assert!(
20812140
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),

0 commit comments

Comments
 (0)