-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscanner.py
More file actions
390 lines (245 loc) Β· 10.3 KB
/
scanner.py
File metadata and controls
390 lines (245 loc) Β· 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
import discord
import aiohttp
import os
import requests
import json
from discord.ext import commands
from dotenv import load_dotenv
from datetime import datetime, timedelta
# Load environment variables
load_dotenv()
DISCORD_TOKEN = os.getenv('DISCORD_TOKEN')
HELIUS_API_KEY = os.getenv('HELIUS_API_KEY')
BITQUERY_API_ID = os.getenv('BITQUERY_API_ID')
BITQUERY_API_SECRET = os.getenv('BITQUERY_API_SECRET')
# Initialize bot and define intents
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix='!', intents=intents)
# Access token for Bitquery API
bitquery_access_token = None
bitquery_token_expires_at = None
@bot.event
async def on_ready():
print(f'{bot.user} has connected to Discord π₯·!')
# Format numbers for readability
def format_number(num):
try:
return "${:,.2f}".format(float(num))
except (ValueError, TypeError):
return "Unavailable"
# Function to obtain Bitquery access token
def get_bitquery_access_token():
global bitquery_access_token, bitquery_token_expires_at
if bitquery_access_token and bitquery_token_expires_at > datetime.utcnow():
return bitquery_access_token
url = "https://oauth2.bitquery.io/oauth2/token"
payload = {
'grant_type': 'client_credentials',
'client_id': BITQUERY_API_ID,
'client_secret': BITQUERY_API_SECRET,
'scope': 'api'
}
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
response = requests.post(url, headers=headers, data=payload)
if response.status_code == 200:
data = response.json()
bitquery_access_token = data["access_token"]
bitquery_token_expires_at = datetime.utcnow() + timedelta(seconds=data.get("expires_in", 3600))
print("BitQuery access token obtained successfully.")
else:
print("Failed to obtain BitQuery access token.")
bitquery_access_token = None
return bitquery_access_token
# Helper function to make GraphQL requests with error handling
def graphql_request(query, variables=None):
access_token = get_bitquery_access_token()
if not access_token:
print("Failed to authenticate with BitQuery.")
return None
url_graphql = "https://streaming.bitquery.io/eap"
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {access_token}'
}
payload = json.dumps({'query': query, 'variables': variables} if variables else {'query': query})
try:
response = requests.post(url_graphql, headers=headers, data=payload)
response.raise_for_status()
data = response.json()
return data.get("data", None)
except requests.RequestException as e:
print(f"Request error: {e}")
except json.JSONDecodeError:
print("Failed to parse JSON response.")
return None
# Retrieve Helius asset data
async def get_helius_asset_data(contract_address):
url = f"https://mainnet.helius-rpc.com/?api-key={HELIUS_API_KEY}"
payload = {
"jsonrpc": "2.0",
"id": "my-id",
"method": "getAsset",
"params": {
"id": contract_address,
"displayOptions": {
"showFungible": True,
}
}
}
async with aiohttp.ClientSession() as session:
try:
async with session.post(url, json=payload) as response:
if response.status == 200:
asset_data = await response.json()
return asset_data.get("result", {})
except aiohttp.ClientError as e:
print(f"Error fetching Helius data: {e}")
return None
# Check migration status using BitQuery
def check_migration_status(contract_address):
query = """
query ($token: String) {
Solana(network: solana) {
Instructions(
where: {
Instruction: {
Accounts: { includes: { Address: { is: $token } } },
Program: { Name: { is: "pump" }, Method: { is: "mintTo" } }
}
}
) {
Transaction { Signer }
}
}
}
"""
variables = {"token": contract_address}
result = graphql_request(query, variables)
if result:
instructions = result.get("Solana", {}).get("Instructions", [])
return "Detected" if instructions else "No migration detected."
return "Unavailable"
# Check for Jito Bundle status
def check_jito_bundle(contract_address):
query = """
query ($token: String) {
Solana {
Instructions(
where: {
Instruction: {
Accounts: { includes: { Address: { is: $token } } },
Program: { Name: { is: "jito", Method: { is: "bundle" } } }
}
}
) {
Transaction { Signer }
}
}
}
"""
variables = {"token": contract_address}
result = graphql_request(query, variables)
if result:
instructions = result.get("Solana", {}).get("Instructions", [])
return "Detected" if instructions else "No Jito Bundle detected."
return "Unavailable"
# Check DexScreener for "Dex Paid?" status
async def get_dex_paid_status(chain_id, contract_address):
url = f"https://api.dexscreener.com/orders/v1/{chain_id}/{contract_address}"
async with aiohttp.ClientSession() as session:
try:
async with session.get(url) as response:
if response.status == 200:
orders_data = await response.json()
if isinstance(orders_data, dict) and "orders" in orders_data:
for order in orders_data["orders"]:
if order.get("status") == "approved":
return "Yes"
return "No"
except aiohttp.ClientError as e:
print(f"Error checking Dex Paid status: {e}")
return "N/A"
# Fetch latest trade info (buy/sell/total volume)
async def get_latest_trade_info(token_address):
query = """
query MyQuery {
Solana {
DEXTradeByTokens(
where: {
Trade: { Currency: { MintAddress: { is: "%s" } } }
}
) {
buy_volume: sum(of: Trade_Side_AmountInUSD, if: { Trade: { Side: { Type: { is: buy } } } })
sell_volume: sum(of: Trade_Side_AmountInUSD, if: { Trade: { Side: { Type: { is: sell } } } })
total_trade_volume: sum(of: Trade_Side_AmountInUSD)
}
}
}
""" % token_address
result = graphql_request(query)
if result:
dex_data = result.get("Solana", {}).get("DEXTradeByTokens", [{}])[0]
return {
"buy_volume": format_number(dex_data.get("buy_volume", 0)),
"sell_volume": format_number(dex_data.get("sell_volume", 0)),
"total_trade_volume": format_number(dex_data.get("total_trade_volume", 0))
}
return {"buy_volume": "Unavailable", "sell_volume": "Unavailable", "total_trade_volume": "Unavailable"}
# Main command to fetch token information
@bot.command(name='scan')
async def fetch_token_status(ctx, contract_address: str):
await ctx.send(f"Fetching data for contract address: {contract_address}... π§βπ»")
# Retrieve data from Helius RPC
helius_data = await get_helius_asset_data(contract_address)
# Retrieve "Dex Paid" status
chain_id = "solana"
dex_paid_status = await get_dex_paid_status(chain_id, contract_address)
# Get migration status
migration_status = check_migration_status(contract_address)
# Check for Jito Bundle status
jito_bundle_status = check_jito_bundle(contract_address)
# Get latest trade info
trade_info = await get_latest_trade_info(contract_address)
# Construct the embed with available data
embed = discord.Embed(title=f"Token Info for {contract_address}", color=0x00ff00)
if helius_data:
token_info = helius_data.get("token_info", {})
price_info = token_info.get("price_info", {})
metadata = helius_data.get("content", {}).get("metadata", {})
image_url = helius_data.get("content", {}).get("links", {}).get("image", None)
developer_address = helius_data.get("authorities", [{}])[0].get("address", "N/A")
# Calculate market cap with decimals adjustment
try:
price_per_token = float(price_info.get("price_per_token", 0))
supply = float(token_info.get("supply", 0))
decimals = token_info.get("decimals", 0)
adjusted_supply = supply / (10 ** decimals) if decimals else supply
market_cap = format_number(price_per_token * adjusted_supply) if price_per_token > 0 and adjusted_supply > 0 else "N/A"
except (ValueError, TypeError):
market_cap = "N/A"
# Set embed fields with Helius data
embed.add_field(name="πͺ Name", value=metadata.get("name", "N/A"), inline=True)
embed.add_field(name="π Symbol", value=token_info.get("symbol", "N/A"), inline=True)
embed.add_field(name="π€ Developer", value=f"[{developer_address[:8]}...](https://explorer.solana.com/address/{developer_address})", inline=True)
# Add image if available
if image_url:
embed.set_thumbnail(url=image_url)
else:
embed.add_field(name="Error", value="No data found from Helius.", inline=False)
# Display Dex Paid status
embed.add_field(name="β
Dex Paid?", value=dex_paid_status, inline=True)
# Jito Bundle and Migration Status
embed.add_field(name="π Jito Bundle Status", value=jito_bundle_status, inline=True)
embed.add_field(name="π Migration Status", value=migration_status, inline=True)
# Latest Trade Information
embed.add_field(name="π Latest Trade Information π", value="βββββββββββββββ", inline=False)
embed.add_field(name="πΉ Buy Volume (USD)", value=trade_info['buy_volume'], inline=True)
embed.add_field(name="πΉ Sell Volume (USD)", value=trade_info['sell_volume'], inline=True)
embed.add_field(name="πΉ Total Trade Volume (USD)", value=trade_info['total_trade_volume'], inline=True)
# Market Cap
embed.add_field(name="π¦ Market Cap", value=market_cap, inline=True)
# Send the embed to Discord
await ctx.send(embed=embed)
# Start the bot
bot.run(DISCORD_TOKEN)