Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ const config = {
grpcHostnameOverride: "peer0.curator.local"
},
stream: {
chainInfoPollingIntervalMs: 2000,
intervalMs: 1000,
gracePeriodMs: 1000,
batchSize: 10,
retryOnErrorDelayMs: 5000,
maxRetryCount: 5
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@gala-chain/stream",
"version": "0.0.3",
"version": "0.1.0",
"description": "Streams blocks from GalaChain or Hyperledger Fabric network as RxJS Observables",
"author": "",
"private": false,
Expand Down
15 changes: 9 additions & 6 deletions src/ChainStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
tap,
timer
} from "rxjs";
import { bufferCount } from "rxjs/operators";
import { bufferCount, delay } from "rxjs/operators";

import { CAService, IIdentity } from "./CAService";
import { ChainService, TransactionFilter } from "./ChainService";
Expand Down Expand Up @@ -69,11 +69,11 @@ export class ChainStream {
}

public startPollingChainHeight(config: {
intervalMs: number;
gracePeriodMs: number;
retryOnErrorDelayMs: number;
maxRetryCount: number;
}) {
return interval(config.intervalMs)
return interval(config.gracePeriodMs)
.pipe(
switchMap(async () => {
const info = await this.queryChainInfo();
Expand Down Expand Up @@ -102,7 +102,7 @@ export class ChainStream {

public fromBlock(
startBlock: number,
config: { batchSize: number; intervalMs: number; retryOnErrorDelayMs: number; maxRetryCount: number },
config: { batchSize: number; retryOnErrorDelayMs: number; maxRetryCount: number; gracePeriodMs: number },
transactionFilter: TransactionFilter = () => true
) {
let currentBlock = startBlock; // Keep track of the current block we're fetching
Expand All @@ -114,8 +114,9 @@ export class ChainStream {
}

if (currentBlock >= this.chainInfo.value.height) {
return timer(config.intervalMs).pipe(
tap(() => this.logger.log(`No new blocks, retrying after ${config.intervalMs} ms...`)),
return of([]).pipe(
tap(() => this.logger.log(`No new blocks, retrying after ${config.gracePeriodMs} ms...`)),
delay(config.gracePeriodMs),
switchMap(() => of([]))
);
}
Expand All @@ -138,6 +139,8 @@ export class ChainStream {
return blocks;
}),

delay(config.gracePeriodMs),

catchError((err) => {
const channel = this.chainInfo.value.channelName;
this.logger.warn(`Error fetching blocks from channel ${channel}: ${err.message}`);
Expand Down
13 changes: 6 additions & 7 deletions src/ConnectedStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ import { ConnectedTransactionStream } from "./ConnectedTransactionStream";
import { Block } from "./types";

export interface StreamConfig {
chainInfoPollingIntervalMs: number;
intervalMs: number;
batchSize: number;
gracePeriodMs: number;
retryOnErrorDelayMs: number;
batchSize: number;
maxRetryCount: number;
}

Expand All @@ -36,15 +35,15 @@ export class ConnectedStream {
private readonly streamConfig: StreamConfig
) {
this.chainHeightSubscription = chainStream.startPollingChainHeight({
intervalMs: streamConfig.chainInfoPollingIntervalMs,
retryOnErrorDelayMs: streamConfig.retryOnErrorDelayMs,
maxRetryCount: streamConfig.maxRetryCount
gracePeriodMs: this.streamConfig.gracePeriodMs,
retryOnErrorDelayMs: this.streamConfig.retryOnErrorDelayMs,
maxRetryCount: this.streamConfig.maxRetryCount
});
}

public fromBlock(number: number): Observable<Block> {
return this.chainStream.fromBlock(number, {
intervalMs: this.streamConfig.intervalMs,
gracePeriodMs: this.streamConfig.gracePeriodMs,
batchSize: this.streamConfig.batchSize,
retryOnErrorDelayMs: this.streamConfig.retryOnErrorDelayMs,
maxRetryCount: this.streamConfig.maxRetryCount
Expand Down
7 changes: 2 additions & 5 deletions src/StreamBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ const defaultPeerConfig = {
};

const defaultStreamConfig = {
chainInfoPollingIntervalMs: 2000,
intervalMs: 500,
gracePeriodMs: 1000,
batchSize: 10,
retryOnErrorDelayMs: 5000,
maxRetryCount: 5
Expand Down Expand Up @@ -89,9 +88,7 @@ export class StreamBuilder {
grpcHostnameOverride: params.peer?.grpcHostnameOverride ?? defaultPeerConfig.grpcHostnameOverride
};
this.streamConfig = {
chainInfoPollingIntervalMs:
params.stream?.chainInfoPollingIntervalMs ?? defaultStreamConfig.chainInfoPollingIntervalMs,
intervalMs: params.stream?.intervalMs ?? defaultStreamConfig.intervalMs,
gracePeriodMs: params.stream?.gracePeriodMs ?? defaultStreamConfig.gracePeriodMs,
batchSize: params.stream?.batchSize ?? defaultStreamConfig.batchSize,
retryOnErrorDelayMs: params.stream?.retryOnErrorDelayMs ?? defaultStreamConfig.retryOnErrorDelayMs,
maxRetryCount: params.stream?.maxRetryCount ?? defaultStreamConfig.maxRetryCount
Expand Down
3 changes: 1 addition & 2 deletions src/sample-blocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ const config = {
grpcHostnameOverride: "peer0.curator.local"
},
stream: {
chainInfoPollingIntervalMs: 2000,
intervalMs: 1000,
gracePeriodMs: 1000,
batchSize: 10,
retryOnErrorDelayMs: 5000,
maxRetryCount: 5
Expand Down
3 changes: 1 addition & 2 deletions src/sample-transactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ const config = {
grpcHostnameOverride: "peer0.curator.local"
},
stream: {
chainInfoPollingIntervalMs: 2000,
intervalMs: 1000,
gracePeriodMs: 1000,
batchSize: 10,
retryOnErrorDelayMs: 5000,
maxRetryCount: 5
Expand Down
5 changes: 2 additions & 3 deletions src/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ beforeAll(() => {
.connect({
// lower values for faster tests
stream: {
chainInfoPollingIntervalMs: 100,
intervalMs: 100,
gracePeriodMs: 100,
batchSize: 3,
retryOnErrorDelayMs: 500,
retryOnErrorDelayMs: 250,
maxRetryCount: 10
},
logger
Expand Down
Loading