From 139c130c840d48b1638e470e588ffe451f52e007 Mon Sep 17 00:00:00 2001 From: 0xghost42 Date: Thu, 14 May 2026 16:34:01 +0530 Subject: [PATCH 1/3] feat(price-pusher): allow multiple injective gRPC endpoints with failover MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #1014. The injective price pusher accepted a single `--grpc-endpoint`. Cosmos nodes are occasionally flaky, so operators ended up running one pusher process per endpoint just to get redundancy. Maintainer (jayantk) said on the issue that cycling chain endpoints was the right shape. - `--grpc-endpoint` is now `type: "array"`, accepting either repeated flag occurrences or a comma-separated list (the handler splits + trims each entry). A single endpoint still works exactly as before. Empty input is rejected up front with a clear message. - New `src/injective/endpoint-pool.ts`: - `EndpointPool` — round-robin cursor over a non-empty endpoint list, shared across every gRPC API helper in the pusher so a bad endpoint affects the whole instance, not just one method. - `withEndpointFailover(pool, fn)` — runs `fn(currentEndpoint)`, rotates on failure, walks every endpoint at most once before re-throwing the last error. - `InjectivePriceListener` + `InjectivePricePusher` constructors take `string | readonly string[]` for backward compatibility, wrap each gRPC call site (`ChainGrpcWasmApi.fetchSmartContractState`, `ChainGrpcAuthApi.fetchAccount`, `TxGrpcApi.broadcast`, `TxGrpcApi.simulate`) in `withEndpointFailover`. - README: short note on the new fallback-set syntax under the Injective example. - `pnpm --filter @pythnetwork/price-pusher exec test-unit` — 8 cases, all pass: empty rejection, default cursor, round-robin wrap, single-endpoint no-op, success no-rotate, single-failure rotate, exhaust-and-throw, cursor-position respected by `withEndpointFailover`. - `pnpm --filter @pythnetwork/price-pusher exec tsc --noEmit` clean. - Wired `test:unit` into the package scripts so the workspace test runner picks the new file up. The original issue also mentioned the Pyth price service (Hermes) gRPC endpoint as a possibly useful failover target. That's a different code path (HermesClient takes a single URL today), so I kept this PR focused on the chain-endpoint case the reporter named primarily. Happy to follow up on the Hermes side in a separate PR if the maintainers want it. --- apps/price_pusher/README.md | 5 + apps/price_pusher/package.json | 3 +- apps/price_pusher/src/injective/command.ts | 28 +++++- .../src/injective/endpoint-pool.ts | 67 +++++++++++++ apps/price_pusher/src/injective/injective.ts | 87 ++++++++++------ apps/price_pusher/tests/endpoint-pool.test.ts | 99 +++++++++++++++++++ 6 files changed, 251 insertions(+), 38 deletions(-) create mode 100644 apps/price_pusher/src/injective/endpoint-pool.ts create mode 100644 apps/price_pusher/tests/endpoint-pool.test.ts diff --git a/apps/price_pusher/README.md b/apps/price_pusher/README.md index 4df9dadaf8..96c8fadc65 100644 --- a/apps/price_pusher/README.md +++ b/apps/price_pusher/README.md @@ -118,6 +118,11 @@ pnpm run start injective --grpc-endpoint https://grpc-endpoint.com \ [--pushing-frequency 10] \ [--polling-frequency 5] +# `--grpc-endpoint` accepts a fallback set. Pass the flag multiple times or +# supply a comma-separated list (`--grpc-endpoint a,b`). The pusher cycles +# round-robin through endpoints when a gRPC call fails — useful for cosmos +# nodes whose availability can be flaky. + # For Aptos pnpm run start aptos --endpoint https://fullnode.testnet.aptoslabs.com/v1 \ --pyth-contract-address 0x7e783b349d3e89cf5931af376ebeadbfab855b3fa239b7ada8f5a92fbea6b387 \ diff --git a/apps/price_pusher/package.json b/apps/price_pusher/package.json index 79abf56c0e..4458e7491b 100644 --- a/apps/price_pusher/package.json +++ b/apps/price_pusher/package.json @@ -215,7 +215,8 @@ "dev": "ts-node src/index.ts", "prepublishOnly": "pnpm run build", "start": "node dist/index.cjs", - "test:types": "tsc" + "test:types": "tsc", + "test:unit": "test-unit" }, "type": "module", "types": "./dist/index.d.ts", diff --git a/apps/price_pusher/src/injective/command.ts b/apps/price_pusher/src/injective/command.ts index d9b5a03b9c..bf41f6e05f 100644 --- a/apps/price_pusher/src/injective/command.ts +++ b/apps/price_pusher/src/injective/command.ts @@ -25,11 +25,15 @@ export default { } as Options, "grpc-endpoint": { description: - "gRPC endpoint URL for injective. The pusher will periodically" + + "gRPC endpoint URL(s) for injective. The pusher will periodically " + "poll for updates. The polling interval is configurable via the " + - "`polling-frequency` command-line argument.", + "`polling-frequency` command-line argument. " + + "Pass the flag multiple times or supply a comma-separated list " + + "(e.g. `--grpc-endpoint a,b`) to register a fallback set; the " + + "pusher will round-robin through them on gRPC errors.", + type: "array", + string: true, required: true, - type: "string", } as Options, network: { description: "testnet or mainnet", @@ -85,6 +89,20 @@ export default { }); const mnemonic = fs.readFileSync(mnemonicFile, "utf8").trim(); + // `grpc-endpoint` is `type: "array"` so yargs always hands us a list. Each + // entry may itself be a comma-separated string (e.g. `--grpc-endpoint a,b`), + // so split + trim + drop empties to get a clean failover set. + const grpcEndpoints: string[] = (grpcEndpoint as string[]) + .flatMap((entry) => entry.split(",")) + .map((entry) => entry.trim()) + .filter((entry) => entry.length > 0); + if (grpcEndpoints.length === 0) { + throw new Error( + "At least one --grpc-endpoint must be provided. Pass the flag once per " + + "endpoint, or supply a comma-separated list.", + ); + } + let priceItems = priceConfigs.map(({ id, alias }) => ({ alias, id })); // Better to filter out invalid price items before creating the pyth listener @@ -109,7 +127,7 @@ export default { const injectiveListener = new InjectivePriceListener( pythContractAddress, - grpcEndpoint, + grpcEndpoints, priceItems, logger.child({ module: "InjectivePriceListener" }), { @@ -119,7 +137,7 @@ export default { const injectivePusher = new InjectivePricePusher( hermesClient, pythContractAddress, - grpcEndpoint, + grpcEndpoints, logger.child({ module: "InjectivePricePusher" }), mnemonic, { diff --git a/apps/price_pusher/src/injective/endpoint-pool.ts b/apps/price_pusher/src/injective/endpoint-pool.ts new file mode 100644 index 0000000000..50f4d69d1f --- /dev/null +++ b/apps/price_pusher/src/injective/endpoint-pool.ts @@ -0,0 +1,67 @@ +import type { Logger } from "pino"; + +/** + * Round-robin cursor over a non-empty list of gRPC endpoints. Each call site + * rents the current endpoint, and the cursor advances when an attempt fails + * (see `withEndpointFailover`). The cursor is shared across every gRPC API + * helper inside a single pusher instance so that a bad endpoint affects the + * whole pusher, not just one method. + */ +export class EndpointPool { + private currentIndex = 0; + + constructor( + private readonly endpoints: readonly string[], + private readonly logger: Logger, + ) { + if (endpoints.length === 0) { + throw new Error("EndpointPool requires at least one endpoint"); + } + } + + current(): string { + // Non-null asserted: the constructor guarantees length >= 1 and `rotate` + // always wraps the index modulo `endpoints.length`. + + return this.endpoints[this.currentIndex]!; + } + + rotate(reason: unknown): string { + if (this.endpoints.length === 1) { + return this.current(); + } + const failed = this.current(); + this.currentIndex = (this.currentIndex + 1) % this.endpoints.length; + this.logger.warn( + { failedEndpoint: failed, nextEndpoint: this.current(), err: reason }, + "gRPC endpoint failed — rotating to next endpoint", + ); + return this.current(); + } + + size(): number { + return this.endpoints.length; + } +} + +/** + * Try `fn` against the current endpoint; on failure, rotate to the next + * endpoint and retry. Walks through every endpoint at most once before + * re-throwing the last error. + */ +export async function withEndpointFailover( + pool: EndpointPool, + fn: (endpoint: string) => Promise, +): Promise { + let lastError: unknown; + for (let attempt = 0; attempt < pool.size(); attempt++) { + const endpoint = pool.current(); + try { + return await fn(endpoint); + } catch (error) { + lastError = error; + pool.rotate(error); + } + } + throw lastError; +} diff --git a/apps/price_pusher/src/injective/injective.ts b/apps/price_pusher/src/injective/injective.ts index 17fd144522..6511c47f14 100644 --- a/apps/price_pusher/src/injective/injective.ts +++ b/apps/price_pusher/src/injective/injective.ts @@ -20,6 +20,7 @@ import type { Logger } from "pino"; import type { IPricePusher, PriceInfo, PriceItem } from "../interface.js"; import { ChainPriceListener } from "../interface.js"; import type { DurationInSeconds } from "../utils.js"; +import { EndpointPool, withEndpointFailover } from "./endpoint-pool.js"; const DEFAULT_GAS_PRICE = 160_000_000; const DEFAULT_GAS_MULTIPLIER = 1.05; @@ -47,9 +48,11 @@ type InjectiveConfig = { // this use price without leading 0x export class InjectivePriceListener extends ChainPriceListener { + private readonly endpoints: EndpointPool; + constructor( private pythContractAddress: string, - private grpcEndpoint: string, + grpcEndpoints: string | readonly string[], priceItems: PriceItem[], private logger: Logger, config: { @@ -57,6 +60,10 @@ export class InjectivePriceListener extends ChainPriceListener { }, ) { super(config.pollingFrequency, priceItems); + this.endpoints = new EndpointPool( + typeof grpcEndpoints === "string" ? [grpcEndpoints] : grpcEndpoints, + logger, + ); } async getOnChainPriceInfo( @@ -64,14 +71,20 @@ export class InjectivePriceListener extends ChainPriceListener { ): Promise { let priceQueryResponse: PriceQueryResponse; try { - const api = new ChainGrpcWasmApi(this.grpcEndpoint); - const { data } = await api.fetchSmartContractState( - this.pythContractAddress, - Buffer.from(`{"price_feed":{"id":"${priceId}"}}`).toString("base64"), + priceQueryResponse = await withEndpointFailover( + this.endpoints, + async (endpoint) => { + const api = new ChainGrpcWasmApi(endpoint); + const { data } = await api.fetchSmartContractState( + this.pythContractAddress, + Buffer.from(`{"price_feed":{"id":"${priceId}"}}`).toString( + "base64", + ), + ); + const json = Buffer.from(data).toString(); + return JSON.parse(json) as PriceQueryResponse; + }, ); - - const json = Buffer.from(data).toString(); - priceQueryResponse = JSON.parse(json); } catch (error) { this.logger.error(error, `Polling on-chain price for ${priceId} failed.`); return undefined; @@ -95,11 +108,12 @@ export class InjectivePricePusher implements IPricePusher { private mnemonic: string; private chainConfig: InjectiveConfig; private accounts: Record = {}; + private readonly endpoints: EndpointPool; constructor( private hermesClient: HermesClient, private pythContractAddress: string, - private grpcEndpoint: string, + grpcEndpoints: string | readonly string[], private logger: Logger, mnemonic: string, chainConfig?: Partial, @@ -113,6 +127,10 @@ export class InjectivePricePusher implements IPricePusher { chainConfig?.priceIdsProcessChunkSize ?? DEFAULT_PRICE_IDS_PROCESS_CHUNK_SIZE, }; + this.endpoints = new EndpointPool( + typeof grpcEndpoints === "string" ? [grpcEndpoints] : grpcEndpoints, + logger, + ); } private getWallet(index: number) { @@ -130,13 +148,15 @@ export class InjectivePricePusher implements IPricePusher { msg: Msgs, index: number, ): Promise { - const chainGrpcAuthApi = new ChainGrpcAuthApi(this.grpcEndpoint); const wallet = this.getWallet(index); const injectiveAddress = wallet.toAddress().toBech32(); // Fetch the latest account details only if it's not stored. - this.accounts[injectiveAddress] ??= - await chainGrpcAuthApi.fetchAccount(injectiveAddress); + this.accounts[injectiveAddress] ??= await withEndpointFailover( + this.endpoints, + (endpoint) => + new ChainGrpcAuthApi(endpoint).fetchAccount(injectiveAddress), + ); const account = this.accounts[injectiveAddress]; @@ -157,8 +177,9 @@ export class InjectivePricePusher implements IPricePusher { txRaw.signatures = [sig]; // this takes approx 5 seconds - const txResponse = await new TxGrpcApi(this.grpcEndpoint).broadcast( - txRaw, + const txResponse = await withEndpointFailover( + this.endpoints, + (endpoint) => new TxGrpcApi(endpoint).broadcast(txRaw), ); account.baseAccount.sequence++; @@ -271,8 +292,8 @@ export class InjectivePricePusher implements IPricePusher { }); try { - const result = await new TxGrpcApi(this.grpcEndpoint).simulate( - simulateTxRaw, + const result = await withEndpointFailover(this.endpoints, (endpoint) => + new TxGrpcApi(endpoint).simulate(simulateTxRaw), ); const gas = ( @@ -326,22 +347,24 @@ export class InjectivePricePusher implements IPricePusher { */ private async getUpdateFee(vaas: string[]) { try { - const api = new ChainGrpcWasmApi(this.grpcEndpoint); - const { data } = await api.fetchSmartContractState( - this.pythContractAddress, - Buffer.from( - JSON.stringify({ - get_update_fee: { - vaas, - }, - }), - ).toString("base64"), - ); - - const json = Buffer.from(data).toString(); - - // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return JSON.parse(json); + return await withEndpointFailover(this.endpoints, async (endpoint) => { + const api = new ChainGrpcWasmApi(endpoint); + const { data } = await api.fetchSmartContractState( + this.pythContractAddress, + Buffer.from( + JSON.stringify({ + get_update_fee: { + vaas, + }, + }), + ).toString("base64"), + ); + + const json = Buffer.from(data).toString(); + + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + return JSON.parse(json); + }); } catch (error) { this.logger.error(error, `Error fetching update fee.`); diff --git a/apps/price_pusher/tests/endpoint-pool.test.ts b/apps/price_pusher/tests/endpoint-pool.test.ts new file mode 100644 index 0000000000..62692c82ef --- /dev/null +++ b/apps/price_pusher/tests/endpoint-pool.test.ts @@ -0,0 +1,99 @@ +import { + EndpointPool, + withEndpointFailover, +} from "../src/injective/endpoint-pool.js"; + +const silentLogger = { + trace: () => undefined, + debug: () => undefined, + info: () => undefined, + warn: () => undefined, + error: () => undefined, + fatal: () => undefined, + silent: () => undefined, + level: "silent", + child: () => silentLogger, +} as unknown as Parameters[1] extends never + ? never + : ConstructorParameters[1]; + +describe("EndpointPool", () => { + it("rejects an empty endpoint list", () => { + expect(() => new EndpointPool([], silentLogger)).toThrow( + /at least one endpoint/, + ); + }); + + it("returns the first endpoint by default", () => { + const pool = new EndpointPool(["a", "b", "c"], silentLogger); + expect(pool.current()).toBe("a"); + expect(pool.size()).toBe(3); + }); + + it("rotates round-robin and wraps past the end", () => { + const pool = new EndpointPool(["a", "b", "c"], silentLogger); + pool.rotate(new Error("first failure")); + expect(pool.current()).toBe("b"); + pool.rotate(new Error("second failure")); + expect(pool.current()).toBe("c"); + pool.rotate(new Error("wrap")); + expect(pool.current()).toBe("a"); + }); + + it("treats rotate() as a no-op for a single-endpoint pool", () => { + const pool = new EndpointPool(["only"], silentLogger); + expect(pool.current()).toBe("only"); + pool.rotate(new Error("transient")); + expect(pool.current()).toBe("only"); + }); +}); + +describe("withEndpointFailover", () => { + it("returns the first successful result without rotating", async () => { + const pool = new EndpointPool(["a", "b", "c"], silentLogger); + const seen: string[] = []; + const result = await withEndpointFailover(pool, (endpoint) => { + seen.push(endpoint); + return Promise.resolve(endpoint.toUpperCase()); + }); + expect(result).toBe("A"); + expect(seen).toEqual(["a"]); + expect(pool.current()).toBe("a"); + }); + + it("rotates to the next endpoint on failure and returns its result", async () => { + const pool = new EndpointPool(["a", "b", "c"], silentLogger); + const seen: string[] = []; + const result = await withEndpointFailover(pool, (endpoint) => { + seen.push(endpoint); + if (endpoint === "a") return Promise.reject(new Error("a down")); + return Promise.resolve(endpoint); + }); + expect(result).toBe("b"); + expect(seen).toEqual(["a", "b"]); + expect(pool.current()).toBe("b"); + }); + + it("walks every endpoint at most once before re-throwing", async () => { + const pool = new EndpointPool(["a", "b", "c"], silentLogger); + const attempts: string[] = []; + await expect( + withEndpointFailover(pool, (endpoint) => { + attempts.push(endpoint); + return Promise.reject(new Error(`${endpoint} down`)); + }), + ).rejects.toThrow(/c down/); + expect(attempts).toEqual(["a", "b", "c"]); + }); + + it("starts from wherever the pool's cursor currently is", async () => { + const pool = new EndpointPool(["a", "b", "c"], silentLogger); + pool.rotate(new Error("prior cycle")); + const attempts: string[] = []; + await withEndpointFailover(pool, (endpoint) => { + attempts.push(endpoint); + return Promise.resolve(endpoint); + }); + expect(attempts).toEqual(["b"]); + }); +}); From d647f8416772196386a7ea72d98f54918adfcff6 Mon Sep 17 00:00:00 2001 From: 0xghost42 Date: Fri, 15 May 2026 11:40:43 +0530 Subject: [PATCH 2/3] chore(price-pusher): bump version to 10.5.0 for multi-endpoint support --- apps/price_pusher/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/price_pusher/package.json b/apps/price_pusher/package.json index 4458e7491b..3bc5476643 100644 --- a/apps/price_pusher/package.json +++ b/apps/price_pusher/package.json @@ -220,5 +220,5 @@ }, "type": "module", "types": "./dist/index.d.ts", - "version": "10.4.0" + "version": "10.5.0" } From d8aadbf92d72a2797ca076c8a358a0a935ce2992 Mon Sep 17 00:00:00 2001 From: 0xghost42 Date: Fri, 15 May 2026 14:24:24 +0530 Subject: [PATCH 3/3] fix(price-pusher): export injective/endpoint-pool to unblock CJS build The exports map drives ts-duality --noEsm CJS compilation. Without an entry for src/injective/endpoint-pool.ts, dist/injective/endpoint-pool.cjs is not generated and dist/injective/injective.cjs fails at runtime when it requires ./endpoint-pool.js. Mirror the pattern used by the other injective files. --- apps/price_pusher/package.json | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/apps/price_pusher/package.json b/apps/price_pusher/package.json index 3bc5476643..b878df0221 100644 --- a/apps/price_pusher/package.json +++ b/apps/price_pusher/package.json @@ -113,6 +113,10 @@ "default": "./dist/injective/command.cjs", "types": "./dist/injective/command.d.ts" }, + "./injective/endpoint-pool": { + "default": "./dist/injective/endpoint-pool.cjs", + "types": "./dist/injective/endpoint-pool.d.ts" + }, "./injective/injective": { "default": "./dist/injective/injective.cjs", "types": "./dist/injective/injective.d.ts"