From 355801a0843b20a7f37b66dc56bd4d601a6fd9a1 Mon Sep 17 00:00:00 2001 From: jamesrealweb3 Date: Sun, 5 Jan 2025 18:53:38 -0800 Subject: [PATCH 1/2] new example for sub Raydium price --- 09-example-subRaydiumPrice/README.md | 22 ++ 09-example-subRaydiumPrice/index.ts | 290 +++++++++++++++++++++ 09-example-subRaydiumPrice/package.json | 36 +++ 09-example-subRaydiumPrice/utils/logger.ts | 18 ++ 4 files changed, 366 insertions(+) create mode 100644 09-example-subRaydiumPrice/README.md create mode 100644 09-example-subRaydiumPrice/index.ts create mode 100644 09-example-subRaydiumPrice/package.json create mode 100644 09-example-subRaydiumPrice/utils/logger.ts diff --git a/09-example-subRaydiumPrice/README.md b/09-example-subRaydiumPrice/README.md new file mode 100644 index 0000000..cabe58d --- /dev/null +++ b/09-example-subRaydiumPrice/README.md @@ -0,0 +1,22 @@ +# subscribes Raydium price + +This example subscribes to raydiumLiquidityPoolv4 pool transactions based on transactions filter conditions. By monitoring transactions of the raydiumLiquidityPoolv4 account, it obtains price change information for Raydium trading pairs. + +Run using “npm start”, the output should be as follows:: + +```bash +[20:50:09.323] INFO: 3ef48CZQ7nAKKL6WGckFw9mhgXqA3Y9GvfY4qqsZHgKhD4BkchMx3KHL9ygbpJrM8mSi65u4hcfuMWBtDiFNgyjv : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 +[20:50:09.323] INFO: 31zV68Gnt67ap1fC1SGnv9n7xFTEETwsPCEqfAXJYxmpBYajLVSG1mP1uUTnMDoS2hPJsVbvFdLBXNPc87gtirPx : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 +[20:50:09.324] INFO: 66aZkhCvs47mVsjmQPKS9TLd81ZKKupyazPGsUz4gUqdzvtCkhyVZ4HH3LGxb51fxqaVnzjMTrdpWzEuRCYaCtqF : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 +[20:50:09.325] INFO: 2x98ttkvLtaWUyTBZdUFgpkMYFJ7W9AuF6YBrTuB6SG9YMiLS9LoYA1iTV5G8ZWB3LYstJJhRMHGK6tGBM9DEvpZ : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 +[20:50:09.325] INFO: 5AdiGuDXyzbc6bDXburrvufrch3Q3qEjmSTZNVhCtLJMDZD4tL2H17N5Lbn3Cie482ZD1AynhMwHS5km2RRunriE : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 +[20:50:09.329] INFO: 5CnHXDcUr6VYu9LDmYQsdfext5LFm53UjgBBvHcxbuFosT3HKVqy8oM4x8yt3g4re7DzPjqAFPe99W8yYke9uiUf : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 +[20:50:09.329] INFO: 3QtbbN9ts5TcstXgqCmKDe4mD1225CTLdNLc5G6dKVBZGDyQNBduY4jfveqWK3QmmfcWhnMwct72zQxQdypGSjXR : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 +^C[20:50:09.330] INFO: 65fE5d83qKJ4sZ9PYeMnerHUTekfF5kTqGoUGn6nWB2NTt3Yb4LggKGLSh2mAQCuchgPgGAAwbMpoKfJUvrKfVp5 : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 +``` + +Output data explanation: +First column: signature - Transaction signature +Second column: mint - Trading pair mint +Third column: price for sol + diff --git a/09-example-subRaydiumPrice/index.ts b/09-example-subRaydiumPrice/index.ts new file mode 100644 index 0000000..1335641 --- /dev/null +++ b/09-example-subRaydiumPrice/index.ts @@ -0,0 +1,290 @@ +import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc"; +import bs58 from "bs58"; +import net from 'net'; +import {Connection, PublicKey, Transaction, SystemProgram} from "@solana/web3.js"; +import logger from "./utils/logger"; + + + // Constants +const RAYDIUM_PROGRAM_ID = new PublicKey("675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"); +const SOL_MINT = "So11111111111111111111111111111111111111112"; +const SERUM_PROGRAM_ID = "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX"; +const RETRY_DELAY = 1000; +const RAYDIUM_AUTHORITY = "5Q544fKrFoe6tsEbD7S8EmxGTJYAKtTVhAW5Q5pge4j1"; + +const GRPC_URL = "https://grpc.chainbuff.com"; +const TELNET_PORT = 8888; + + +class RaydiumSwapSubscriber { + private bondingCurveSet: Set; + private stream: any; + private client: Client | null = null; + private server: net.Server; + private pingInterval: NodeJS.Timeout | null = null; + private isCleaningUp = false; + private isReconnecting = false; + + + + constructor() { + } + + + private async reconnect() { + await this.cleanup(); + await this.listen(); + } + + public async cleanup() { + if (this.isCleaningUp) return; + try { + this.isCleaningUp = true; + logger.info('Starting cleanup process...'); + + if (this.pingInterval) { + clearInterval(this.pingInterval); + this.pingInterval = null; + logger.info('Ping interval cleared'); + } + + if (this.stream) { + try { + this.stream.on('error', (error) => { + if (error.code === 1 && error.details === 'Cancelled on client') { + logger.info('Expected cancellation error, safely ignored'); + } else { + logger.error('Stream error during cleanup:', error); + } + }); + + this.stream.removeAllListeners('data'); + this.stream.removeAllListeners('end'); + this.stream.removeAllListeners('close'); + logger.info('Stream listeners removed'); + + this.stream.cancel(); + await new Promise(resolve => setTimeout(resolve, 1000)); + } catch (error) { + logger.error('Error during stream cleanup:', error); + } finally { + this.stream = null; + logger.info('Stream cancelled and cleared'); + } + } + + if (this.client) { + this.client = null; + logger.info('Client reference cleared'); + } + + logger.info('Cleanup completed successfully'); + } finally { + this.isCleaningUp = false; + } + } + + async listen() { + try { + if (this.bondingCurveSet) { + const curveset = Array.from(this.bondingCurveSet); + } + logger.info("Subscribing to event stream with bonding curves:"); + + this.client = new Client(GRPC_URL, undefined, { + "grpc.max_receive_message_length": 10 * 1024 * 1024, // 10MiB + }); + + this.stream = await this.client.subscribe(); + + this.setupStreamListeners(); + + const request: SubscribeRequest = { + accounts: {}, + slots: {}, + transactions: { + raydiumLiquidityPoolv4: { + vote: false, + failed: false, + signature: undefined, + accountInclude: [RAYDIUM_PROGRAM_ID.toBase58()], + accountRequired: [], + accountExclude: [] + }, + }, + transactionsStatus: {}, + entry: {}, + blocks: {}, + blocksMeta: {}, + accountsDataSlice: [], + ping: undefined, + commitment: CommitmentLevel.CONFIRMED, + }; + + await new Promise((resolve, reject) => { + this.stream.write(request, (err) => { + if (err === null || err === undefined) { + resolve(); + } else { + reject(err); + } + }); + }); + + const pingRequest: SubscribeRequest = { + accounts: {}, + slots: {}, + transactions: {}, + transactionsStatus: {}, + blocks: {}, + blocksMeta: {}, + entry: {}, + accountsDataSlice: [], + commitment: undefined, + ping: { id: 1 }, + }; + + this.pingInterval = setInterval(async () => { + try { + if (this.stream) { + await new Promise((resolve, reject) => { + this.stream.write(pingRequest, (err) => { + if (err === null || err === undefined) { + resolve(); + } else { + reject(err); + } + }); + }); + } + } catch (error) { + logger.error('Ping error:', error); + this.handleStreamError(); + } + }, 5000); + + } catch (error) { + logger.error('Error in listen:', error); + throw error; + } + } + + private setupStreamListeners() { + if (!this.stream) return; + + this.stream.on('error', (error) => { + logger.error('Stream error:', error); + this.handleStreamError(); + }); + + this.stream.on('end', () => { + logger.info('Stream ended'); + this.handleStreamEnd(); + }); + + this.stream.on('close', () => { + logger.info('Stream closed'); + this.handleStreamClose(); + }); + + this.stream.on("data", async (data) => { + if (data?.transaction) { + this.checkPoolPrice(data.transaction); + } + }); + } + + private async handleStreamError() { + if (this.isCleaningUp || this.isReconnecting) return; + + try { + this.isReconnecting = true; + logger.info('Attempting to reconnect due to stream error...'); + await this.cleanup(); + + setTimeout(async () => { + try { + await this.listen(); + logger.info('Successfully reconnected'); + } catch (error) { + logger.error('Failed to reconnect:', error); + // 重置状态并重试 + this.isReconnecting = false; + this.handleStreamError(); + } + }, 5000); + } finally { + this.isReconnecting = false; + } + } + + private async handleStreamEnd() { + if (this.isCleaningUp || this.isReconnecting) return; + logger.info('Stream ended, checking if reconnection needed...'); + + if (this.client) { + await this.handleStreamError(); + } + } + + private async handleStreamClose() { + if (this.isCleaningUp || this.isReconnecting) return; + logger.info('Stream closed'); + + if (this.client) { + this.client = null; + } + } + + checkPoolPrice(txn: any) { + + const transaction = txn?.transaction; + if (!transaction) { + return; + } + const signature = bs58.encode(transaction.signature); + + const preTokenBalances = transaction.meta.preTokenBalances; + const postTokenBalances = transaction.meta.postTokenBalances; + let targetToken = "", postPoolSOL=0, postPoolToken=0, prePoolSOL=0, prePoolToken=0, side = ""; + for (const account of preTokenBalances) { + if (targetToken !== "" && prePoolSOL !== 0 && prePoolToken!==0) break; // make sure we get the target token and pool sol balances and trader address only + if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) targetToken = account.mint; + if (account.owner === RAYDIUM_AUTHORITY && account.mint === SOL_MINT) { + prePoolSOL = account.uiTokenAmount.uiAmount; + } + if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) { + prePoolToken = account.uiTokenAmount.uiAmount; + } + } + for (const account of postTokenBalances) { + if (postPoolSOL !== 0 && postPoolToken!==0) break; // make sure we get the target token and pool sol balances and trader address only + if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT ) targetToken = account.mint; + if (account.owner === RAYDIUM_AUTHORITY && account.mint === SOL_MINT) { + postPoolSOL = account.uiTokenAmount.uiAmount; + } + if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) { + postPoolToken = account.uiTokenAmount.uiAmount; + } + } + if (targetToken === "") return; + logger.info(`${signature} : ${targetToken} : ${postPoolSOL/postPoolToken}`) + + + } + +} + +async function main() { + const subscriber = new RaydiumSwapSubscriber(); + + process.on('SIGINT', async () => { + logger.info('Received SIGINT. Cleaning up...'); + await subscriber.cleanup(); + process.exit(0); + }); + + await subscriber.listen(); +} + +main().catch(console.error); \ No newline at end of file diff --git a/09-example-subRaydiumPrice/package.json b/09-example-subRaydiumPrice/package.json new file mode 100644 index 0000000..2aa985f --- /dev/null +++ b/09-example-subRaydiumPrice/package.json @@ -0,0 +1,36 @@ +{ + "name": "grpc-client-raydium-price-example-ts", + "version": "2.0.1", + "license": "Apache-2.0", + "author": "Triton One", + "main": "dist/client.js", + "description": "Yellowstone gRPC Geyser TypeScript example for raydium price monitor", + "homepage": "https://triton.one", + "dependencies": { + "@coral-xyz/anchor": "^0.29.0", + "@metaplex-foundation/mpl-token-metadata": "^3.3.0", + "@raydium-io/raydium-sdk": "^1.3.1-beta.58", + "@solana-developers/helpers": "^2.5.6", + "@solana/rpc-types": "^2.0.0", + "@solana/spl-token": "^0.4.9", + "@solana/web3.js": "^1.98.0", + "@triton-one/yellowstone-grpc": "^1.4.1", + "@types/bn.js": "^5.1.6", + "axios": "^1.7.9", + "bn.js": "^5.2.1", + "bs58": "^6.0.0", + "pino": "^9.6.0", + "pino-pretty": "^13.0.0", + "typescript": "^5.3.3", + "yargs": "^17.6.2" + }, + "scripts": { + "build": "tsc -p .", + "fmt": "prettier -w .", + "start": "npm run build && node dist/client.js" + }, + "devDependencies": { + "prettier": "^2.8.3", + "typescript": "^5.3.3" + } +} diff --git a/09-example-subRaydiumPrice/utils/logger.ts b/09-example-subRaydiumPrice/utils/logger.ts new file mode 100644 index 0000000..1124804 --- /dev/null +++ b/09-example-subRaydiumPrice/utils/logger.ts @@ -0,0 +1,18 @@ +import pino from "pino"; + + +const stream = pino.multistream([pino.destination("./logs/log.log"), process.stdout]); +const logger = pino( + { + transport: { + target: "pino-pretty", + options: { + ignore: "pid,hostname", + }, + }, + base: null, + }, + stream); + + +export default logger; \ No newline at end of file From 6508ec802210b70ea2cf0d02d2bae3f5a55574b5 Mon Sep 17 00:00:00 2001 From: jamesrealweb3 Date: Mon, 27 Jan 2025 11:10:09 -0800 Subject: [PATCH 2/2] Modify the code to simple. Delete unused files --- 09-example-subRaydiumPrice/README.md | 3 +- 09-example-subRaydiumPrice/index.ts | 380 ++++++--------------- 09-example-subRaydiumPrice/package.json | 36 -- 09-example-subRaydiumPrice/utils/logger.ts | 18 - 4 files changed, 112 insertions(+), 325 deletions(-) delete mode 100644 09-example-subRaydiumPrice/package.json delete mode 100644 09-example-subRaydiumPrice/utils/logger.ts diff --git a/09-example-subRaydiumPrice/README.md b/09-example-subRaydiumPrice/README.md index cabe58d..4b25a25 100644 --- a/09-example-subRaydiumPrice/README.md +++ b/09-example-subRaydiumPrice/README.md @@ -18,5 +18,4 @@ Run using “npm start”, the output should be as follows:: Output data explanation: First column: signature - Transaction signature Second column: mint - Trading pair mint -Third column: price for sol - +Third column: price for sol \ No newline at end of file diff --git a/09-example-subRaydiumPrice/index.ts b/09-example-subRaydiumPrice/index.ts index 1335641..26f5379 100644 --- a/09-example-subRaydiumPrice/index.ts +++ b/09-example-subRaydiumPrice/index.ts @@ -1,290 +1,132 @@ import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc"; +import "dotenv/config"; import bs58 from "bs58"; -import net from 'net'; -import {Connection, PublicKey, Transaction, SystemProgram} from "@solana/web3.js"; -import logger from "./utils/logger"; - - // Constants -const RAYDIUM_PROGRAM_ID = new PublicKey("675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"); +const RAYDIUM_PROGRAM_ID = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"; const SOL_MINT = "So11111111111111111111111111111111111111112"; -const SERUM_PROGRAM_ID = "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX"; -const RETRY_DELAY = 1000; const RAYDIUM_AUTHORITY = "5Q544fKrFoe6tsEbD7S8EmxGTJYAKtTVhAW5Q5pge4j1"; -const GRPC_URL = "https://grpc.chainbuff.com"; -const TELNET_PORT = 8888; - - -class RaydiumSwapSubscriber { - private bondingCurveSet: Set; - private stream: any; - private client: Client | null = null; - private server: net.Server; - private pingInterval: NodeJS.Timeout | null = null; - private isCleaningUp = false; - private isReconnecting = false; - - - - constructor() { - } - - - private async reconnect() { - await this.cleanup(); - await this.listen(); - } - - public async cleanup() { - if (this.isCleaningUp) return; - try { - this.isCleaningUp = true; - logger.info('Starting cleanup process...'); - - if (this.pingInterval) { - clearInterval(this.pingInterval); - this.pingInterval = null; - logger.info('Ping interval cleared'); - } - - if (this.stream) { - try { - this.stream.on('error', (error) => { - if (error.code === 1 && error.details === 'Cancelled on client') { - logger.info('Expected cancellation error, safely ignored'); - } else { - logger.error('Stream error during cleanup:', error); - } - }); - - this.stream.removeAllListeners('data'); - this.stream.removeAllListeners('end'); - this.stream.removeAllListeners('close'); - logger.info('Stream listeners removed'); +async function main() { - this.stream.cancel(); - await new Promise(resolve => setTimeout(resolve, 1000)); - } catch (error) { - logger.error('Error during stream cleanup:', error); - } finally { - this.stream = null; - logger.info('Stream cancelled and cleared'); - } + // 创建订阅客户端 + // const client = new Client( + // 如遇到TypeError: Client is not a constructor错误 + // 请使用以下方式创建 + // 见 https://github.com/rpcpool/yellowstone-grpc/issues/428 + // @ts-ignore + const client = new Client.default( + "https://test-grpc.chainbuff.com", + undefined, + { + "grpc.max_receive_message_length": 16 * 1024 * 1024, // 16MB + } + ); + + // 创建订阅数据流 + const stream = await client.subscribe(); + + // 创建订阅请求 + const request: SubscribeRequest = { + slots: {}, + accounts: {}, + transactions: { + transactionsSubKey: { + accountInclude: [RAYDIUM_PROGRAM_ID], + accountExclude: [], + accountRequired: [] } - - if (this.client) { - this.client = null; - logger.info('Client reference cleared'); + }, + transactionsStatus: {}, + blocks: {}, + blocksMeta: {}, + accountsDataSlice: [], + entry: {}, + commitment: CommitmentLevel.CONFIRMED + }; + + // 发送订阅请求 + await new Promise((resolve, reject) => { + stream.write(request, (err) => { + if (err === null || err === undefined) { + resolve(); + } else { + reject(err); } + }); + }).catch((reason) => { + console.error(reason); + throw reason; + }); - logger.info('Cleanup completed successfully'); - } finally { - this.isCleaningUp = false; - } - } + // 获取订阅数据 + stream.on("data", async (data) => { + if (data?.transaction) { - async listen() { - try { - if (this.bondingCurveSet) { - const curveset = Array.from(this.bondingCurveSet); + const transaction = data.transaction.transaction; + if (!transaction) { + return; } - logger.info("Subscribing to event stream with bonding curves:"); - - this.client = new Client(GRPC_URL, undefined, { - "grpc.max_receive_message_length": 10 * 1024 * 1024, // 10MiB - }); - - this.stream = await this.client.subscribe(); - - this.setupStreamListeners(); - - const request: SubscribeRequest = { - accounts: {}, - slots: {}, - transactions: { - raydiumLiquidityPoolv4: { - vote: false, - failed: false, - signature: undefined, - accountInclude: [RAYDIUM_PROGRAM_ID.toBase58()], - accountRequired: [], - accountExclude: [] - }, - }, - transactionsStatus: {}, - entry: {}, - blocks: {}, - blocksMeta: {}, - accountsDataSlice: [], - ping: undefined, - commitment: CommitmentLevel.CONFIRMED, - }; - - await new Promise((resolve, reject) => { - this.stream.write(request, (err) => { - if (err === null || err === undefined) { - resolve(); - } else { - reject(err); - } - }); - }); - - const pingRequest: SubscribeRequest = { - accounts: {}, - slots: {}, - transactions: {}, - transactionsStatus: {}, - blocks: {}, - blocksMeta: {}, - entry: {}, - accountsDataSlice: [], - commitment: undefined, - ping: { id: 1 }, - }; - - this.pingInterval = setInterval(async () => { - try { - if (this.stream) { - await new Promise((resolve, reject) => { - this.stream.write(pingRequest, (err) => { - if (err === null || err === undefined) { - resolve(); - } else { - reject(err); - } - }); - }); - } - } catch (error) { - logger.error('Ping error:', error); - this.handleStreamError(); + const signature = bs58.encode(transaction.signature); + + const preTokenBalances = transaction.meta.preTokenBalances; + const postTokenBalances = transaction.meta.postTokenBalances; + let targetToken = "", postPoolSOL = 0, postPoolToken = 0, prePoolSOL = 0, prePoolToken = 0, side = ""; + for (const account of preTokenBalances) { + if (targetToken !== "" && prePoolSOL !== 0 && prePoolToken !== 0) break; // make sure we get the target token and pool sol balances and trader address only + if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) targetToken = account.mint; + if (account.owner === RAYDIUM_AUTHORITY && account.mint === SOL_MINT) { + prePoolSOL = account.uiTokenAmount.uiAmount; + } + if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) { + prePoolToken = account.uiTokenAmount.uiAmount; } - }, 5000); - - } catch (error) { - logger.error('Error in listen:', error); - throw error; - } - } - - private setupStreamListeners() { - if (!this.stream) return; - - this.stream.on('error', (error) => { - logger.error('Stream error:', error); - this.handleStreamError(); - }); - - this.stream.on('end', () => { - logger.info('Stream ended'); - this.handleStreamEnd(); - }); - - this.stream.on('close', () => { - logger.info('Stream closed'); - this.handleStreamClose(); - }); - - this.stream.on("data", async (data) => { - if (data?.transaction) { - this.checkPoolPrice(data.transaction); } - }); - } - - private async handleStreamError() { - if (this.isCleaningUp || this.isReconnecting) return; - - try { - this.isReconnecting = true; - logger.info('Attempting to reconnect due to stream error...'); - await this.cleanup(); - - setTimeout(async () => { - try { - await this.listen(); - logger.info('Successfully reconnected'); - } catch (error) { - logger.error('Failed to reconnect:', error); - // 重置状态并重试 - this.isReconnecting = false; - this.handleStreamError(); + for (const account of postTokenBalances) { + if (postPoolSOL !== 0 && postPoolToken !== 0) break; // make sure we get the target token and pool sol balances and trader address only + if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) targetToken = account.mint; + if (account.owner === RAYDIUM_AUTHORITY && account.mint === SOL_MINT) { + postPoolSOL = account.uiTokenAmount.uiAmount; } - }, 5000); - } finally { - this.isReconnecting = false; - } - } - - private async handleStreamEnd() { - if (this.isCleaningUp || this.isReconnecting) return; - logger.info('Stream ended, checking if reconnection needed...'); - - if (this.client) { - await this.handleStreamError(); - } - } + if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) { + postPoolToken = account.uiTokenAmount.uiAmount; + } + } + if (targetToken === "") return; + console.info(`${signature} : ${targetToken} : ${postPoolSOL / postPoolToken}`) - private async handleStreamClose() { - if (this.isCleaningUp || this.isReconnecting) return; - logger.info('Stream closed'); - - if (this.client) { - this.client = null; } - } - - checkPoolPrice(txn: any) { - - const transaction = txn?.transaction; - if (!transaction) { - return; - } - const signature = bs58.encode(transaction.signature); + }); - const preTokenBalances = transaction.meta.preTokenBalances; - const postTokenBalances = transaction.meta.postTokenBalances; - let targetToken = "", postPoolSOL=0, postPoolToken=0, prePoolSOL=0, prePoolToken=0, side = ""; - for (const account of preTokenBalances) { - if (targetToken !== "" && prePoolSOL !== 0 && prePoolToken!==0) break; // make sure we get the target token and pool sol balances and trader address only - if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) targetToken = account.mint; - if (account.owner === RAYDIUM_AUTHORITY && account.mint === SOL_MINT) { - prePoolSOL = account.uiTokenAmount.uiAmount; - } - if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) { - prePoolToken = account.uiTokenAmount.uiAmount; - } - } - for (const account of postTokenBalances) { - if (postPoolSOL !== 0 && postPoolToken!==0) break; // make sure we get the target token and pool sol balances and trader address only - if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT ) targetToken = account.mint; - if (account.owner === RAYDIUM_AUTHORITY && account.mint === SOL_MINT) { - postPoolSOL = account.uiTokenAmount.uiAmount; - } - if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) { - postPoolToken = account.uiTokenAmount.uiAmount; - } - } - if (targetToken === "") return; - logger.info(`${signature} : ${targetToken} : ${postPoolSOL/postPoolToken}`) - - - } - + // 为保证连接稳定,需要定期向服务端发送ping请求以维持连接 + const pingRequest: SubscribeRequest = { + accounts: {}, + slots: {}, + transactions: {}, + transactionsStatus: {}, + blocks: {}, + blocksMeta: {}, + entry: {}, + accountsDataSlice: [], + commitment: undefined, + ping: { id: 1 }, + }; + // 每5秒发送一次ping请求 + setInterval(async () => { + await new Promise((resolve, reject) => { + stream.write(pingRequest, (err) => { + if (err === null || err === undefined) { + resolve(); + } else { + reject(err); + } + }); + }).catch((reason) => { + console.error(reason); + throw reason; + }); + }, 5000); } -async function main() { - const subscriber = new RaydiumSwapSubscriber(); - - process.on('SIGINT', async () => { - logger.info('Received SIGINT. Cleaning up...'); - await subscriber.cleanup(); - process.exit(0); - }); - - await subscriber.listen(); -} -main().catch(console.error); \ No newline at end of file + +main(); \ No newline at end of file diff --git a/09-example-subRaydiumPrice/package.json b/09-example-subRaydiumPrice/package.json deleted file mode 100644 index 2aa985f..0000000 --- a/09-example-subRaydiumPrice/package.json +++ /dev/null @@ -1,36 +0,0 @@ -{ - "name": "grpc-client-raydium-price-example-ts", - "version": "2.0.1", - "license": "Apache-2.0", - "author": "Triton One", - "main": "dist/client.js", - "description": "Yellowstone gRPC Geyser TypeScript example for raydium price monitor", - "homepage": "https://triton.one", - "dependencies": { - "@coral-xyz/anchor": "^0.29.0", - "@metaplex-foundation/mpl-token-metadata": "^3.3.0", - "@raydium-io/raydium-sdk": "^1.3.1-beta.58", - "@solana-developers/helpers": "^2.5.6", - "@solana/rpc-types": "^2.0.0", - "@solana/spl-token": "^0.4.9", - "@solana/web3.js": "^1.98.0", - "@triton-one/yellowstone-grpc": "^1.4.1", - "@types/bn.js": "^5.1.6", - "axios": "^1.7.9", - "bn.js": "^5.2.1", - "bs58": "^6.0.0", - "pino": "^9.6.0", - "pino-pretty": "^13.0.0", - "typescript": "^5.3.3", - "yargs": "^17.6.2" - }, - "scripts": { - "build": "tsc -p .", - "fmt": "prettier -w .", - "start": "npm run build && node dist/client.js" - }, - "devDependencies": { - "prettier": "^2.8.3", - "typescript": "^5.3.3" - } -} diff --git a/09-example-subRaydiumPrice/utils/logger.ts b/09-example-subRaydiumPrice/utils/logger.ts deleted file mode 100644 index 1124804..0000000 --- a/09-example-subRaydiumPrice/utils/logger.ts +++ /dev/null @@ -1,18 +0,0 @@ -import pino from "pino"; - - -const stream = pino.multistream([pino.destination("./logs/log.log"), process.stdout]); -const logger = pino( - { - transport: { - target: "pino-pretty", - options: { - ignore: "pid,hostname", - }, - }, - base: null, - }, - stream); - - -export default logger; \ No newline at end of file