diff --git a/README.md b/README.md index a71dc77..bba59b2 100644 --- a/README.md +++ b/README.md @@ -76,8 +76,7 @@ const config = { grpcHostnameOverride: "peer0.curator.local" }, stream: { - chainInfoPollingIntervalMs: 2000, - intervalMs: 1000, + gracePeriodMs: 1000, batchSize: 10, retryOnErrorDelayMs: 5000, maxRetryCount: 5 diff --git a/package-lock.json b/package-lock.json index 29ee2ca..0a7001a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@gala-chain/stream", - "version": "0.0.3", + "version": "0.1.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@gala-chain/stream", - "version": "0.0.3", + "version": "0.1.0", "license": "Apache-2.0", "dependencies": { "@hyperledger/fabric-gateway": "^1.6.0", diff --git a/package.json b/package.json index ecb6bd4..b4dcc64 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@gala-chain/stream", - "version": "0.0.3", + "version": "0.1.0", "description": "Streams blocks from GalaChain or Hyperledger Fabric network as RxJS Observables", "author": "", "private": false, diff --git a/src/ChainStream.ts b/src/ChainStream.ts index 218e200..848da44 100644 --- a/src/ChainStream.ts +++ b/src/ChainStream.ts @@ -26,7 +26,7 @@ import { tap, timer } from "rxjs"; -import { bufferCount } from "rxjs/operators"; +import { bufferCount, delay } from "rxjs/operators"; import { CAService, IIdentity } from "./CAService"; import { ChainService, TransactionFilter } from "./ChainService"; @@ -69,11 +69,11 @@ export class ChainStream { } public startPollingChainHeight(config: { - intervalMs: number; + gracePeriodMs: number; retryOnErrorDelayMs: number; maxRetryCount: number; }) { - return interval(config.intervalMs) + return interval(config.gracePeriodMs) .pipe( switchMap(async () => { const info = await this.queryChainInfo(); @@ -102,7 +102,7 @@ export class ChainStream { public fromBlock( startBlock: number, - config: { batchSize: number; intervalMs: number; retryOnErrorDelayMs: number; maxRetryCount: number }, + config: { batchSize: number; retryOnErrorDelayMs: number; maxRetryCount: number; gracePeriodMs: number }, transactionFilter: TransactionFilter = () => true ) { let currentBlock = startBlock; // Keep track of the current block we're fetching @@ -114,8 +114,9 @@ export class ChainStream { } if (currentBlock >= this.chainInfo.value.height) { - return timer(config.intervalMs).pipe( - tap(() => this.logger.log(`No new blocks, retrying after ${config.intervalMs} ms...`)), + return of([]).pipe( + tap(() => this.logger.log(`No new blocks, retrying after ${config.gracePeriodMs} ms...`)), + delay(config.gracePeriodMs), switchMap(() => of([])) ); } @@ -138,6 +139,8 @@ export class ChainStream { return blocks; }), + delay(config.gracePeriodMs), + catchError((err) => { const channel = this.chainInfo.value.channelName; this.logger.warn(`Error fetching blocks from channel ${channel}: ${err.message}`); diff --git a/src/ConnectedStream.ts b/src/ConnectedStream.ts index bde26f5..b9c76e7 100644 --- a/src/ConnectedStream.ts +++ b/src/ConnectedStream.ts @@ -21,10 +21,9 @@ import { ConnectedTransactionStream } from "./ConnectedTransactionStream"; import { Block } from "./types"; export interface StreamConfig { - chainInfoPollingIntervalMs: number; - intervalMs: number; - batchSize: number; + gracePeriodMs: number; retryOnErrorDelayMs: number; + batchSize: number; maxRetryCount: number; } @@ -36,15 +35,15 @@ export class ConnectedStream { private readonly streamConfig: StreamConfig ) { this.chainHeightSubscription = chainStream.startPollingChainHeight({ - intervalMs: streamConfig.chainInfoPollingIntervalMs, - retryOnErrorDelayMs: streamConfig.retryOnErrorDelayMs, - maxRetryCount: streamConfig.maxRetryCount + gracePeriodMs: this.streamConfig.gracePeriodMs, + retryOnErrorDelayMs: this.streamConfig.retryOnErrorDelayMs, + maxRetryCount: this.streamConfig.maxRetryCount }); } public fromBlock(number: number): Observable { return this.chainStream.fromBlock(number, { - intervalMs: this.streamConfig.intervalMs, + gracePeriodMs: this.streamConfig.gracePeriodMs, batchSize: this.streamConfig.batchSize, retryOnErrorDelayMs: this.streamConfig.retryOnErrorDelayMs, maxRetryCount: this.streamConfig.maxRetryCount diff --git a/src/StreamBuilder.ts b/src/StreamBuilder.ts index 793ea0d..ca2fbad 100644 --- a/src/StreamBuilder.ts +++ b/src/StreamBuilder.ts @@ -37,8 +37,7 @@ const defaultPeerConfig = { }; const defaultStreamConfig = { - chainInfoPollingIntervalMs: 2000, - intervalMs: 500, + gracePeriodMs: 1000, batchSize: 10, retryOnErrorDelayMs: 5000, maxRetryCount: 5 @@ -89,9 +88,7 @@ export class StreamBuilder { grpcHostnameOverride: params.peer?.grpcHostnameOverride ?? defaultPeerConfig.grpcHostnameOverride }; this.streamConfig = { - chainInfoPollingIntervalMs: - params.stream?.chainInfoPollingIntervalMs ?? defaultStreamConfig.chainInfoPollingIntervalMs, - intervalMs: params.stream?.intervalMs ?? defaultStreamConfig.intervalMs, + gracePeriodMs: params.stream?.gracePeriodMs ?? defaultStreamConfig.gracePeriodMs, batchSize: params.stream?.batchSize ?? defaultStreamConfig.batchSize, retryOnErrorDelayMs: params.stream?.retryOnErrorDelayMs ?? defaultStreamConfig.retryOnErrorDelayMs, maxRetryCount: params.stream?.maxRetryCount ?? defaultStreamConfig.maxRetryCount diff --git a/src/sample-blocks.ts b/src/sample-blocks.ts index ea0c632..42136c6 100644 --- a/src/sample-blocks.ts +++ b/src/sample-blocks.ts @@ -32,8 +32,7 @@ const config = { grpcHostnameOverride: "peer0.curator.local" }, stream: { - chainInfoPollingIntervalMs: 2000, - intervalMs: 1000, + gracePeriodMs: 1000, batchSize: 10, retryOnErrorDelayMs: 5000, maxRetryCount: 5 diff --git a/src/sample-transactions.ts b/src/sample-transactions.ts index afbd65f..0fdbb22 100644 --- a/src/sample-transactions.ts +++ b/src/sample-transactions.ts @@ -31,8 +31,7 @@ const config = { grpcHostnameOverride: "peer0.curator.local" }, stream: { - chainInfoPollingIntervalMs: 2000, - intervalMs: 1000, + gracePeriodMs: 1000, batchSize: 10, retryOnErrorDelayMs: 5000, maxRetryCount: 5 diff --git a/src/stream.spec.ts b/src/stream.spec.ts index 8f068b2..83cf887 100644 --- a/src/stream.spec.ts +++ b/src/stream.spec.ts @@ -45,10 +45,9 @@ beforeAll(() => { .connect({ // lower values for faster tests stream: { - chainInfoPollingIntervalMs: 100, - intervalMs: 100, + gracePeriodMs: 100, batchSize: 3, - retryOnErrorDelayMs: 500, + retryOnErrorDelayMs: 250, maxRetryCount: 10 }, logger