From bf6311dedb8bd1e9d40b578f6e8bca09195d8855 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Tue, 17 Jun 2025 08:16:14 +0000 Subject: [PATCH 1/2] Refactor: Trigger block stream immediately on height update in ChainStream.ts The `fromBlock` method in `ChainStream.ts` was modified to react immediately to chain height updates. Previously, it would wait for a grace period before checking for new blocks. The `expand` operator in `fromBlock` now listens to changes in `this.chainInfo`. When the current block height is reached, instead of a fixed delay, it waits for `this.chainInfo` to emit a new height greater than the current block. This commit only includes changes to `src/ChainStream.ts`. --- src/ChainStream.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/ChainStream.ts b/src/ChainStream.ts index 848da44..96cb644 100644 --- a/src/ChainStream.ts +++ b/src/ChainStream.ts @@ -18,11 +18,13 @@ import { catchError, concatMap, expand, + filter, interval, of, range, retry, switchMap, + take, tap, timer } from "rxjs"; @@ -114,10 +116,12 @@ export class ChainStream { } if (currentBlock >= this.chainInfo.value.height) { - return of([]).pipe( - tap(() => this.logger.log(`No new blocks, retrying after ${config.gracePeriodMs} ms...`)), - delay(config.gracePeriodMs), - switchMap(() => of([])) + // Listen for chainInfo updates and emit only when new height is greater than currentBlock + return this.chainInfo.pipe( + tap(newInfo => this.logger.log(`No new blocks, current height ${this.chainInfo.value.height}, waiting for new blocks above ${currentBlock}. New info height: ${newInfo.height}`)), + filter(newInfo => newInfo.height > currentBlock), + take(1), // Take the first emission that satisfies the condition + switchMap(() => of([])) // Continue with an empty emission to trigger the next expand cycle ); } From 0e81dd4a7cb977938dd39f15befb7cd7796d2a79 Mon Sep 17 00:00:00 2001 From: dzikowski-ai Date: Tue, 17 Jun 2025 11:39:25 +0200 Subject: [PATCH 2/2] Update src/ChainStream.ts Co-authored-by: Jakub Dzikowski --- src/ChainStream.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/ChainStream.ts b/src/ChainStream.ts index 96cb644..7226337 100644 --- a/src/ChainStream.ts +++ b/src/ChainStream.ts @@ -116,12 +116,10 @@ export class ChainStream { } if (currentBlock >= this.chainInfo.value.height) { - // Listen for chainInfo updates and emit only when new height is greater than currentBlock return this.chainInfo.pipe( - tap(newInfo => this.logger.log(`No new blocks, current height ${this.chainInfo.value.height}, waiting for new blocks above ${currentBlock}. New info height: ${newInfo.height}`)), - filter(newInfo => newInfo.height > currentBlock), - take(1), // Take the first emission that satisfies the condition - switchMap(() => of([])) // Continue with an empty emission to trigger the next expand cycle + filter((newInfo) => newInfo.height > currentBlock), + take(1), + switchMap(() => of([])) ); }