Skip to content

Commit ba04a6e

Browse files
committed
allow breaking inside for_blocks closures using ControlFlow
1 parent 317de62 commit ba04a6e

5 files changed

Lines changed: 43 additions & 18 deletions

File tree

src/daemon.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use serde_json::{json, Value};
88

99
use std::fs::File;
1010
use std::io::Read;
11+
use std::ops::ControlFlow;
1112
use std::path::Path;
1213

1314
use crate::{
@@ -228,10 +229,10 @@ impl Daemon {
228229
self.p2p.lock().get_new_headers(chain)
229230
}
230231

231-
pub(crate) fn for_blocks<B, F>(&self, blockhashes: B, func: F) -> Result<()>
232+
pub(crate) fn for_blocks<B, F, R>(&self, blockhashes: B, func: F) -> Result<ControlFlow<R>>
232233
where
233234
B: IntoIterator<Item = BlockHash>,
234-
F: FnMut(BlockHash, Block),
235+
F: FnMut(BlockHash, Block) -> ControlFlow<R>,
235236
{
236237
self.p2p.lock().for_blocks(blockhashes, func)
237238
}

src/index.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use anyhow::{Context, Result};
22
use bitcoin::consensus::{deserialize, serialize};
33
use bitcoin::{Block, BlockHash, OutPoint, Txid};
4+
use std::ops::ControlFlow;
45

56
use crate::{
67
chain::{Chain, NewHeader},
@@ -205,6 +206,7 @@ impl Index {
205206
index_single_block(blockhash, block, height, &mut batch);
206207
});
207208
self.stats.height.set("tip", height as f64);
209+
ControlFlow::Continue::<()>(())
208210
})?;
209211
let heights: Vec<_> = heights.collect();
210212
assert!(

src/p2p.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crossbeam_channel::{bounded, select, Receiver, Sender};
2121

2222
use std::io::{self, ErrorKind, Write};
2323
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
24+
use std::ops::ControlFlow;
2425
use std::sync::Arc;
2526
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
2627

@@ -92,21 +93,26 @@ impl Connection {
9293
/// Request and process the specified blocks (in the specified order).
9394
/// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks for details.
9495
/// Defined as `&mut self` to prevent concurrent invocations (https://github.com/romanz/electrs/pull/526#issuecomment-934685515).
95-
pub(crate) fn for_blocks<B, F>(&mut self, blockhashes: B, mut func: F) -> Result<()>
96+
pub(crate) fn for_blocks<B, F, R>(
97+
&mut self,
98+
blockhashes: B,
99+
mut func: F,
100+
) -> Result<ControlFlow<R>>
96101
where
97102
B: IntoIterator<Item = BlockHash>,
98-
F: FnMut(BlockHash, Block),
103+
F: FnMut(BlockHash, Block) -> ControlFlow<R>,
99104
{
100105
self.blocks_duration.observe_duration("total", || {
101106
let blockhashes: Vec<BlockHash> = blockhashes.into_iter().collect();
102107
if blockhashes.is_empty() {
103-
return Ok(());
108+
return Ok(ControlFlow::Continue(()));
104109
}
105110
self.blocks_duration.observe_duration("request", || {
106111
debug!("loading {} blocks", blockhashes.len());
107112
self.req_send.send(Request::get_blocks(&blockhashes))
108113
})?;
109114

115+
let mut ret = ControlFlow::Continue(());
110116
for hash in blockhashes {
111117
let block = self.blocks_duration.observe_duration("response", || {
112118
let block = self
@@ -116,10 +122,13 @@ impl Connection {
116122
ensure!(block.block_hash() == hash, "got unexpected block");
117123
Ok(block)
118124
})?;
119-
self.blocks_duration
120-
.observe_duration("process", || func(hash, block));
125+
if ret.is_continue() {
126+
ret = self
127+
.blocks_duration
128+
.observe_duration("process", || func(hash, block));
129+
}
121130
}
122-
Ok(())
131+
Ok(ret)
123132
})
124133
}
125134

src/status.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use serde::ser::{Serialize, Serializer};
88

99
use std::collections::{BTreeMap, HashMap, HashSet};
1010
use std::convert::TryFrom;
11+
use std::ops::ControlFlow;
1112

1213
use crate::{
1314
cache::Cache,
@@ -303,10 +304,15 @@ impl ScriptHashStatus {
303304
}
304305

305306
/// Apply func only on the new blocks (fetched from daemon).
306-
fn for_new_blocks<B, F>(&self, blockhashes: B, daemon: &Daemon, func: F) -> Result<()>
307+
fn for_new_blocks<B, F, R>(
308+
&self,
309+
blockhashes: B,
310+
daemon: &Daemon,
311+
func: F,
312+
) -> Result<ControlFlow<R>>
307313
where
308314
B: IntoIterator<Item = BlockHash>,
309-
F: FnMut(BlockHash, Block),
315+
F: FnMut(BlockHash, Block) -> ControlFlow<R>,
310316
{
311317
daemon.for_blocks(
312318
blockhashes
@@ -346,6 +352,7 @@ impl ScriptHashStatus {
346352
.outputs = funding_outputs;
347353
},
348354
);
355+
ControlFlow::Continue::<()>(())
349356
})?;
350357
let spending_blockhashes: HashSet<BlockHash> = outpoints
351358
.par_iter()
@@ -367,6 +374,7 @@ impl ScriptHashStatus {
367374
.spent = spent_outpoints;
368375
},
369376
);
377+
ControlFlow::Continue::<()>(())
370378
})?;
371379

372380
Ok(result

src/tracker.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use anyhow::{Context, Result};
22
use bitcoin::{BlockHash, Transaction, Txid};
3+
use std::ops::ControlFlow;
34

45
use crate::{
56
cache::Cache,
@@ -100,18 +101,22 @@ impl Tracker {
100101
) -> Result<Option<(BlockHash, Transaction)>> {
101102
// Note: there are two blocks with coinbase transactions having same txid (see BIP-30)
102103
let blockhashes = self.index.filter_by_txid(txid);
103-
let mut result = None;
104-
daemon.for_blocks(blockhashes, |blockhash, block| {
104+
let result = daemon.for_blocks(blockhashes, |blockhash, block| {
105105
for tx in block.txdata {
106-
if result.is_some() {
107-
return;
108-
}
109106
if tx.txid() == txid {
110-
result = Some((blockhash, tx));
111-
return;
107+
return ControlFlow::Break((blockhash, tx));
112108
}
113109
}
110+
ControlFlow::Continue(())
114111
})?;
115-
Ok(result)
112+
Ok(control_flow_break_value(result))
113+
}
114+
}
115+
116+
/// See unstable ControlFlow::break_value (https://github.com/rust-lang/rust/issues/75744)
117+
fn control_flow_break_value<B, C>(value: ControlFlow<B, C>) -> Option<B> {
118+
match value {
119+
ControlFlow::Continue(..) => None,
120+
ControlFlow::Break(x) => Some(x),
116121
}
117122
}

0 commit comments

Comments
 (0)