diff --git a/src/ipld/util.rs b/src/ipld/util.rs index 24d0755d040..8ee2e02ae00 100644 --- a/src/ipld/util.rs +++ b/src/ipld/util.rs @@ -183,6 +183,7 @@ pin_project! { events: bool, tipset_keys:bool, track_progress: bool, + n_polled: usize, } } @@ -254,6 +255,7 @@ pub fn stream_chain< events: false, tipset_keys: false, track_progress: false, + n_polled: 0, } } @@ -278,7 +280,7 @@ impl, ITER: Iterator + Unpin, S: Cid { type Item = anyhow::Result; - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { use Task::*; let export_tipset_keys = self.tipset_keys; @@ -286,6 +288,15 @@ impl, ITER: Iterator + Unpin, S: Cid let stateroot_limit_exclusive = self.stateroot_limit_exclusive; let this = self.project(); + // Yield to the runtime every 128 polls to allow cancellation + { + *this.n_polled += 1; + if this.n_polled.is_multiple_of(128) { + cx.waker().wake_by_ref(); + return Poll::Pending; + } + } + loop { while let Some(task) = this.dfs.front_mut() { match task {