forked from magnaopus1/Synthron-Crypto-Trader
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
1928 lines (1679 loc) · 92.5 KB
/
main.py
File metadata and controls
1928 lines (1679 loc) · 92.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
SUPERTRADEX using SYNTHRON CRYPTO TRADER codebase
"""
import sys
import asyncio
import pandas as pd
import time
import logging
import os
import json
from pathlib import Path
from datetime import datetime, timezone
import signal
import httpx
from dotenv import load_dotenv, dotenv_values
import io
from solana.rpc.async_api import AsyncClient
from logging.handlers import RotatingFileHandler
from typing import List, Optional, Type, Dict, Any
import aiohttp
import queue
import threading
from config.settings import EncryptionSettings
from sqlalchemy import select, desc, asc
from fastapi import FastAPI
import json
import websockets
import threading
from typing import Set
from dataclasses import dataclass, field
class RealtimeLogHandler(logging.Handler):
    """Log handler that forwards formatted records to WebSocket clients.

    ``emit()`` may be called synchronously from any thread, so records are
    pushed onto a bounded queue and drained by a dedicated daemon thread.
    The drain thread owns its own asyncio event loop and runs
    ``realtime_manager.broadcast_log_update()`` to completion on it.

    This fixes the original implementation, which called
    ``asyncio.create_task()`` from a plain thread with no running event
    loop — that raises ``RuntimeError`` and silently drops every entry.

    NOTE(review): if the manager's websocket connections belong to a
    different event loop, sends may still need
    ``asyncio.run_coroutine_threadsafe`` onto that loop — confirm against
    ``RealtimeEventManager``.
    """

    # Bounded so a stalled broadcaster cannot grow memory without limit;
    # emit() drops the oldest entry when the queue is full.
    QUEUE_MAXSIZE = 1000

    def __init__(self, realtime_manager):
        """
        Args:
            realtime_manager: Object exposing an async
                ``broadcast_log_update(entry: dict)`` coroutine method.
        """
        super().__init__()
        self.realtime_manager = realtime_manager
        self.log_queue = queue.Queue(maxsize=self.QUEUE_MAXSIZE)
        self.broadcast_thread = None
        self.start_broadcast_thread()

    def start_broadcast_thread(self):
        """Start the daemon thread that drains the queue and broadcasts."""
        def broadcast_worker():
            # This thread owns a private event loop for the broadcast
            # coroutines; it never touches the application's main loop.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            # Log failures at DEBUG (below this handler's usual INFO
            # threshold) so a failing broadcast cannot feed back into the
            # very handler that is broadcasting.
            worker_logger = logging.getLogger(__name__)
            try:
                while True:
                    try:
                        log_entry = self.log_queue.get(timeout=1)
                    except queue.Empty:
                        continue
                    try:
                        if log_entry:
                            loop.run_until_complete(
                                self.realtime_manager.broadcast_log_update(log_entry)
                            )
                    except Exception as e:
                        worker_logger.debug(f"Error in log broadcast thread: {e}")
                    finally:
                        self.log_queue.task_done()
            finally:
                loop.close()

        self.broadcast_thread = threading.Thread(target=broadcast_worker, daemon=True)
        self.broadcast_thread.start()

    def emit(self, record):
        """Queue a formatted log record for broadcasting (non-blocking)."""
        try:
            log_entry = {
                "timestamp": time.time(),
                "level": record.levelname,
                "message": self.format(record),
                "logger": record.name,
            }
            # Queue for broadcasting without ever blocking the caller.
            try:
                self.log_queue.put_nowait(log_entry)
            except queue.Full:
                # Queue is full: drop the oldest entry to make room.
                try:
                    self.log_queue.get_nowait()
                    self.log_queue.put_nowait(log_entry)
                except queue.Empty:
                    pass
        except Exception:
            self.handleError(record)
@dataclass
class RealtimeEventManager:
    """Manages real-time event broadcasting to web clients.

    Owns a websockets server (run on a private event loop inside a daemon
    thread) and a set of currently-connected clients. The ``broadcast_*``
    helpers fan a JSON message out to every connected client.
    """
    # Annotated -> a real dataclass field: each instance gets its own set.
    connected_clients: Set[websockets.WebSocketServerProtocol] = field(default_factory=set)
    # Unannotated assignments are class attributes shared by all instances,
    # not dataclass fields — acceptable only if this is used as a singleton.
    websocket_server = None
    server_thread = None

    async def start_websocket_server(self, host: str = "127.0.0.1", port: int = 8765):
        """Start the WebSocket server for real-time updates.

        The server is launched on a brand-new event loop inside a daemon
        thread, so this coroutine returns immediately after spawning it.
        NOTE(review): `stop_websocket_server` is later awaited from the
        caller's loop while the server object lives on the thread's loop —
        cross-loop close/wait is fragile; confirm shutdown actually works.
        """
        logger.info(f"🚀 Starting real-time WebSocket server on {host}:{port}")

        async def websocket_handler(websocket, path):
            """Handle one client connection for its whole lifetime.

            NOTE(review): the two-argument ``(websocket, path)`` signature
            was removed in websockets >= 11; confirm the pinned websockets
            version still supports it.
            """
            self.connected_clients.add(websocket)
            logger.info(f"📡 WebSocket client connected. Total clients: {len(self.connected_clients)}")
            try:
                # Greet the client so it can confirm the channel is live.
                await websocket.send(json.dumps({
                    "event": "connected",
                    "type": "system",
                    "data": {"message": "Connected to SuperTradeX real-time updates"},
                    "timestamp": time.time()
                }))
                # Keep connection alive; the only inbound message handled
                # is a JSON ping, answered with a pong.
                async for message in websocket:
                    # Handle client messages if needed
                    try:
                        data = json.loads(message)
                        if data.get("type") == "ping":
                            await websocket.send(json.dumps({
                                "event": "pong",
                                "type": "system",
                                "timestamp": time.time()
                            }))
                    except json.JSONDecodeError:
                        # Non-JSON client messages are ignored by design.
                        pass
            except websockets.exceptions.ConnectionClosed:
                pass
            finally:
                # Always deregister, whatever ended the connection.
                self.connected_clients.discard(websocket)
                logger.info(f"📡 WebSocket client disconnected. Total clients: {len(self.connected_clients)}")

        # Start server in background thread
        async def run_server():
            # Runs on the thread's loop; blocks there until the server closes.
            self.websocket_server = await websockets.serve(
                websocket_handler,
                host,
                port,
                ping_interval=20,
                ping_timeout=10
            )
            logger.info(f"✅ WebSocket server started on ws://{host}:{port}")
            await self.websocket_server.wait_closed()

        # Run server in thread to avoid blocking the caller's event loop.
        def run_in_thread():
            asyncio.new_event_loop().run_until_complete(run_server())

        self.server_thread = threading.Thread(target=run_in_thread, daemon=True)
        self.server_thread.start()

    async def stop_websocket_server(self):
        """Stop the WebSocket server.

        NOTE(review): awaited from the caller's loop although the server
        was created on the background thread's loop — confirm this is safe.
        """
        if self.websocket_server:
            self.websocket_server.close()
            await self.websocket_server.wait_closed()
            logger.info("🛑 WebSocket server stopped")

    async def broadcast_event(self, event_type: str, event_data: dict):
        """Broadcast a JSON-encoded event to all connected clients.

        Clients whose send fails are pruned from ``connected_clients``.
        """
        if not self.connected_clients:
            return
        message = {
            "event": event_type,
            "type": "realtime_update",
            "data": event_data,
            "timestamp": time.time()
        }
        message_str = json.dumps(message)
        disconnected_clients = set()
        # Iterate a copy: the set may be mutated by handlers while we send.
        for client in self.connected_clients.copy():
            try:
                await client.send(message_str)
            except websockets.exceptions.ConnectionClosed:
                disconnected_clients.add(client)
            except Exception as e:
                logger.error(f"Error broadcasting to WebSocket client: {e}")
                disconnected_clients.add(client)
        # Remove disconnected clients
        for client in disconnected_clients:
            self.connected_clients.discard(client)
        if self.connected_clients:
            logger.debug(f"📡 Broadcast '{event_type}' to {len(self.connected_clients)} clients")

    async def broadcast_token_update(self, token_data: dict):
        """Broadcast token price/monitoring updates."""
        await self.broadcast_event("token_update", token_data)

    async def broadcast_trade_update(self, trade_data: dict):
        """Broadcast trade execution updates."""
        await self.broadcast_event("trade_update", trade_data)

    async def broadcast_log_update(self, log_entry: dict):
        """Broadcast new log entries."""
        await self.broadcast_event("log_update", log_entry)

    async def broadcast_system_status(self, status_data: dict):
        """Broadcast system status updates."""
        await self.broadcast_event("system_status", status_data)

    async def broadcast_paper_trading_update(self, paper_data: dict):
        """Broadcast paper trading updates."""
        await self.broadcast_event("paper_trading", paper_data)
# Make the project root importable so sibling packages (config, utils,
# data, filters, ...) resolve when main.py is executed directly.
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
# --- Centralized Imports (Grouped) ---
# Configuration
from config import Settings
from config.logging_config import LoggingConfig
from config.solana_config import SolanaConfig
from config.dexscreener_api import DexScreenerAPI
from config.thresholds import Thresholds
from config.filters_config import FiltersConfig
# Utilities
from utils.encryption import decrypt_env_file, test_encryption, get_encryption_password
from utils.circuit_breaker import CircuitBreaker
from utils.proxy_manager import ProxyManager
from utils.helpers import ensure_directory_exists, setup_output_dirs
from utils import get_logger, get_git_commit_hash
from utils.logger import get_logger
# Data Components
# Import DataPackage first
from data import DataPackage
from data.market_data import MarketData
# Then import individual components needed for external initialization
from data.token_database import TokenDatabase, Token
from data.price_monitor import PriceMonitor
from data.monitoring import VolumeMonitor, Monitoring # Import Monitoring here if needed externally
from data.blockchain_listener import BlockchainListener # Add import for BlockchainListener
# MonitoringSimple is no longer needed, we're using MarketData instead
# from data.monitoring_simple import MonitoringSimple # Import our simplified monitoring class
# No need to import components initialized *inside* DataPackage unless used directly elsewhere
# Import TokenMetrics
from data.token_metrics import TokenMetrics # Added import
from data.platform_tracker import PlatformTracker # Added import for PlatformTracker
from data.data_processing import DataProcessing
from data.data_fetcher import DataFetcher
from data.delta_calculator import DeltaCalculator
from data.indicators import Indicators # Added this import
from data.token_scanner import TokenScanner # Added import for TokenScanner
# Filter Components
from filters.whitelist import Whitelist
from filters.blacklist import Blacklist # Added import
from filters import FilterManager # Import manager
# Import components needed for FilterManager initialization
from filters.twitter_check import TwitterCheck
# No need to import individual filters if only used via FilterManager config
# API Clients (Specific Examples)
from config.rugcheck_api import RugcheckAPI
from filters.solsniffer_api import SolsnifferAPI
from data.solanatracker_api import SolanaTrackerAPI
# Execution Components
from execution.trade_queue import TradeQueue
from execution.order_manager import OrderManager
from execution.transaction_tracker import TransactionTracker
from execution.trade_scheduler import TradeScheduler, TradeTrigger, TriggerType # Added Trigger imports
# Strategy Components
from strategies.entry_exit import EntryExitStrategy
from strategies.risk_management import RiskManagement, TokenRiskMonitor
from strategies.position_management import PositionManagement
from strategies.alert_system import AlertSystem
from strategies.paper_trading import PaperTrading
# Add StrategySelector import
from strategies.strategy_selector import StrategySelector
from strategies import StrategyEvaluator # ADDED - to import the correct one
# Wallet Components
from wallet.wallet_manager import WalletManager
from wallet.balance_checker import BalanceChecker
from wallet.trade_validator import TradeValidator
# Import additional components needed for focused monitoring
from strategies.paper_trading import PaperTrading
from data.blockchain_listener import BlockchainListener
from data.hybrid_monitoring_manager import HybridMonitoringManager, TokenPriority
from data.focused_monitoring import FocusedMonitoringManager, FocusedTokenData
# FocusedTokenData and FocusedMonitoringManager have been moved to data.focused_monitoring
# Define helper function for updating env vars BEFORE it's called
def update_dotenv_vars(env_vars: dict, override: bool = False) -> None:
    """Copy ``env_vars`` into ``os.environ``, masking secrets in log output.

    Args:
        env_vars: Mapping of variable name -> value. ``None`` values are
            skipped (``os.environ`` cannot hold them).
        override: When True, existing environment entries are replaced;
            otherwise variables already present are left untouched.
    """
    # Resolve the module logger locally instead of relying on a module-level
    # `logger` global that may not be defined yet at early-startup call
    # sites (the original raised NameError in that case). Before logging is
    # configured (no handlers anywhere), fall back to print().
    _logger = logging.getLogger(__name__)
    log_func = _logger.debug if _logger.hasHandlers() else print
    updated_count = 0
    skipped_count = 0
    for key, value in env_vars.items():
        if value is None:  # Skip: os.environ rejects None values
            log_func(f"Skipping None value for env var: {key}")
            skipped_count += 1
            continue
        value_str = str(value)  # os.environ only accepts strings
        if override or key not in os.environ:
            os.environ[key] = value_str
            # Mask sensitive-looking values (KEY/SECRET/PASSWORD) before logging
            upper_key = key.upper()
            is_sensitive = 'KEY' in upper_key or 'SECRET' in upper_key or 'PASSWORD' in upper_key
            log_value = value_str[:3] + '...' + value_str[-3:] if len(value_str) > 6 and is_sensitive else value_str
            log_func(f"Set env var: {key}={log_value}")
            updated_count += 1
        else:
            log_func(f"Skipped env var (already exists, override=False): {key}")
            skipped_count += 1
    log_func(f"Env var update: {updated_count} set, {skipped_count} skipped.")
# Initialize logger early for startup messages (reconfigured after Settings
# are loaded further down).
logger = logging.getLogger(__name__)
# --- Constants ---
ENV_DIR = project_root / "config"
ENV_PLAIN_PATH = ENV_DIR / ".env"
ENV_ENCRYPTED_PATH = ENV_DIR / ".env.encrypted"
# Maximum runtime in seconds (e.g., 24 hours). None for indefinite.
MAX_RUNTIME_SECONDS = None
# --- Environment Variable Loading Logic ---
# Runs at import time, before Settings() is constructed, so all variables
# are in os.environ when Settings reads them.
print("--- Loading Environment Variables --- ")
variables_loaded_count = 0
# 1. Try getting encryption password (stored key first, env var fallback).
password = None
# Load the key-file path from our config layer
key_settings = EncryptionSettings()
key_file_to_use = key_settings.ENCRYPTION_KEY_PATH
print(f"INFO: Using key filename for password retrieval: {key_file_to_use}")
try:
    password = get_encryption_password()
    if password:
        print("INFO: Successfully retrieved encryption password from stored key.")
    else:
        # Fallback to environment variable if stored password retrieval failed
        password = os.getenv("ENCRYPTION_PASSWORD")
        if password:
            print("INFO: Using encryption password from ENCRYPTION_PASSWORD environment variable.")
        else:
            print("INFO: No stored or environment encryption password found.")
except Exception as e:
    print(f"ERROR: Could not retrieve encryption password: {e}")
# 2. The encrypted env file is deliberately NOT decrypted here; the plain
#    .env file is the active source. `password` above is currently unused
#    on this path.
if ENV_ENCRYPTED_PATH.exists():
    print(f"INFO: Skipping encrypted environment file: {ENV_ENCRYPTED_PATH} (using plain .env)")
else:
    print(f"INFO: Encrypted environment file not found: {ENV_ENCRYPTED_PATH}")
# 3. Load plain .env file (overrides anything loaded from encrypted file)
if ENV_PLAIN_PATH.exists():
    print(f"INFO: Loading plain environment file (with override): {ENV_PLAIN_PATH}")
    try:
        # NOTE(review): load_dotenv returns a bool, not a count, despite
        # the variable name.
        loaded_plain_count = load_dotenv(dotenv_path=ENV_PLAIN_PATH, override=True)
        if loaded_plain_count:
            # Only increment count if encrypted wasn't loaded — leftover
            # bookkeeping from the disabled encrypted-file path; with that
            # path skipped, the inner `else` branch is unreachable.
            if variables_loaded_count == 0:
                variables_loaded_count += 1
                print(f"INFO: Successfully loaded/overridden variables from plain file: {ENV_PLAIN_PATH}")
            else:
                print(f"INFO: Plain file {ENV_PLAIN_PATH} loaded, but encrypted vars took precedence (override=True still applies).")
        else:
            print(f"WARNING: Plain environment file {ENV_PLAIN_PATH} exists but failed to load/override any variables.")
    except Exception as e:
        print(f"ERROR: Failed loading plain env file {ENV_PLAIN_PATH}: {e}")
else:
    print(f"INFO: Plain environment file not found: {ENV_PLAIN_PATH}")
if variables_loaded_count == 0:
    print("WARNING: No environment variables loaded from config/.env or config/.env.encrypted. Relying on system environment.")
print("--- Environment Variable Loading Complete --- ")
# --- Settings and Logging Setup (Initialize AFTER loading ALL env vars) ---
try:
settings = Settings()
LoggingConfig.setup_logging(settings=settings)
logger = logging.getLogger(__name__) # Re-get logger after setup
# Setup specialized loggers for prices and trades
from config.logging_config import setup_specialized_loggers
price_logger, trade_logger = setup_specialized_loggers()
logger.info("📊 Specialized loggers initialized: prices.log and trades.log")
# Add real-time log handler to the root logger (only if realtime_manager is available)
if 'realtime_manager' in locals() and realtime_manager:
realtime_log_handler = RealtimeLogHandler(realtime_manager)
realtime_log_handler.setLevel(logging.INFO)
realtime_log_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logging.getLogger().addHandler(realtime_log_handler)
logger.info("🔄 Real-time log broadcasting enabled")
# --- Explicitly set websockets logger level to INFO ---
websockets_logger = logging.getLogger('websockets')
websockets_logger.setLevel(logging.INFO)
# Optional: Ensure handlers also respect this level if they have their own level set higher
# for handler in websockets_logger.handlers:
# handler.setLevel(logging.INFO)
# # Propagate setting to root handlers if no specific handlers are attached
# if not websockets_logger.handlers:
# for handler in logging.getLogger().handlers:
# # Be careful not to lower the level of handlers meant for higher levels (e.g., ERROR file handler)
# # This part might be unnecessary if root handlers are already at INFO or DEBUG
# pass # Usually inheriting from root is sufficient
logger.info("Websockets logger level forced to INFO.") # Add log message
# -----------------------------------------------------
# --- REMOVED Explicit TokenScanner Debug Setting --- ## REMOVED ##
# token_scanner_logger = logging.getLogger('data.token_scanner')
# token_scanner_logger.setLevel(logging.DEBUG)
# --- Added Debug Setting for MarketData --- #
logging.getLogger('data.market_data').setLevel(logging.DEBUG)
logger.info("Logging configured successfully.")
logger.info("Settings loaded and Logging configured.")
except ValueError as e:
initial_logger = logging.getLogger("startup_error")
initial_logger.error(f"CRITICAL ERROR: Missing required environment variable: {e}. Check .env files. Exiting.")
print("--- Loaded Environment Variables (at time of error) ---")
debug_vars = {k: (v[:3] + '...' + v[-3:] if isinstance(v, str) and len(v)>6 and ('KEY' in k or 'SECRET' in k or 'PASSWORD' in k) else v) for k, v in os.environ.items()}
print(json.dumps(debug_vars, indent=2))
print("--- End Loaded Environment Variables --- ")
sys.exit(1)
except Exception as e:
initial_logger = logging.getLogger("startup_error")
initial_logger.error(f"CRITICAL ERROR during Settings initialization: {e}. Exiting.", exc_info=True)
sys.exit(1)
# --- Helper Function for Graceful Shutdown ---
async def close_all_components(components_dict: dict):
    """Safely close every managed component, in dependency-safe order.

    Components are shut down roughly in reverse order of initialization so
    consumers (strategy evaluator, executor) close before the services they
    depend on (database, HTTP/RPC clients). Each shutdown is attempted
    independently: a failure in one component is logged with traceback and
    does not stop the shutdown of the remaining components.

    Args:
        components_dict: Mapping of component key -> component instance.
            Missing or None entries, and components lacking the expected
            close method, are skipped silently.
    """
    logger.info("--- Closing All Components ---")

    async def _shutdown(key: str, method_name: str, label: str,
                        verbs: tuple = ("Closing", "closed", "closing")) -> None:
        # Await `component.<method_name>()` if the component exists and
        # exposes a callable by that name; log outcome either way.
        component = components_dict.get(key)
        method = getattr(component, method_name, None) if component is not None else None
        if not callable(method):
            return
        present, past, gerund = verbs
        try:
            logger.info(f"{present} {label}...")
            await method()
            logger.info(f"{label} {past}.")
        except Exception as e:
            logger.error(f"Error {gerund} {label}: {e}", exc_info=True)

    stop_verbs = ("Stopping", "stopped", "stopping")
    # Reverse-initialization order: consumers first, core clients last.
    await _shutdown("strategy_evaluator", "close", "StrategyEvaluator")
    await _shutdown("trade_executor", "close", "TradeExecutor")
    await _shutdown("realtime_manager", "stop_websocket_server", "RealtimeEventManager", stop_verbs)
    await _shutdown("token_scanner", "close", "TokenScanner")
    await _shutdown("market_data", "close", "MarketData")
    await _shutdown("wallet_manager", "close", "WalletManager")
    await _shutdown("hybrid_monitoring", "close", "HybridMonitoringManager")
    await _shutdown("helius_pump_parser", "stop_helius_pump_stream", "Helius Pump AMM stream", stop_verbs)
    await _shutdown("blockchain_listener", "close", "BlockchainListener")
    await _shutdown("db", "close", "TokenDatabase")
    await _shutdown("http_client", "aclose", "HTTPX Client")  # httpx uses aclose()
    await _shutdown("solana_client", "close", "Solana Client")
    await _shutdown("solana_tracker_api", "close", "SolanaTrackerAPI")
    await _shutdown("platform_tracker", "close", "PlatformTracker")
    # Components such as filter_manager have no close method today; add an
    # entry above if one gains a close() in the future.
    logger.info("--- All Components Closed ---")
async def signal_handler_async(shutdown_event: asyncio.Event,
logger_instance: logging.Logger):
"""Coroutine to handle signals by setting the shutdown event."""
if shutdown_event.is_set():
logger_instance.info("signal_handler_async: Shutdown already in progress.")
return
logger_instance.info("signal_handler_async: Signal received, setting shutdown_event. Main `finally` block will handle component cleanup and handler restoration.")
# Clean up PID file
pid_file = Path("supertradex.pid")
try:
if pid_file.exists():
pid_file.unlink()
logger_instance.info("PID file removed during shutdown")
except Exception as e:
logger_instance.warning(f"Could not remove PID file: {e}")
shutdown_event.set()
# --- Unified Asynchronous Component Initialization Function ---
async def initialize_components(settings: Settings, realtime_manager=None) -> dict:
"""Initializes and returns all core application components."""
logger.info("--- Initializing Core Components ---")
start_time = time.time()
# Initialize basic utilities
proxy_manager = ProxyManager(settings.PROXY_FILE_PATH) if settings.USE_PROXIES else None
if proxy_manager:
logger.info(f"ProxyManager initialized. {len(proxy_manager.get_all_proxies())} proxies loaded.")
# Setup HTTP client session (aiohttp or httpx based on preference)
# Using httpx as an example, compatible with proxy_manager
if proxy_manager:
# If using proxies, get a proxy URL for the client to use
proxy_url = proxy_manager.get_proxy_url()
if proxy_url:
transport = httpx.AsyncHTTPTransport(proxy_url=proxy_url)
http_client = httpx.AsyncClient(transport=transport, timeout=settings.HTTP_TIMEOUT)
logger.info(f"HTTPX AsyncClient initialized with proxy: {proxy_url}")
else:
http_client = httpx.AsyncClient(timeout=settings.HTTP_TIMEOUT)
logger.info("HTTPX AsyncClient initialized without proxy (no valid proxies available)")
else:
http_client = httpx.AsyncClient(timeout=settings.HTTP_TIMEOUT)
logger.info("HTTPX AsyncClient initialized without proxy manager")
logger.info(f"HTTPX AsyncClient initialized. Timeout: {settings.HTTP_TIMEOUT}s")
# Initialize Solana client
solana_client = AsyncClient(settings.SOLANA_RPC_URL)
logger.info(f"Solana AsyncClient initialized for endpoint: {settings.SOLANA_RPC_URL}")
# --- Database Initialization ---
db = await TokenDatabase.create(settings.DATABASE_FILE_PATH, settings)
if not db:
logger.critical("Failed to initialize TokenDatabase. Exiting.")
raise RuntimeError("Failed to initialize TokenDatabase.")
logger.info(f"TokenDatabase initialized with DB: {settings.DATABASE_FILE_PATH}")
# --- Configuration Objects ---
thresholds = Thresholds(settings=settings)
logger.info("Thresholds initialized.")
filters_config = FiltersConfig(settings=settings, thresholds=thresholds)
logger.info("FiltersConfig initialized.")
# --- Wallet Manager ---
wallet_manager = WalletManager(settings=settings, solana_client=solana_client, db=db)
await wallet_manager.initialize() # Initialize and load keypair
logger.info("WalletManager initialized.")
# Initialize shared HTTP and Solana clients
# ... (http_client, solana_client, proxy_manager initialization) ...
# Initialize main DexScreenerAPI client (SHARED INSTANCE)
logger.info("Initializing main DexScreenerAPI client...")
dexscreener_api = DexScreenerAPI(settings, proxy_manager=proxy_manager)
if not await dexscreener_api.initialize():
logger.warning("DexScreenerAPI initialization failed, but continuing system startup. Some features may be limited.")
# Don't exit - allow the system to continue without DexScreener API
# The API will retry connections when actually needed
else:
logger.info("Main DexScreenerAPI client initialized successfully.")
# Initialize MarketData, pass the SHARED dexscreener_api
logger.info("Initializing MarketData...")
market_data = MarketData(settings, dexscreener_api=dexscreener_api, token_db=db, http_client=http_client, solana_client=solana_client)
if not await market_data.initialize():
logger.critical("Failed to initialize MarketData. Exiting.")
# Perform necessary cleanup before exiting
await dexscreener_api.close() # Close the shared instance
if proxy_manager: await proxy_manager.close_session()
if http_client: await http_client.aclose()
if solana_client: await solana_client.close() # Changed to .close()
if db: await db.close()
return None
logger.info("MarketData initialized successfully.")
# Initialize other components like FilterManager, TokenMetrics, APIs...
# (Existing code for FilterManager, TokenMetrics, RugcheckAPI, SolsnifferAPI, TwitterCheck)
# ...
# Initialize main DexScreenerAPI client (SHARED INSTANCE)
# ... (dexscreener_api initialization) ...
# Initialize MarketData, pass the SHARED dexscreener_api
# ... (market_data initialization) ...
# --- Initialize components needed by FilterManager and TokenScanner ---
logger.info("Initializing API clients for FilterManager...")
rugcheck_api = RugcheckAPI(settings=settings, proxy_manager=proxy_manager)
logger.info("RugcheckAPI initialized.")
# SolsnifferAPI only accepts settings
solsniffer_api = SolsnifferAPI(settings=settings)
logger.info("SolsnifferAPI initialized.")
# TwitterCheck accepts settings and thresholds
twitter_check = TwitterCheck(settings=settings, thresholds=thresholds)
await twitter_check.initialize() # TwitterCheck has async initialize
logger.info("TwitterCheck initialized.")
logger.info("Initializing FilterManager...")
filter_manager = FilterManager(
settings=settings,
thresholds=thresholds,
filters_config=filters_config,
db=db,
http_client=http_client,
solana_client=solana_client,
price_monitor=market_data.price_monitor, # Get from MarketData
rugcheck_api=rugcheck_api,
solsniffer_api=solsniffer_api,
twitter_check=twitter_check
)
logger.info("FilterManager initialized successfully.")
logger.info("Initializing TokenMetrics...")
# TokenMetrics needs many components that we haven't initialized yet
# For now, let's skip TokenMetrics initialization and continue with other components
# token_metrics = TokenMetrics(...)
token_metrics = None # Placeholder - will be initialized later
logger.info("TokenMetrics initialization deferred (missing dependencies).")
# --- End FilterManager and TokenScanner dependencies ---
# TokenScanner initialization will be moved after TokenMetrics is properly initialized
token_scanner = None # Placeholder - will be initialized later
logger.info("TokenScanner initialization deferred until TokenMetrics is ready.")
# Initialize PlatformTracker (after TokenScanner if it depends on it, or in parallel if not)
logger.info("Initializing PlatformTracker...")
# PlatformTracker expects: db, settings, thresholds, solana_client
platform_tracker = PlatformTracker(db, settings, thresholds, solana_client)
if not await platform_tracker.initialize():
logger.critical("Failed to initialize PlatformTracker. Exiting.")
# ... (full cleanup) ...
return None
logger.info("PlatformTracker initialized successfully.")
# Skip DataPackage for now - it's causing TokenMetrics initialization issues
# All necessary components are already initialized above
logger.info("Skipping DataPackage initialization (not required for basic functionality).")
data_package = None
# Initialize trade queue
# --- Execution Components (needed by StrategyComponents) ---
balance_checker = BalanceChecker(
solana_client=solana_client,
wallet_pubkey=wallet_manager.get_public_key(),
http_client=http_client,
settings=settings
)
trade_validator = TradeValidator(balance_checker=balance_checker, settings=settings)
order_manager = OrderManager(
settings=settings,
solana_client=solana_client,
db=db,
wallet_manager=wallet_manager,
http_client=http_client,
trade_validator=trade_validator,
price_monitor=market_data.price_monitor # price_monitor comes from market_data
)
trade_queue = TradeQueue(order_manager=order_manager)
# Initialize TransactionTracker here as it can be needed by other strategy components initialized shortly
transaction_tracker = TransactionTracker(settings=settings, solana_client=solana_client, db=db)
logger.info("Core execution components (BalanceChecker, TradeValidator, OrderManager, TradeQueue, TransactionTracker) initialized.")
# --- Strategy Components ---
# Initialize AlertSystem first as it's a dependency for RiskMgmt and PositionMgmt
alert_system = AlertSystem() # AlertSystem initializes its own settings and does not take db/thresholds
logger.info("AlertSystem initialized.")
# Initialize Blacklist as it's needed by StrategySelector
blacklist = Blacklist(db=db) # Blacklist only takes db
logger.info("Blacklist initialized.")
# Initialize RiskManagement
risk_management = RiskManagement(
settings=settings,
thresholds=thresholds,
alert_system=alert_system,
db=db,
transaction_tracker=transaction_tracker,
order_manager=order_manager
)
logger.info("RiskManagement initialized.")
# Initialize PositionManagement
position_management = PositionManagement(
order_manager=order_manager,
settings=settings,
thresholds=thresholds,
balance_checker=balance_checker,
trade_validator=trade_validator
)
logger.info("PositionManagement initialized.")
# Initialize missing components needed by StrategySelector
# Initialize Indicators
indicators = Indicators(settings=settings, thresholds=thresholds)
logger.info("Indicators initialized.")
# Initialize Whitelist
whitelist = Whitelist(settings=settings)
logger.info("Whitelist initialized.")
# Initialize SolanaTrackerAPI
solana_tracker_api = SolanaTrackerAPI(settings=settings)
logger.info("SolanaTrackerAPI initialized.")
# Get price_monitor from MarketData
price_monitor = market_data.price_monitor
logger.info("PriceMonitor reference obtained from MarketData.")
# Get volume_monitor from MarketData or create one
volume_monitor = VolumeMonitor(db=db, settings=settings, thresholds=thresholds)
logger.info("VolumeMonitor initialized.")
# Use MarketData as monitoring service
monitoring_service = market_data
logger.info("Monitoring service set to MarketData.")
# Initialize EntryExitStrategy first as StrategySelector and StrategyEvaluator might need it
entry_exit_strategy = EntryExitStrategy(
settings=settings,
db=db,
trade_queue=None, # Operates in signal generation mode when used by StrategyEvaluator
market_data=market_data,
whitelist=whitelist,
blacklist=blacklist,
thresholds=thresholds,
wallet_manager=wallet_manager
# indicators=indicators # EES will calculate its own or use data from price events
)
await entry_exit_strategy.initialize(order_manager=order_manager) # Pass order_manager during initialization
logger.info("EntryExitStrategy initialized standalone.")
strategy_selector = StrategySelector(
settings=settings,
thresholds=thresholds,
filters_config=filters_config,
db=db,
market_data=market_data,
indicators=indicators,
price_monitor=price_monitor,
trade_queue=trade_queue,
entry_exit_strategy=entry_exit_strategy,
wallet_manager=wallet_manager,
order_manager=order_manager,
risk_management=risk_management,
position_management=position_management,
alert_system=alert_system,
whitelist=whitelist,
blacklist=blacklist
)
logger.info("StrategySelector initialized with more dependencies.")
token_metrics = TokenMetrics(
settings=settings,
db=db,
price_monitor=price_monitor,
thresholds=thresholds,
filter_manager=filter_manager,
whitelist=whitelist,
monitoring=monitoring_service, # Pass the MarketData instance as monitoring service
indicators=indicators,
platform_tracker=platform_tracker,
volume_monitor=volume_monitor,
strategy_selector=strategy_selector,
solana_client=solana_client
)
logger.info("TokenMetrics initialized.")
# Now initialize TokenScanner with proper TokenMetrics
logger.info("Initializing TokenScanner...")
token_scanner = TokenScanner(
db=db,
settings=settings,
thresholds=thresholds,
filter_manager=filter_manager,
market_data=market_data,
dexscreener_api=dexscreener_api,
token_metrics=token_metrics,
rugcheck_api=rugcheck_api # Optional, but pass if available
)
if not await token_scanner.initialize():
logger.critical("Failed to initialize TokenScanner. Exiting.")
# ... (cleanup including dexscreener_api, market_data, filter_manager, etc.)
await dexscreener_api.close()
await market_data.close() # MarketData close will handle its internal PriceMonitor
# filter_manager may have resources (like its own http_client if not shared) - check its close method
if hasattr(filter_manager, 'close') and asyncio.iscoroutinefunction(filter_manager.close):
await filter_manager.close()
if proxy_manager: await proxy_manager.close_session()
if http_client: await http_client.aclose()
if solana_client: await solana_client.close()
if db: await db.close()
return None
logger.info("TokenScanner initialized successfully.")
# DataFetcher - initialized as it was before, TokenScanner does not directly use it.
data_fetcher = DataFetcher(settings=settings)
logger.info("DataFetcher initialized.")
# Initialize TradeExecutor if available
trade_executor_instance = None
try:
from execution.trade_executor import TradeExecutor
trade_executor_instance = TradeExecutor(
settings=settings,
order_manager=order_manager,
transaction_tracker=transaction_tracker,
db=db,
wallet_manager=wallet_manager,
market_data=market_data
)
await trade_executor_instance.initialize()
logger.info("TradeExecutor initialized.")
except ImportError:
logger.warning("TradeExecutor class not found. Using None - StrategyEvaluator will work without trade execution.")
trade_executor_instance = None
except Exception as e:
logger.warning(f"TradeExecutor initialization failed: {e}. Using None.")
trade_executor_instance = None
# --- Strategy Components ---
strategy_evaluator = StrategyEvaluator(
market_data=market_data,
db=db,
settings=settings,
thresholds=thresholds,
trade_executor=trade_executor_instance,
wallet_manager=wallet_manager,
indicators=indicators,
trade_queue=trade_queue,
order_manager=order_manager,
entry_exit_strategy=entry_exit_strategy,
strategy_selector=strategy_selector
)
await strategy_evaluator.initialize_strategies() # Initialize internal strategies (will use provided EES or create if None)
logger.info("StrategyEvaluator initialized.")
# --- Paper Trading System ---
paper_trading = PaperTrading(
settings=settings,
db=db,
wallet_manager=wallet_manager,
price_monitor=price_monitor
)
await paper_trading.load_persistent_state()
logger.info("PaperTrading system initialized.")
# --- Focused Monitoring Manager ---
focused_monitoring = FocusedMonitoringManager(
settings=settings,
market_data=market_data,
db=db
)
logger.info("FocusedMonitoringManager initialized.")
# --- Blockchain Listener ---
blockchain_listener = BlockchainListener(settings=settings, callback=None)
await blockchain_listener.initialize()
logger.info("BlockchainListener initialized.")
# --- Hybrid Monitoring Manager ---
hybrid_monitoring = HybridMonitoringManager(
settings=settings,
blockchain_listener=blockchain_listener,
market_data=market_data,
token_db=db,
logger=logger
)
logger.info("HybridMonitoringManager initialized.")
# --- Enhanced PumpSwap Parser with Helius Stream ---
from data.pumpswap_parser import PumpSwapParser
from config.blockchain_logging import setup_blockchain_logger
async def helius_pump_callback(price_data):
"""Handle Helius Pump AMM price updates from enhanced parser"""
try:
# **DISABLED: The blockchain transaction data is NOT price data!**
# This was causing confusion by treating transaction amounts as prices