diff --git a/README.md b/README.md index e1be80f..9c15d81 100644 --- a/README.md +++ b/README.md @@ -236,6 +236,33 @@ nats consumer sub transfer transaction-consumer --- +## 🌐 HTTP API Examples + +```bash +# set this to your configured services.port +export INDEXER_PORT=8080 +``` + +### Health Check + +```bash +curl -s "http://localhost:${INDEXER_PORT}/health" | jq +``` + +### Reload TON Jetton Registry (all TON chains) + +```bash +curl -s -X POST "http://localhost:${INDEXER_PORT}/ton/jettons/reload" | jq +``` + +### Reload TON Jetton Registry For One Chain + +```bash +curl -s -X POST "http://localhost:${INDEXER_PORT}/ton/jettons/reload?chain=ton_mainnet" | jq +``` + +--- + ## 📝 Example `configs/config.yaml` (chains section) ```yaml diff --git a/cmd/indexer/main.go b/cmd/indexer/main.go index 08e889e..830545c 100644 --- a/cmd/indexer/main.go +++ b/cmd/indexer/main.go @@ -3,11 +3,13 @@ package main import ( "context" "encoding/json" + "errors" "fmt" "log/slog" "net/http" "os" "os/signal" + "strings" "syscall" "time" @@ -199,8 +201,10 @@ func runIndexer(chains []string, configPath string, debug, manual, catchup, from redisClient, managerCfg, ) + tonWalletReloadService := worker.NewTonWalletReloadServiceFromManager(manager) + tonJettonReloadService := worker.NewTonJettonReloadServiceFromManager(manager) - healthServer := startHealthServer(cfg.Services.Port, cfg) + healthServer := startHealthServer(cfg.Services.Port, cfg, tonWalletReloadService, tonJettonReloadService) // Start all workers logger.Info("Starting all workers") @@ -232,7 +236,34 @@ type HealthResponse struct { Version string `json:"version"` } -func startHealthServer(port int, cfg *config.Config) *http.Server { +type APIErrorResponse struct { + Status string `json:"status"` + Error string `json:"error"` + Timestamp time.Time `json:"timestamp"` +} + +type TonWalletReloadResponse struct { + Status string `json:"status"` + Source worker.WalletReloadSource `json:"source"` + Chain string `json:"chain,omitempty"` + Results []worker.TonWalletReloadResult `json:"results"` + TriggeredAtUTC time.Time `json:"triggered_at_utc"` + SupportedSources []worker.WalletReloadSource `json:"supported_sources"` +} + +type TonJettonReloadResponse struct { + Status string `json:"status"` + Chain string `json:"chain,omitempty"` + Results []worker.TonJettonReloadResult `json:"results"` + TriggeredAtUTC time.Time `json:"triggered_at_utc"` +} + +func startHealthServer( + port int, + cfg *config.Config, + tonWalletReloadService *worker.TonWalletReloadService, + tonJettonReloadService *worker.TonJettonReloadService, +) *http.Server { mux := http.NewServeMux() version := cfg.Version @@ -246,10 +277,84 @@ func startHealthServer(port int, cfg *config.Config) *http.Server { Timestamp: time.Now().UTC(), Version: version, } + writeJSON(w, http.StatusOK, response) + }) + + mux.HandleFunc("/ton/wallets/reload", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeErrorJSON(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + if tonWalletReloadService == nil { + writeErrorJSON(w, http.StatusNotFound, worker.ErrNoTonWorkerConfigured.Error()) + return + } + + source := worker.WalletReloadSource(r.URL.Query().Get("source")).Normalize() + if !source.IsValid() { + writeErrorJSON(w, http.StatusBadRequest, "invalid source (supported: kv, db)") + return + } + + req := worker.TonWalletReloadRequest{ + Source: source, + ChainFilter: strings.TrimSpace(r.URL.Query().Get("chain")), + } + + results, err := tonWalletReloadService.ReloadTonWallets(r.Context(), req) + if err != nil { + statusCode := http.StatusInternalServerError + if errors.Is(err, worker.ErrTonWorkerNotFound) || errors.Is(err, worker.ErrNoTonWorkerConfigured) { + statusCode = http.StatusNotFound + } + writeErrorJSON(w, statusCode, err.Error()) + return + } - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(response) + response := TonWalletReloadResponse{ + Status: reloadWalletStatus(results), + Source: source, + Chain: req.ChainFilter, + Results: results, + TriggeredAtUTC: time.Now().UTC(), + SupportedSources: []worker.WalletReloadSource{worker.WalletReloadSourceKV, worker.WalletReloadSourceDB}, + } + writeJSON(w, http.StatusOK, response) + }) + + mux.HandleFunc("/ton/jettons/reload", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeErrorJSON(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + if tonJettonReloadService == nil { + writeErrorJSON(w, http.StatusNotFound, worker.ErrNoTonWorkerConfigured.Error()) + return + } + + req := worker.TonJettonReloadRequest{ + ChainFilter: r.URL.Query().Get("chain"), + } + + results, err := tonJettonReloadService.ReloadTonJettons(r.Context(), req) + if err != nil { + statusCode := http.StatusInternalServerError + if errors.Is(err, worker.ErrTonWorkerNotFound) || errors.Is(err, worker.ErrNoTonWorkerConfigured) { + statusCode = http.StatusNotFound + } + writeErrorJSON(w, statusCode, err.Error()) + return + } + + response := TonJettonReloadResponse{ + Status: reloadJettonStatus(results), + Chain: req.ChainFilter, + Results: results, + TriggeredAtUTC: time.Now().UTC(), + } + writeJSON(w, http.StatusOK, response) }) server := &http.Server{ @@ -258,7 +363,13 @@ func startHealthServer(port int, cfg *config.Config) *http.Server { } go func() { - logger.Info("Health check server started", "port", port, "endpoint", "/health") + logger.Info( + "Health check server started", + "port", port, + "health_endpoint", "/health", + "wallet_reload_endpoint", "/ton/wallets/reload", + "jetton_reload_endpoint", "/ton/jettons/reload", + ) if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { logger.Error("Health server failed to start", "error", err) } @@ -267,6 +378,40 @@ func startHealthServer(port int, cfg *config.Config) *http.Server { return server } +func reloadWalletStatus(results []worker.TonWalletReloadResult) string { + for _, result := range results { + if result.Error != "" { + return "partial_error" + } + } + return "ok" +} + +func reloadJettonStatus(results []worker.TonJettonReloadResult) string { + for _, result := range results { + if result.Error != "" { + return "partial_error" + } + } + return "ok" +} + +func writeJSON(w http.ResponseWriter, statusCode int, payload any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + if err := json.NewEncoder(w).Encode(payload); err != nil { + logger.Error("Failed to encode response", "status", statusCode, "err", err) + } +} + +func writeErrorJSON(w http.ResponseWriter, statusCode int, message string) { + writeJSON(w, statusCode, APIErrorResponse{ + Status: "error", + Error: message, + Timestamp: time.Now().UTC(), + }) +} + func waitForShutdown() { stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt, syscall.SIGTERM) diff --git a/configs/config.example.yaml b/configs/config.example.yaml index 583075f..74dc7b5 100644 --- a/configs/config.example.yaml +++ b/configs/config.example.yaml @@ -161,6 +161,41 @@ chains: - url: "fullnode.mainnet.sui.io:443" # e.g. 127.0.0.1:9000 - url: "sui-mainnet.nodeinfra.com:443" + ton_mainnet: + network_id: "ton" + internal_code: "TON_MAINNET" + type: "ton" + poll_interval: "5s" + nodes: + - url: "https://ton.org/global.config.json" + throttle: + concurrency: 10 + batch_size: 50 + jettons: + - master_address: "EQCxE6mUtQJKFnGfaROTgt1lZbDiiX1Gw83iAV82Cn_0opUb" # USDT + symbol: "USDT" + decimals: 6 + + ton_testnet: + network_id: "ton_testnet" + internal_code: "TON_TESTNET" + type: "ton" + start_block: 0 + poll_interval: "4s" + nodes: + - url: "https://ton.org/testnet-global-config.json" + client: + timeout: "15s" + throttle: + rps: 10 + burst: 20 + batch_size: 1 + concurrency: 4 + jettons: + - master_address: "0:1de7d5cc59be344d17f1a93274a2dad44ea0fc66ec4b1e10bc4f8d35039f8aaf" + symbol: "USDT" + decimals: 6 + # Infrastructure services services: port: 8080 # Health check and monitoring server port diff --git a/go.mod b/go.mod index fef59e3..5c59bea 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/shopspring/decimal v1.4.0 github.com/stretchr/testify v1.11.1 github.com/tyler-smith/go-bip39 v1.1.0 + github.com/xssnick/tonutils-go v1.15.5 golang.org/x/crypto v0.44.0 golang.org/x/sync v0.18.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda @@ -34,6 +35,7 @@ require ( replace github.com/imdario/mergo => github.com/imdario/mergo v0.3.16 require ( + filippo.io/edwards25519 v1.1.0 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/bits-and-blooms/bitset v1.10.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect diff --git a/go.sum b/go.sum index 73dba2b..ab89f67 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8= dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA= +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/alecthomas/assert/v2 v2.11.0 h1:2Q9r3ki8+JYXvGsDyBXwH3LcJ+WK5D0gc5E8vS6K3D0= @@ -281,6 +283,8 @@ github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg= github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/tyler-smith/go-bip39 v1.1.0 h1:5eUemwrMargf3BSLRRCalXT93Ns6pQJIjYQN2nyfOP8= github.com/tyler-smith/go-bip39 v1.1.0/go.mod h1:gUYDtqQw1JS3ZJ8UWVcGTGqqr6YIN3CWg+kkNaLt55U= +github.com/xssnick/tonutils-go v1.15.5 h1:yAcHnDaY5QW0aIQE47lT0PuDhhHYE+N+NyZssdPKR0s= +github.com/xssnick/tonutils-go v1.15.5/go.mod h1:3/B8mS5IWLTd1xbGbFbzRem55oz/Q86HG884bVsTqZ8= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= diff --git a/internal/indexer/sui.go b/internal/indexer/sui.go index 87ec141..b53eb26 100644 --- a/internal/indexer/sui.go +++ b/internal/indexer/sui.go @@ -220,7 +220,7 @@ func (s *SuiIndexer) convertCheckpoint(cp *sui.Checkpoint) *types.Block { func (s *SuiIndexer) convertTransaction(execTx *v2.ExecutedTransaction, blockNumber, blockTs uint64) types.Transaction { t := types.Transaction{ TxHash: execTx.GetDigest(), - NetworkId: s.cfg.InternalCode, + NetworkId: s.cfg.NetworkId, BlockNumber: blockNumber, Timestamp: blockTs, } diff --git a/internal/indexer/ton/address.go b/internal/indexer/ton/address.go new file mode 100644 index 0000000..a263f0a --- /dev/null +++ b/internal/indexer/ton/address.go @@ -0,0 +1,83 @@ +package ton + +import ( + "fmt" + "strings" + + "github.com/xssnick/tonutils-go/address" +) + +func resolvePollAddress(addrStr string) (*address.Address, string, error) { + addr, err := parseTONAddress(addrStr) + if err != nil { + return nil, "", fmt.Errorf("invalid TON address %s: %w", addrStr, err) + } + return addr, addr.StringRaw(), nil +} + +// NormalizeTONAddressRaw returns canonical raw format (workchain:hex) or empty if invalid. +func NormalizeTONAddressRaw(addr string) string { + parsed, err := parseTONAddress(addr) + if err != nil { + return "" + } + return parsed.StringRaw() +} + +// NormalizeTONAddressList canonicalizes to raw format, trims invalid inputs, and de-duplicates while preserving order. +func NormalizeTONAddressList(addresses []string) []string { + if len(addresses) == 0 { + return nil + } + + dedup := make(map[string]struct{}, len(addresses)) + result := make([]string, 0, len(addresses)) + for _, addr := range addresses { + normalized := NormalizeTONAddressRaw(addr) + if normalized == "" { + continue + } + if _, exists := dedup[normalized]; exists { + continue + } + dedup[normalized] = struct{}{} + result = append(result, normalized) + } + + return result +} + +func parseTONAddress(addrStr string) (*address.Address, error) { + addrStr = strings.TrimSpace(addrStr) + + // User-friendly format (base64url with checksum), e.g. EQ... + if addr, err := address.ParseAddr(addrStr); err == nil { + return addr, nil + } + // Raw format, e.g. 0:abcdef... + if addr, err := address.ParseRawAddr(addrStr); err == nil { + return addr, nil + } + + // Defensive normalization for malformed historical values. + parts := strings.SplitN(addrStr, ":", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("invalid address format") + } + + rawHex := strings.TrimPrefix(strings.ToLower(strings.TrimSpace(parts[1])), "0x") + if rawHex == "" { + return nil, fmt.Errorf("empty address payload") + } + + if len(rawHex)%2 == 1 { + rawHex = "0" + rawHex + } + if len(rawHex) > 64 { + rawHex = rawHex[len(rawHex)-64:] + } else if len(rawHex) < 64 { + rawHex = strings.Repeat("0", 64-len(rawHex)) + rawHex + } + + return address.ParseRawAddr(parts[0] + ":" + rawHex) +} diff --git a/internal/indexer/ton/core.go b/internal/indexer/ton/core.go new file mode 100644 index 0000000..de5abac --- /dev/null +++ b/internal/indexer/ton/core.go @@ -0,0 +1,324 @@ +package ton + +import ( + "context" + "encoding/hex" + "fmt" + "sync" + + tonRpc "github.com/fystack/multichain-indexer/internal/rpc/ton" + "github.com/fystack/multichain-indexer/pkg/common/config" + "github.com/fystack/multichain-indexer/pkg/common/constant" + "github.com/fystack/multichain-indexer/pkg/common/enum" + "github.com/fystack/multichain-indexer/pkg/common/types" + "github.com/fystack/multichain-indexer/pkg/infra" + tonCursorStore "github.com/fystack/multichain-indexer/pkg/store/toncursorstore" + "github.com/xssnick/tonutils-go/tlb" +) + +type AccountCursor = tonCursorStore.AccountCursor +type CursorStore = tonCursorStore.Store + +func NewCursorStore(kv infra.KVStore) CursorStore { + return tonCursorStore.New(kv) +} + +// AccountIndexer is the interface for account-based indexing (TON). +// This is separate from the block-based Indexer interface used by EVM/Solana/etc. +type AccountIndexer interface { + GetName() string + GetNetworkType() enum.NetworkType + GetNetworkInternalCode() string + + // PollAccount fetches new transactions for a single account. + // Returns parsed transactions and the new cursor position. + // If no new transactions, returns empty slice and same cursor. + PollAccount(ctx context.Context, address string, cursor *AccountCursor) ([]types.Transaction, *AccountCursor, error) + + // IsHealthy checks if the RPC connection is healthy. + IsHealthy() bool + + // ReloadJettons refreshes supported jetton metadata at runtime. + ReloadJettons(ctx context.Context) (int, error) +} + +// TonAccountIndexer implements AccountIndexer for TON blockchain. +type TonAccountIndexer struct { + chainName string + config config.ChainConfig + client tonRpc.TonAPI + jettonRegistry JettonRegistry + jettonMasterMu sync.RWMutex + jettonMasters map[string]string // jetton wallet -> jetton master + + // Transaction limit per poll + txLimit uint32 +} + +// NewTonAccountIndexer creates a new TON account indexer. +func NewTonAccountIndexer( + chainName string, + cfg config.ChainConfig, + client tonRpc.TonAPI, + jettonRegistry JettonRegistry, +) *TonAccountIndexer { + txLimit := uint32(50) // Default transaction limit + if cfg.Throttle.BatchSize > 0 { + txLimit = uint32(cfg.Throttle.BatchSize) + } + + return &TonAccountIndexer{ + chainName: chainName, + config: cfg, + client: client, + jettonRegistry: jettonRegistry, + jettonMasters: make(map[string]string), + txLimit: txLimit, + } +} + +func (i *TonAccountIndexer) GetName() string { return i.chainName } +func (i *TonAccountIndexer) GetNetworkType() enum.NetworkType { return enum.NetworkTypeTon } +func (i *TonAccountIndexer) GetNetworkInternalCode() string { return i.config.InternalCode } + +// IsHealthy checks if the RPC connection is healthy. +func (i *TonAccountIndexer) IsHealthy() bool { + // The client manages its own connection pool and recovery. + // We consider it healthy if it's initialized. + return i.client != nil +} + +func (i *TonAccountIndexer) ReloadJettons(ctx context.Context) (int, error) { + if i.jettonRegistry == nil { + return 0, nil + } + + type registryReloader interface { + Reload(context.Context) error + } + + if reloader, ok := i.jettonRegistry.(registryReloader); ok { + if err := reloader.Reload(ctx); err != nil { + return 0, fmt.Errorf("reload jetton registry: %w", err) + } + } + + return len(i.jettonRegistry.List()), nil +} + +// PollAccount fetches new transactions for a single account. +func (i *TonAccountIndexer) PollAccount(ctx context.Context, addrStr string, cursor *AccountCursor) ([]types.Transaction, *AccountCursor, error) { + if i.client == nil { + return nil, cursor, fmt.Errorf("ton rpc client is not initialized") + } + + addr, normalizedAddress, err := resolvePollAddress(addrStr) + if err != nil { + return nil, cursor, err + } + + lastLT, lastHash, err := decodeCursorForRPC(cursor) + if err != nil { + return nil, cursor, err + } + + txs, err := i.client.ListTransactions(ctx, addr, i.txLimit, lastLT, lastHash) + if err != nil { + return nil, cursor, fmt.Errorf("failed to list transactions: %w", err) + } + + // No new transactions + if len(txs) == 0 { + return nil, cursor, nil + } + + parsedTxs := make([]types.Transaction, 0, len(txs)) + newCursor := ensureCursor(cursor, addrStr) + var latestSeqno uint64 + var latestSeqnoFetched bool + + // TON API returns newest first, so process backwards (oldest to newest). + for j := len(txs) - 1; j >= 0; j-- { + tx := txs[j] + + if isAlreadyProcessed(cursor, tx) { + continue + } + + advanceCursor(newCursor, tx) + + if !isTransactionSuccess(tx) { + continue + } + + matchedTxs := i.parseMatchedTransactions(tx, normalizedAddress) + if len(matchedTxs) == 0 { + continue + } + i.resolveJettonAssetAddresses(ctx, matchedTxs) + + if !latestSeqnoFetched { + latestSeqno, err = i.client.GetLatestMasterchainSeqno(ctx) + if err != nil { + return nil, cursor, fmt.Errorf("failed to get latest masterchain seqno: %w", err) + } + latestSeqnoFetched = true + } + + for idx := range matchedTxs { + matchedTxs[idx].BlockNumber = latestSeqno + matchedTxs[idx].MasterchainSeqno = latestSeqno + } + + parsedTxs = append(parsedTxs, matchedTxs...) + } + + return parsedTxs, newCursor, nil +} + +func (i *TonAccountIndexer) resolveJettonAssetAddresses(ctx context.Context, txs []types.Transaction) { + for idx := range txs { + tx := &txs[idx] + if tx.Type != constant.TxTypeTokenTransfer || tx.AssetAddress == "" { + continue + } + + walletAddress := tx.AssetAddress + if i.jettonRegistry != nil { + if info, ok := i.jettonRegistry.GetInfo(walletAddress); ok { + tx.AssetAddress = info.MasterAddress + continue + } + if info, ok := i.jettonRegistry.GetInfoByWallet(walletAddress); ok { + tx.AssetAddress = info.MasterAddress + i.cacheJettonMaster(walletAddress, info.MasterAddress) + continue + } + } + + if cachedMaster, ok := i.getCachedJettonMaster(walletAddress); ok { + tx.AssetAddress = cachedMaster + continue + } + + resolvedMaster, err := i.client.ResolveJettonMasterAddress(ctx, walletAddress) + if err != nil || resolvedMaster == "" { + continue + } + + i.cacheJettonMaster(walletAddress, resolvedMaster) + tx.AssetAddress = resolvedMaster + if i.jettonRegistry != nil { + i.jettonRegistry.RegisterWallet(walletAddress, resolvedMaster) + } + } +} + +func (i *TonAccountIndexer) getCachedJettonMaster(walletAddress string) (string, bool) { + i.jettonMasterMu.RLock() + defer i.jettonMasterMu.RUnlock() + master, ok := i.jettonMasters[walletAddress] + return master, ok +} + +func (i *TonAccountIndexer) cacheJettonMaster(walletAddress, masterAddress string) { + i.jettonMasterMu.Lock() + defer i.jettonMasterMu.Unlock() + i.jettonMasters[walletAddress] = masterAddress +} + +func decodeCursorForRPC(cursor *AccountCursor) (uint64, []byte, error) { + if cursor == nil || cursor.LastLT == 0 { + return 0, nil, nil + } + + lastLT := cursor.LastLT + if cursor.LastHash == "" { + return lastLT, nil, nil + } + + lastHash, err := hex.DecodeString(cursor.LastHash) + if err != nil { + return 0, nil, fmt.Errorf("invalid cursor hash: %w", err) + } + + return lastLT, lastHash, nil +} + +func ensureCursor(cursor *AccountCursor, address string) *AccountCursor { + if cursor != nil { + return cursor + } + return &AccountCursor{Address: address} +} + +func isAlreadyProcessed(cursor *AccountCursor, tx *tlb.Transaction) bool { + return cursor != nil && tx.LT <= cursor.LastLT +} + +func advanceCursor(cursor *AccountCursor, tx *tlb.Transaction) { + cursor.LastLT = tx.LT + cursor.LastHash = hex.EncodeToString(tx.Hash) +} + +// isTransactionSuccess checks if the transaction was successful by examining its phases. +func isTransactionSuccess(tx *tlb.Transaction) bool { + if tx.Description == nil { + return true // Should not happen with valid transactions + } + + desc, ok := tx.Description.(*tlb.TransactionDescriptionOrdinary) + if !ok { + return true + } + + // Check if the whole transaction was aborted + if desc.Aborted { + return false + } + + // Check Compute Phase + if desc.ComputePhase.Phase != nil { + if cp, ok := desc.ComputePhase.Phase.(*tlb.ComputePhaseVM); ok { + if !cp.Success { + return false + } + } + } + + // Check Action Phase + if desc.ActionPhase != nil { + if !desc.ActionPhase.Success { + return false + } + } + + return true +} + +// JettonInfo describes a supported Jetton token. +type JettonInfo struct { + MasterAddress string `json:"master_address" yaml:"master_address"` // Jetton master contract + Symbol string `json:"symbol" yaml:"symbol"` + Decimals int `json:"decimals" yaml:"decimals"` +} + +// JettonRegistry manages supported Jetton tokens. +type JettonRegistry interface { + // IsSupported checks if a Jetton wallet belongs to a supported Jetton. + // This may require looking up the wallet's master address. + IsSupported(walletAddress string) bool + + // GetInfo returns info for a Jetton by its master address. + GetInfo(masterAddress string) (*JettonInfo, bool) + + // GetInfoByWallet returns info for a Jetton by a wallet address. + // Returns nil if the wallet is not from a known Jetton. + GetInfoByWallet(walletAddress string) (*JettonInfo, bool) + + // RegisterWallet associates a Jetton wallet with its master address. + RegisterWallet(walletAddress, masterAddress string) + + // List returns all supported Jettons. + List() []JettonInfo +} diff --git a/internal/indexer/ton/core_jetton_test.go b/internal/indexer/ton/core_jetton_test.go new file mode 100644 index 0000000..92735fa --- /dev/null +++ b/internal/indexer/ton/core_jetton_test.go @@ -0,0 +1,75 @@ +package ton + +import ( + "context" + "testing" + + "github.com/fystack/multichain-indexer/pkg/common/constant" + "github.com/fystack/multichain-indexer/pkg/common/types" + "github.com/stretchr/testify/assert" + "github.com/xssnick/tonutils-go/address" + "github.com/xssnick/tonutils-go/tlb" +) + +type mockTonAPI struct { + resolveMasterFn func(ctx context.Context, jettonWallet string) (string, error) + resolveCalls int +} + +func (m *mockTonAPI) ListTransactions(_ context.Context, _ *address.Address, _ uint32, _ uint64, _ []byte) ([]*tlb.Transaction, error) { + return nil, nil +} + +func (m *mockTonAPI) GetLatestMasterchainSeqno(_ context.Context) (uint64, error) { + return 0, nil +} + +func (m *mockTonAPI) ResolveJettonMasterAddress(ctx context.Context, jettonWallet string) (string, error) { + m.resolveCalls++ + if m.resolveMasterFn == nil { + return "", nil + } + return m.resolveMasterFn(ctx, jettonWallet) +} + +func TestResolveJettonAssetAddresses_ResolveAndCache(t *testing.T) { + const ( + walletAddr = "0:eaa27e0e4fbadad817ac4a106de2bae8b52106b5267c1656a3892538d59c69dc" + masterAddr = "0:ca6e321c3ce184f66f4f74344770f31472800583947a7f9d5f68fecf052ce20f" + ) + + api := &mockTonAPI{ + resolveMasterFn: func(_ context.Context, jettonWallet string) (string, error) { + if jettonWallet == walletAddr { + return masterAddr, nil + } + return "", nil + }, + } + + idx := &TonAccountIndexer{ + client: api, + jettonRegistry: NewConfigBasedRegistry(nil), + jettonMasters: make(map[string]string), + } + + first := []types.Transaction{ + { + Type: constant.TxTypeTokenTransfer, + AssetAddress: walletAddr, + }, + } + idx.resolveJettonAssetAddresses(context.Background(), first) + assert.Equal(t, masterAddr, first[0].AssetAddress) + assert.Equal(t, 1, api.resolveCalls) + + second := []types.Transaction{ + { + Type: constant.TxTypeTokenTransfer, + AssetAddress: walletAddr, + }, + } + idx.resolveJettonAssetAddresses(context.Background(), second) + assert.Equal(t, masterAddr, second[0].AssetAddress) + assert.Equal(t, 1, api.resolveCalls) +} diff --git a/internal/indexer/ton/core_test.go b/internal/indexer/ton/core_test.go new file mode 100644 index 0000000..874c3bf --- /dev/null +++ b/internal/indexer/ton/core_test.go @@ -0,0 +1,62 @@ +package ton + +import ( + "encoding/hex" + "testing" + + "github.com/fystack/multichain-indexer/pkg/common/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTonAccountIndexerTransactionNetworkID(t *testing.T) { + t.Run("prefer_network_id", func(t *testing.T) { + indexer := &TonAccountIndexer{ + config: config.ChainConfig{ + NetworkId: "ton_testnet", + InternalCode: "TON_TESTNET", + }, + } + + assert.Equal(t, "ton_testnet", indexer.transactionNetworkID()) + }) + + t.Run("empty_when_network_id_missing", func(t *testing.T) { + indexer := &TonAccountIndexer{ + config: config.ChainConfig{ + NetworkId: " ", + InternalCode: "TON_TESTNET", + }, + } + + assert.Equal(t, "", indexer.transactionNetworkID()) + }) +} + +func TestNormalizeTONAddressList_CanonicalizesAndDedups(t *testing.T) { + addr1 := "0:fc58a2bb35b051810bef84fce18747ac2c2cfcbe0ce3d3167193d9b2538ef33e" + addr2 := "0:2942e40f94b5a2f111ea2ff98beb5f634f3a971f99f7fedafff5164c4bfa1bef" + + got := NormalizeTONAddressList([]string{ + " 0:FC58A2BB35B051810BEF84FCE18747AC2C2CFCBE0CE3D3167193D9B2538EF33E ", + "0:0xfc58a2bb35b051810bef84fce18747ac2c2cfcbe0ce3d3167193d9b2538ef33e", + addr2, + "not-an-address", + addr1, + "", + }) + + assert.Equal(t, []string{addr1, addr2}, got) +} + +func TestEncodeTONTxHash_UsesHex(t *testing.T) { + hash := []byte{0xfb, 0xef, 0xff, 0xfa, 0x00, 0x01, 0x02, 0x7f, 0x80, 0x90, 0xaa} + + encoded := encodeTONTxHash(hash) + + assert.Equal(t, "fbeffffa0001027f8090aa", encoded) + + decoded, err := hex.DecodeString(encoded) + require.NoError(t, err) + assert.Equal(t, hash, decoded) +} diff --git a/internal/indexer/ton/parser.go b/internal/indexer/ton/parser.go new file mode 100644 index 0000000..4649494 --- /dev/null +++ b/internal/indexer/ton/parser.go @@ -0,0 +1,320 @@ +package ton + +import ( + "encoding/hex" + "strings" + + "github.com/fystack/multichain-indexer/pkg/common/constant" + "github.com/fystack/multichain-indexer/pkg/common/types" + "github.com/shopspring/decimal" + "github.com/xssnick/tonutils-go/address" + "github.com/xssnick/tonutils-go/tlb" +) + +const ( + // Jetton standard opcodes (TEP-74) + OpcodeTransfer uint64 = 0x0f8a7ea5 + OpcodeTransferNotification uint64 = 0x7362d09c +) + +func (i *TonAccountIndexer) transactionNetworkID() string { + return strings.TrimSpace(i.config.NetworkId) +} + +func (i *TonAccountIndexer) parseMatchedTransactions( + tx *tlb.Transaction, + normalizedAddress string, +) []types.Transaction { + networkID := i.transactionNetworkID() + collected := parseTonTransferNormalized(tx, normalizedAddress, networkID) + if i.jettonRegistry != nil { + collected = append(collected, parseJettonTransferNormalized(tx, normalizedAddress, networkID, i.jettonRegistry)...) + } + return collected +} + +// ParseTonTransfer extracts native TON transfers from a transaction. +// Detects both incoming (receive) and outgoing (send) transfers. +func ParseTonTransfer(tx *tlb.Transaction, ourAddress string, networkID string) []types.Transaction { + normalizedOurAddress := NormalizeTONAddressRaw(ourAddress) + if normalizedOurAddress == "" { + return nil + } + return parseTonTransferNormalized(tx, normalizedOurAddress, networkID) +} + +func parseTonTransferNormalized(tx *tlb.Transaction, normalizedOurAddress string, networkID string) []types.Transaction { + var results []types.Transaction + + if parsed, ok := parseIncomingTransfer(tx, normalizedOurAddress, networkID); ok { + results = append(results, *parsed) + } + + for _, intMsg := range outgoingInternalMessages(tx) { + parsed, ok := parseOutgoingTransferMessage(tx, intMsg, normalizedOurAddress, networkID) + if !ok { + continue + } + results = append(results, parsed) + } + + return results +} + +func parseIncomingTransfer(tx *tlb.Transaction, ourAddress string, networkID string) (*types.Transaction, bool) { + intMsg, ok := incomingInternalMessage(tx) + if !ok { + return nil, false + } + + dstAddrRaw := intMsg.DstAddr.StringRaw() + if dstAddrRaw != ourAddress || !isSimpleTransferMessage(intMsg) { + return nil, false + } + + amount := intMsg.Amount.Nano().String() + if amount == "0" { + return nil, false + } + + txData := newBaseParsedTransaction(tx, networkID) + txData.FromAddress = intMsg.SrcAddr.StringRaw() + txData.ToAddress = dstAddrRaw + txData.AssetAddress = "" + txData.Amount = amount + txData.Type = constant.TxTypeNativeTransfer + return &txData, true +} + +func parseOutgoingTransferMessage( + tx *tlb.Transaction, + intMsg *tlb.InternalMessage, + ourAddress string, + networkID string, +) (types.Transaction, bool) { + srcAddr := intMsg.SrcAddr.StringRaw() + if srcAddr != ourAddress || !isSimpleTransferMessage(intMsg) { + return types.Transaction{}, false + } + + amount := intMsg.Amount.Nano().String() + if amount == "0" { + return types.Transaction{}, false + } + + txData := newBaseParsedTransaction(tx, networkID) + txData.FromAddress = srcAddr + txData.ToAddress = intMsg.DstAddr.StringRaw() + txData.AssetAddress = "" + txData.Amount = amount + txData.Type = constant.TxTypeNativeTransfer + return txData, true +} + +// ParseJettonTransfer extracts Jetton transfers from a transaction. +func ParseJettonTransfer(tx *tlb.Transaction, ourAddress string, networkID string, registry JettonRegistry) []types.Transaction { + normalizedOurAddress := NormalizeTONAddressRaw(ourAddress) + if normalizedOurAddress == "" { + return nil + } + return parseJettonTransferNormalized(tx, normalizedOurAddress, networkID, registry) +} + +func parseJettonTransferNormalized( + tx *tlb.Transaction, + normalizedOurAddress string, + networkID string, + registry JettonRegistry, +) []types.Transaction { + var results []types.Transaction + + if parsed, ok := parseIncomingJetton(tx, normalizedOurAddress, networkID, registry); ok { + results = append(results, *parsed) + } + + for _, intMsg := range outgoingInternalMessages(tx) { + parsed, ok := parseOutgoingJettonMessage(tx, intMsg, normalizedOurAddress, networkID, registry) + if !ok { + continue + } + results = append(results, parsed) + } + + return results +} + +func parseIncomingJetton(tx *tlb.Transaction, ourAddress string, networkID string, registry JettonRegistry) (*types.Transaction, bool) { + intMsg, ok := incomingInternalMessage(tx) + if !ok { + return nil, false + } + + dstAddrRaw := intMsg.DstAddr.StringRaw() + if dstAddrRaw != ourAddress || !messageHasOpcode(intMsg, OpcodeTransferNotification) { + return nil, false + } + + jettonAmount, sender, ok := parseJettonTransferBody(intMsg) + if !ok || sender == nil { + return nil, false + } + + jettonWallet := intMsg.SrcAddr.StringRaw() + + txData := newBaseParsedTransaction(tx, networkID) + txData.FromAddress = sender.StringRaw() + txData.ToAddress = dstAddrRaw + txData.AssetAddress = resolveJettonAssetAddress(registry, jettonWallet) + txData.Amount = jettonAmount + txData.Type = constant.TxTypeTokenTransfer + return &txData, true +} + +func parseOutgoingJettonMessage( + tx *tlb.Transaction, + intMsg *tlb.InternalMessage, + ourAddress string, + networkID string, + registry JettonRegistry, +) (types.Transaction, bool) { + if !messageHasOpcode(intMsg, OpcodeTransfer) { + return types.Transaction{}, false + } + + jettonAmount, destination, ok := parseJettonTransferBody(intMsg) + if !ok || destination == nil { + return types.Transaction{}, false + } + + jettonWallet := intMsg.DstAddr.StringRaw() + + txData := newBaseParsedTransaction(tx, networkID) + txData.FromAddress = ourAddress + txData.ToAddress = destination.StringRaw() + txData.AssetAddress = resolveJettonAssetAddress(registry, jettonWallet) + txData.Amount = jettonAmount + txData.Type = constant.TxTypeTokenTransfer + return txData, true +} + +func parseJettonTransferBody(msg *tlb.InternalMessage) (string, *address.Address, bool) { + if msg.Body == nil { + return "", nil, false + } + + bodySlice := msg.Body.BeginParse() + if _, err := bodySlice.LoadUInt(32); err != nil { // opcode + return "", nil, false + } + if _, err := bodySlice.LoadUInt(64); err != nil { // query_id + return "", nil, false + } + + jettonAmount, err := bodySlice.LoadVarUInt(16) + if err != nil { + return "", nil, false + } + + peerAddress, err := bodySlice.LoadAddr() + if err != nil { + return "", nil, false + } + + return jettonAmount.String(), peerAddress, true +} + +func incomingInternalMessage(tx *tlb.Transaction) (*tlb.InternalMessage, bool) { + if tx.IO.In == nil { + return nil, false + } + + intMsg, ok := tx.IO.In.Msg.(*tlb.InternalMessage) + if !ok || intMsg.Bounced { + return nil, false + } + + return intMsg, true +} + +func outgoingInternalMessages(tx *tlb.Transaction) []*tlb.InternalMessage { + if tx.IO.Out == nil { + return nil + } + + outList, err := tx.IO.Out.ToSlice() + if err != nil { + return nil + } + + messages := make([]*tlb.InternalMessage, 0, len(outList)) + for _, outMsg := range outList { + intMsg, ok := outMsg.Msg.(*tlb.InternalMessage) + if !ok { + continue + } + messages = append(messages, intMsg) + } + return messages +} + +func messageOpcode(msg *tlb.InternalMessage) (uint64, bool) { + if msg.Body == nil { + return 0, false + } + + bodySlice := msg.Body.BeginParse() + if bodySlice.BitsLeft() < 32 { + return 0, false + } + + opcode, err := bodySlice.LoadUInt(32) + if err != nil { + return 0, false + } + return opcode, true +} + +func isSimpleTransferMessage(msg *tlb.InternalMessage) bool { + if msg.Body == nil { + return true + } + + opcode, ok := messageOpcode(msg) + if !ok { + return true + } + + // Opcode 0 means comment payload for regular TON transfer. + return opcode == 0 +} + +func messageHasOpcode(msg *tlb.InternalMessage, expected uint64) bool { + opcode, ok := messageOpcode(msg) + return ok && opcode == expected +} + +func resolveJettonAssetAddress(registry JettonRegistry, jettonWallet string) string { + if registry == nil { + return jettonWallet + } + if info, ok := registry.GetInfoByWallet(jettonWallet); ok { + return info.MasterAddress + } + return jettonWallet +} + +func encodeTONTxHash(hash []byte) string { + return hex.EncodeToString(hash) +} + +func newBaseParsedTransaction(tx *tlb.Transaction, networkID string) types.Transaction { + return types.Transaction{ + TxHash: encodeTONTxHash(tx.Hash), + NetworkId: networkID, + BlockNumber: 0, + LogicalTime: tx.LT, + TxFee: decimal.NewFromBigInt(tx.TotalFees.Coins.Nano(), 0).Div(decimal.NewFromInt(1e9)), + Timestamp: uint64(tx.Now), + Status: types.StatusConfirmed, + } +} diff --git a/internal/indexer/ton/registry.go b/internal/indexer/ton/registry.go new file mode 100644 index 0000000..cecdc77 --- /dev/null +++ b/internal/indexer/ton/registry.go @@ -0,0 +1,268 @@ +package ton + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + + "github.com/fystack/multichain-indexer/pkg/infra" + "github.com/redis/go-redis/v9" +) + +const ( + jettonMasterListKeyFormat = "ton/jettons/%s/masters" + jettonWalletMappingKeyFormat = "ton/jettons/%s/wallet_to_master" +) + +// ConfigBasedRegistry implements JettonRegistry with a static list of Jettons. +// Wallet-to-master mappings are cached as they're discovered. +type ConfigBasedRegistry struct { + jettons map[string]JettonInfo // key: master address + walletToMaster map[string]string // key: wallet address -> master address + mu sync.RWMutex +} + +// NewConfigBasedRegistry creates a registry from a list of supported Jettons. +func NewConfigBasedRegistry(jettons []JettonInfo) *ConfigBasedRegistry { + m := make(map[string]JettonInfo) + for _, j := range jettons { + m[j.MasterAddress] = j + } + return &ConfigBasedRegistry{ + jettons: m, + walletToMaster: make(map[string]string), + } +} + +// IsSupported checks if a Jetton wallet belongs to a supported Jetton. +func (r *ConfigBasedRegistry) IsSupported(walletAddress string) bool { + r.mu.RLock() + defer r.mu.RUnlock() + _, ok := r.walletToMaster[walletAddress] + return ok +} + +// GetInfo returns info for a Jetton by its master address. +func (r *ConfigBasedRegistry) GetInfo(masterAddress string) (*JettonInfo, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + info, ok := r.jettons[masterAddress] + if !ok { + return nil, false + } + return &info, true +} + +// GetInfoByWallet returns info for a Jetton by a wallet address. +func (r *ConfigBasedRegistry) GetInfoByWallet(walletAddress string) (*JettonInfo, bool) { + r.mu.RLock() + masterAddr, ok := r.walletToMaster[walletAddress] + r.mu.RUnlock() + + if !ok { + return nil, false + } + + if info, ok := r.GetInfo(masterAddr); ok { + return info, true + } + // Fallback: return master address even when token metadata is not preconfigured. + return &JettonInfo{MasterAddress: masterAddr}, true +} + +// RegisterWallet associates a Jetton wallet with its master address. +// This is typically called when processing a Jetton transfer for the first time. +func (r *ConfigBasedRegistry) RegisterWallet(walletAddress, masterAddress string) { + r.mu.Lock() + defer r.mu.Unlock() + + r.walletToMaster[walletAddress] = masterAddress +} + +// List returns all supported Jettons. +func (r *ConfigBasedRegistry) List() []JettonInfo { + r.mu.RLock() + defer r.mu.RUnlock() + + result := make([]JettonInfo, 0, len(r.jettons)) + for _, j := range r.jettons { + result = append(result, j) + } + return result +} + +// RedisJettonRegistry loads supported jettons from Redis and keeps an in-memory snapshot. +// Key format: +// - ton/jettons//masters: JSON []JettonInfo +// - ton/jettons//wallet_to_master: JSON map[string]string +type RedisJettonRegistry struct { + redis infra.RedisClient + chainName string + + fallbackJettons []JettonInfo + + mu sync.RWMutex + jettons map[string]JettonInfo + walletToMaster map[string]string +} + +func NewRedisJettonRegistry(chainName string, redisClient infra.RedisClient, fallback []JettonInfo) *RedisJettonRegistry { + reg := &RedisJettonRegistry{ + redis: redisClient, + chainName: chainName, + fallbackJettons: append([]JettonInfo(nil), fallback...), + jettons: make(map[string]JettonInfo), + walletToMaster: make(map[string]string), + } + reg.applyFallback() + return reg +} + +func (r *RedisJettonRegistry) applyFallback() { + r.mu.Lock() + defer r.mu.Unlock() + + r.jettons = make(map[string]JettonInfo, len(r.fallbackJettons)) + r.walletToMaster = make(map[string]string) + for _, j := range r.fallbackJettons { + if j.MasterAddress == "" { + continue + } + r.jettons[j.MasterAddress] = j + } +} + +func (r *RedisJettonRegistry) mastersKey() string { + return fmt.Sprintf(jettonMasterListKeyFormat, r.chainName) +} + +func (r *RedisJettonRegistry) walletMappingKey() string { + return fmt.Sprintf(jettonWalletMappingKeyFormat, r.chainName) +} + +// Reload refreshes the registry snapshot from Redis. +func (r *RedisJettonRegistry) Reload(_ context.Context) error { + if r.redis == nil { + r.applyFallback() + return nil + } + + nextJettons := make(map[string]JettonInfo, len(r.fallbackJettons)) + for _, j := range r.fallbackJettons { + if j.MasterAddress == "" { + continue + } + nextJettons[j.MasterAddress] = j + } + nextWallets := make(map[string]string) + + masterRaw, err := r.redis.Get(r.mastersKey()) + if err != nil && !errors.Is(err, redis.Nil) { + return fmt.Errorf("get jetton masters from redis: %w", err) + } + if err == nil && strings.TrimSpace(masterRaw) != "" { + var masters []JettonInfo + if unmarshalErr := json.Unmarshal([]byte(masterRaw), &masters); unmarshalErr != nil { + return fmt.Errorf("unmarshal jetton masters: %w", unmarshalErr) + } + for _, j := range masters { + if j.MasterAddress == "" { + continue + } + nextJettons[j.MasterAddress] = j + } + } + + walletRaw, err := r.redis.Get(r.walletMappingKey()) + if err != nil && !errors.Is(err, redis.Nil) { + return fmt.Errorf("get jetton wallet mapping from redis: %w", err) + } + if err == nil && strings.TrimSpace(walletRaw) != "" { + if unmarshalErr := json.Unmarshal([]byte(walletRaw), &nextWallets); unmarshalErr != nil { + return fmt.Errorf("unmarshal jetton wallet mapping: %w", unmarshalErr) + } + } + + r.mu.Lock() + r.jettons = nextJettons + r.walletToMaster = nextWallets + r.mu.Unlock() + return nil +} + +func (r *RedisJettonRegistry) IsSupported(walletAddress string) bool { + r.mu.RLock() + defer r.mu.RUnlock() + + masterAddress, ok := r.walletToMaster[walletAddress] + if !ok { + return false + } + _, ok = r.jettons[masterAddress] + return ok +} + +func (r *RedisJettonRegistry) GetInfo(masterAddress string) (*JettonInfo, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + + info, ok := r.jettons[masterAddress] + if !ok { + return nil, false + } + return &info, true +} + +func (r *RedisJettonRegistry) GetInfoByWallet(walletAddress string) (*JettonInfo, bool) { + r.mu.RLock() + masterAddress, ok := r.walletToMaster[walletAddress] + if !ok { + r.mu.RUnlock() + return nil, false + } + info, ok := r.jettons[masterAddress] + r.mu.RUnlock() + if !ok { + // Fallback: return master address even when token metadata is not in masters list. + return &JettonInfo{MasterAddress: masterAddress}, true + } + return &info, true +} + +func (r *RedisJettonRegistry) RegisterWallet(walletAddress, masterAddress string) { + if strings.TrimSpace(walletAddress) == "" || strings.TrimSpace(masterAddress) == "" { + return + } + + r.mu.Lock() + r.walletToMaster[walletAddress] = masterAddress + snapshot := make(map[string]string, len(r.walletToMaster)) + for k, v := range r.walletToMaster { + snapshot[k] = v + } + r.mu.Unlock() + + if r.redis == nil { + return + } + + payload, err := json.Marshal(snapshot) + if err != nil { + return + } + _ = r.redis.Set(r.walletMappingKey(), string(payload), 0) +} + +func (r *RedisJettonRegistry) List() []JettonInfo { + r.mu.RLock() + defer r.mu.RUnlock() + + result := make([]JettonInfo, 0, len(r.jettons)) + for _, j := range r.jettons { + result = append(result, j) + } + return result +} diff --git a/internal/indexer/ton/registry_test.go b/internal/indexer/ton/registry_test.go new file mode 100644 index 0000000..b593c62 --- /dev/null +++ b/internal/indexer/ton/registry_test.go @@ -0,0 +1,72 @@ +package ton + +import ( + "testing" + "time" + + "github.com/fystack/multichain-indexer/pkg/infra" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" +) + +type mockRedisClient struct { + data map[string]string +} + +func (m *mockRedisClient) GetClient() *redis.Client { return nil } +func (m *mockRedisClient) Set(key string, value any, _ time.Duration) error { + if m.data == nil { + m.data = make(map[string]string) + } + switch v := value.(type) { + case string: + m.data[key] = v + default: + m.data[key] = "" + } + return nil +} +func (m *mockRedisClient) Get(key string) (string, error) { return m.data[key], nil } +func (m *mockRedisClient) Del(_ ...string) error { return nil } +func (m *mockRedisClient) ZAdd(_ string, _ ...redis.Z) error { + return nil +} +func (m *mockRedisClient) ZRem(_ string, _ ...interface{}) error { + return nil +} +func (m *mockRedisClient) ZRange(_ string, _, _ int64) ([]string, error) { + return nil, nil +} +func (m *mockRedisClient) ZRangeWithScores(_ string, _, _ int64) ([]redis.Z, error) { + return nil, nil +} +func (m *mockRedisClient) ZRevRangeWithScores(_ string, _, _ int64) ([]redis.Z, error) { + return nil, nil +} +func (m *mockRedisClient) Close() error { return nil } + +var _ infra.RedisClient = (*mockRedisClient)(nil) + +func TestConfigBasedRegistryGetInfoByWalletFallback(t *testing.T) { + reg := NewConfigBasedRegistry(nil) + reg.RegisterWallet("wallet-1", "master-1") + + info, ok := reg.GetInfoByWallet("wallet-1") + assert.True(t, ok) + assert.Equal(t, "master-1", info.MasterAddress) +} + +func TestRedisJettonRegistryRegisterWalletPersistsAndFallback(t *testing.T) { + redisClient := &mockRedisClient{data: make(map[string]string)} + reg := NewRedisJettonRegistry("ton_testnet", redisClient, nil) + + reg.RegisterWallet("wallet-1", "master-1") + + info, ok := reg.GetInfoByWallet("wallet-1") + assert.True(t, ok) + assert.Equal(t, "master-1", info.MasterAddress) + + raw, exists := redisClient.data[reg.walletMappingKey()] + assert.True(t, exists) + assert.Contains(t, raw, "\"wallet-1\":\"master-1\"") +} diff --git a/internal/rpc/ton/api.go b/internal/rpc/ton/api.go new file mode 100644 index 0000000..fae3917 --- /dev/null +++ b/internal/rpc/ton/api.go @@ -0,0 +1,23 @@ +package ton + +import ( + "context" + + "github.com/xssnick/tonutils-go/address" + "github.com/xssnick/tonutils-go/tlb" +) + +type TonAPI interface { + // ListTransactions returns transactions for an account. + // - limit: max transactions to return (typically 10-50) + // - lastLT: logical time cursor (0 for initial fetch from beginning) + // - lastHash: transaction hash cursor (nil for initial fetch) + // Returns transactions in reverse chronological order (newest first). + ListTransactions(ctx context.Context, addr *address.Address, limit uint32, lastLT uint64, lastHash []byte) ([]*tlb.Transaction, error) + + // GetLatestMasterchainSeqno returns the latest observed masterchain sequence number. + GetLatestMasterchainSeqno(ctx context.Context) (uint64, error) + + // ResolveJettonMasterAddress resolves a jetton wallet address to its master contract address. + ResolveJettonMasterAddress(ctx context.Context, jettonWallet string) (string, error) +} diff --git a/internal/rpc/ton/client.go b/internal/rpc/ton/client.go new file mode 100644 index 0000000..ed643e7 --- /dev/null +++ b/internal/rpc/ton/client.go @@ -0,0 +1,213 @@ +package ton + +import ( + "context" + "errors" + "fmt" + "sort" + "sync" + "time" + + "github.com/xssnick/tonutils-go/address" + "github.com/xssnick/tonutils-go/liteclient" + "github.com/xssnick/tonutils-go/tlb" + "github.com/xssnick/tonutils-go/ton" +) + +type Client struct { + api ton.APIClientWrapped + pool *liteclient.ConnectionPool + baseURL string + mu sync.RWMutex + + // Cache for masterchain info to avoid redundant calls during mass polling + masterCache *ton.BlockIDExt + masterCacheTime time.Time +} + +type ClientConfig struct { + ConfigURL string +} + +func NewClient(ctx context.Context, cfg ClientConfig) (*Client, error) { + pool := liteclient.NewConnectionPool() + + if err := pool.AddConnectionsFromConfigUrl(ctx, cfg.ConfigURL); err != nil { + return nil, fmt.Errorf("failed to fetch/parse global config: %w", err) + } + + // tonutils-go handles failover between lite servers in the pool + api := ton.NewAPIClient(pool, ton.ProofCheckPolicyFast).WithRetry() + + return &Client{ + api: api, + pool: pool, + baseURL: cfg.ConfigURL, + }, nil +} + +func NewClientFromPool(pool *liteclient.ConnectionPool) *Client { + api := ton.NewAPIClient(pool, ton.ProofCheckPolicyFast).WithRetry() + return &Client{ + api: api, + pool: pool, + } +} + +// getMasterchainInfo returns the current masterchain block info, using cache if valid. +func (c *Client) getMasterchainInfo(ctx context.Context) (*ton.BlockIDExt, error) { + c.mu.Lock() + if c.masterCache != nil && time.Since(c.masterCacheTime) < time.Second { + defer c.mu.Unlock() + return c.masterCache, nil + } + c.mu.Unlock() + + // Fetch new info + master, err := c.api.CurrentMasterchainInfo(ctx) + if err != nil { + return nil, err + } + + c.mu.Lock() + c.masterCache = master + c.masterCacheTime = time.Now() + c.mu.Unlock() + + return master, nil +} + +// ListTransactions returns transactions for an account. +// Transactions are returned in reverse chronological order (newest first). +// Use lastLT=0 and lastHash=nil for initial fetch from the account's latest transaction. +func (c *Client) ListTransactions(ctx context.Context, addr *address.Address, limit uint32, lastLT uint64, lastHash []byte) ([]*tlb.Transaction, error) { + if limit == 0 { + limit = 1 + } + + // Get the current masterchain block for consistency + master, err := c.getMasterchainInfo(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get masterchain info: %w", err) + } + + // Always get current account state to check for new transactions + account, err := c.api.GetAccount(ctx, master, addr) + if err != nil { + return nil, fmt.Errorf("failed to get account: %w", err) + } + + // Account not active or doesn't exist + if !account.IsActive { + return nil, nil + } + + // If cursor is at or ahead of latest, no new transactions + if lastLT >= account.LastTxLT { + return nil, nil + } + + // Page backwards from account tip until we reach the saved cursor. + var ( + allNewTxs []*tlb.Transaction + fetchLT = account.LastTxLT + fetchHash = account.LastTxHash + ) + + for { + txs, listErr := c.api.ListTransactions(ctx, addr, limit, fetchLT, fetchHash) + if listErr != nil { + if errors.Is(listErr, ton.ErrNoTransactionsWereFound) && len(allNewTxs) > 0 { + break + } + return nil, fmt.Errorf("failed to list transactions: %w", listErr) + } + if len(txs) == 0 { + break + } + + oldest := txs[0] + for _, tx := range txs { + if tx.LT > lastLT { + allNewTxs = append(allNewTxs, tx) + } + if tx.LT < oldest.LT { + oldest = tx + } + } + + // Reached already-processed range. + if oldest.LT <= lastLT { + break + } + // No more history. + if oldest.PrevTxLT == 0 || len(oldest.PrevTxHash) == 0 { + break + } + + fetchLT = oldest.PrevTxLT + fetchHash = oldest.PrevTxHash + } + + // Keep existing contract used by indexer: newest first. + sort.Slice(allNewTxs, func(i, j int) bool { + return allNewTxs[i].LT > allNewTxs[j].LT + }) + + return allNewTxs, nil +} + +func (c *Client) GetLatestMasterchainSeqno(ctx context.Context) (uint64, error) { + master, err := c.getMasterchainInfo(ctx) + if err != nil { + return 0, fmt.Errorf("failed to get masterchain info: %w", err) + } + return uint64(master.SeqNo), nil +} + +func (c *Client) ResolveJettonMasterAddress(ctx context.Context, jettonWallet string) (string, error) { + walletAddr, err := parseTONAddressAny(jettonWallet) + if err != nil { + return "", fmt.Errorf("invalid jetton wallet address: %w", err) + } + + master, err := c.getMasterchainInfo(ctx) + if err != nil { + return "", fmt.Errorf("failed to get masterchain info: %w", err) + } + + res, err := c.api.WaitForBlock(master.SeqNo).RunGetMethod(ctx, master, walletAddr, "get_wallet_data") + if err != nil { + return "", fmt.Errorf("run get_wallet_data: %w", err) + } + + masterSlice, err := res.Slice(2) + if err != nil { + return "", fmt.Errorf("parse get_wallet_data master address: %w", err) + } + + masterAddr, err := masterSlice.LoadAddr() + if err != nil { + return "", fmt.Errorf("load master address from stack: %w", err) + } + if masterAddr == nil { + return "", fmt.Errorf("jetton master address is nil") + } + + return masterAddr.StringRaw(), nil +} + +func parseTONAddressAny(addr string) (*address.Address, error) { + if raw, err := address.ParseRawAddr(addr); err == nil { + return raw, nil + } + return address.ParseAddr(addr) +} + +func (c *Client) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + c.api = nil + c.pool = nil + return nil +} diff --git a/internal/rpc/ton/client_test.go b/internal/rpc/ton/client_test.go new file mode 100644 index 0000000..c509e11 --- /dev/null +++ b/internal/rpc/ton/client_test.go @@ -0,0 +1,26 @@ +package ton + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseTONAddressAny(t *testing.T) { + t.Run("raw_address", func(t *testing.T) { + addr, err := parseTONAddressAny("0:eaa27e0e4fbadad817ac4a106de2bae8b52106b5267c1656a3892538d59c69dc") + require.NoError(t, err) + require.Equal(t, "0:eaa27e0e4fbadad817ac4a106de2bae8b52106b5267c1656a3892538d59c69dc", addr.StringRaw()) + }) + + t.Run("friendly_address", func(t *testing.T) { + addr, err := parseTONAddressAny("EQAd59XMWb40TRfxqTJ0otrUTqD8ZuxLHhC8T401A5-Kr2aY") + require.NoError(t, err) + require.Equal(t, "0:1de7d5cc59be344d17f1a93274a2dad44ea0fc66ec4b1e10bc4f8d35039f8aaf", addr.StringRaw()) + }) + + t.Run("invalid_address", func(t *testing.T) { + _, err := parseTONAddressAny("not-an-address") + require.Error(t, err) + }) +} diff --git a/internal/worker/factory.go b/internal/worker/factory.go index dcf3608..17feecf 100644 --- a/internal/worker/factory.go +++ b/internal/worker/factory.go @@ -5,12 +5,15 @@ import ( "strconv" "github.com/fystack/multichain-indexer/internal/indexer" + tonIndexer "github.com/fystack/multichain-indexer/internal/indexer/ton" "github.com/fystack/multichain-indexer/internal/rpc" "github.com/fystack/multichain-indexer/internal/rpc/bitcoin" "github.com/fystack/multichain-indexer/internal/rpc/evm" "github.com/fystack/multichain-indexer/internal/rpc/solana" "github.com/fystack/multichain-indexer/internal/rpc/sui" + tonRpc "github.com/fystack/multichain-indexer/internal/rpc/ton" "github.com/fystack/multichain-indexer/internal/rpc/tron" + tonWorker "github.com/fystack/multichain-indexer/internal/worker/ton" "github.com/fystack/multichain-indexer/pkg/addressbloomfilter" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/common/enum" @@ -288,6 +291,81 @@ func buildSuiIndexer( return indexer.NewSuiIndexer(chainName, chainCfg, failover, pubkeyStore) } +// buildTonPollingWorker constructs a TON polling worker with failover. +// TON uses account-based polling instead of block-based indexing. +func buildTonPollingWorker( + ctx context.Context, + chainName string, + chainCfg config.ChainConfig, + kvstore infra.KVStore, + redisClient infra.RedisClient, + db *gorm.DB, + emitter events.Emitter, +) Worker { + var client tonRpc.TonAPI + + if len(chainCfg.Nodes) > 0 { + configURL := chainCfg.Nodes[0].URL + + var err error // shadow assignment + client, err = tonRpc.NewClient(ctx, tonRpc.ClientConfig{ + ConfigURL: configURL, + }) + + if err != nil { + logger.Error("Failed to create TON client with global config", "url", configURL, "err", err) + // Proceed with nil client, IsHealthy() will return false + } + } else { + logger.Error("No nodes configured for TON chain", "chain", chainName) + } + + // Create cursor store backed by KVStore + cursorStore := tonIndexer.NewCursorStore(kvstore) + + // Create Jetton registry + var jettons []tonIndexer.JettonInfo + for _, j := range chainCfg.Jettons { + jettons = append(jettons, tonIndexer.JettonInfo{ + MasterAddress: j.MasterAddress, + Symbol: j.Symbol, + Decimals: j.Decimals, + }) + } + jettonRegistry := tonIndexer.NewRedisJettonRegistry(chainName, redisClient, jettons) + if err := jettonRegistry.Reload(ctx); err != nil { + logger.Error("Failed to load jetton registry from redis, using fallback config", + "chain", chainName, + "err", err, + ) + } + + // Create the account indexer + accountIndexer := tonIndexer.NewTonAccountIndexer( + chainName, + chainCfg, + client, + jettonRegistry, + ) + + // Create the polling worker + worker := tonWorker.NewTonPollingWorker( + ctx, + chainName, + chainCfg, + accountIndexer, + cursorStore, + db, + kvstore, + emitter, + tonWorker.WorkerConfig{ + Concurrency: chainCfg.Throttle.Concurrency, + PollInterval: chainCfg.PollInterval, + }, + ) + return worker +} + // CreateManagerWithWorkers initializes manager and all workers for configured chains. func CreateManagerWithWorkers( ctx context.Context, @@ -327,6 +405,13 @@ func CreateManagerWithWorkers( idxr = buildSolanaIndexer(chainName, chainCfg, ModeRegular, pubkeyStore) case enum.NetworkTypeSui: idxr = buildSuiIndexer(chainName, chainCfg, ModeRegular, pubkeyStore) + case enum.NetworkTypeTon: + tonW := buildTonPollingWorker(ctx, chainName, chainCfg, kvstore, redisClient, db, emitter) + if tonW != nil { + manager.AddWorkers(tonW) + logger.Info("TON polling worker enabled", "chain", chainName) + } + continue default: logger.Fatal("Unsupported network type", "chain", chainName, "type", chainCfg.Type) } diff --git a/internal/worker/manager.go b/internal/worker/manager.go index 5068485..2c7375a 100644 --- a/internal/worker/manager.go +++ b/internal/worker/manager.go @@ -80,3 +80,10 @@ func (m *Manager) closeResource(name string, resource interface{}, closer func() func (m *Manager) AddWorkers(workers ...Worker) { m.workers = append(m.workers, workers...) } + +// Workers returns a copy of managed workers. +func (m *Manager) Workers() []Worker { + out := make([]Worker, 0, len(m.workers)) + out = append(out, m.workers...) + return out +} diff --git a/internal/worker/ton/worker.go b/internal/worker/ton/worker.go new file mode 100644 index 0000000..4d1562d --- /dev/null +++ b/internal/worker/ton/worker.go @@ -0,0 +1,509 @@ +package ton + +import ( + "context" + "fmt" + "log/slog" + "sync" + "time" + + tonIndexer "github.com/fystack/multichain-indexer/internal/indexer/ton" + "github.com/fystack/multichain-indexer/pkg/common/config" + "github.com/fystack/multichain-indexer/pkg/common/enum" + "github.com/fystack/multichain-indexer/pkg/common/logger" + "github.com/fystack/multichain-indexer/pkg/common/types" + "github.com/fystack/multichain-indexer/pkg/events" + "github.com/fystack/multichain-indexer/pkg/infra" + "github.com/fystack/multichain-indexer/pkg/model" + "github.com/fystack/multichain-indexer/pkg/retry" + "gorm.io/gorm" +) + +const walletListKeyFormat = "ton/wallets/%s" + +// TonPollingWorker polls multiple TON accounts for transactions. +type TonPollingWorker struct { + ctx context.Context + cancel context.CancelFunc + logger *slog.Logger + wg sync.WaitGroup + + chainName string + config config.ChainConfig + indexer tonIndexer.AccountIndexer + cursorStore tonIndexer.CursorStore + db *gorm.DB + kvstore infra.KVStore + emitter events.Emitter + + // Cache + wallets []string + walletSet map[string]struct{} + walletsInitialized bool + walletsMutex sync.RWMutex + + // Worker configuration + concurrency int + pollInterval time.Duration +} + +// WorkerConfig holds configuration for the TON polling worker. +type WorkerConfig struct { + Concurrency int // Max parallel account polls (default: 10) + PollInterval time.Duration // Interval between poll cycles (default: from chain config) +} + +// NewTonPollingWorker creates a new TON polling worker. +func NewTonPollingWorker( + ctx context.Context, + chainName string, + cfg config.ChainConfig, + indexer tonIndexer.AccountIndexer, + cursorStore tonIndexer.CursorStore, + db *gorm.DB, + kvstore infra.KVStore, + emitter events.Emitter, + workerCfg WorkerConfig, +) *TonPollingWorker { + ctx, cancel := context.WithCancel(ctx) + + log := logger.With( + slog.String("worker", "ton-polling"), + slog.String("chain", cfg.Name), + ) + + concurrency := workerCfg.Concurrency + if concurrency <= 0 { + concurrency = 10 + } + + pollInterval := workerCfg.PollInterval + if pollInterval <= 0 { + pollInterval = cfg.PollInterval + } + + return &TonPollingWorker{ + ctx: ctx, + cancel: cancel, + logger: log, + chainName: chainName, + config: cfg, + indexer: indexer, + cursorStore: cursorStore, + db: db, + kvstore: kvstore, + emitter: emitter, + concurrency: concurrency, + pollInterval: pollInterval, + } +} + +// Start begins the polling loop. +func (w *TonPollingWorker) Start() { + w.wg.Add(1) + go w.run() +} + +// Stop gracefully stops the worker. +func (w *TonPollingWorker) Stop() { + w.cancel() + w.wg.Wait() + w.logger.Info("TON polling worker stopped") +} + +// run is the main polling loop. +func (w *TonPollingWorker) run() { + defer w.wg.Done() + + ticker := time.NewTicker(w.pollInterval) + defer ticker.Stop() + + w.logger.Info("TON polling worker started", + "poll_interval", w.pollInterval, + "concurrency", w.concurrency, + ) + + // Run immediately on start + w.pollAllAccounts() + + for { + select { + case <-w.ctx.Done(): + w.logger.Info("Context cancelled, stopping polling loop") + return + + case <-ticker.C: + w.pollAllAccounts() + } + } +} + +// pollAllAccounts polls all tracked TON addresses from in-memory cache. +func (w *TonPollingWorker) pollAllAccounts() { + if err := w.ensureWalletsLoaded(); err != nil { + w.logger.Error("Failed to ensure wallet list", "err", err) + return + } + + addresses := w.snapshotWallets() + + if len(addresses) == 0 { + w.logger.Debug("No TON addresses to poll") + return + } + + w.logger.Info("Starting poll cycle", "address_count", len(addresses)) + + workChan := make(chan string, len(addresses)) + for _, addr := range addresses { + workChan <- addr + } + close(workChan) + + var wg sync.WaitGroup + for i := 0; i < w.concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + w.pollWorker(workChan) + }() + } + + wg.Wait() + w.logger.Debug("Poll cycle complete", "address_count", len(addresses)) +} + +func (w *TonPollingWorker) snapshotWallets() []string { + w.walletsMutex.RLock() + defer w.walletsMutex.RUnlock() + out := make([]string, len(w.wallets)) + copy(out, w.wallets) + return out +} + +// pollWorker processes addresses from the work channel. +func (w *TonPollingWorker) pollWorker(addresses <-chan string) { + for addr := range addresses { + select { + case <-w.ctx.Done(): + return + default: + w.pollAccount(addr) + } + } +} + +// pollAccount polls a single account with retry logic. +func (w *TonPollingWorker) pollAccount(address string) { + log := w.logger.With("address", address) + + cursor, err := w.cursorStore.Get(w.ctx, address) + if err != nil { + log.Error("Failed to get cursor", "err", err) + return + } + if cursor == nil { + cursor = &tonIndexer.AccountCursor{Address: address} + } + + if w.ctx.Err() != nil { + return + } + + txs, newCursor, err := w.pollAccountWithRetry(address, cursor, log) + if err != nil { + log.Error("Failed to poll account after retries", "err", err) + return + } + if w.ctx.Err() != nil { + return + } + + if newCursor != nil { + if err := w.cursorStore.Save(w.ctx, newCursor); err != nil { + log.Error("Failed to save cursor", "err", err) + } + } + + if len(txs) == 0 { + return + } + + log.Info("Found transactions", "count", len(txs)) + for i := range txs { + tx := &txs[i] + if w.shouldSkipTrackedInternalTransfer(address, tx) { + log.Debug("Skipping duplicate tracked-wallet transfer from receiver-side poll", + "from", tx.FromAddress, + "to", tx.ToAddress, + "txhash", tx.TxHash, + ) + continue + } + + log.Info("Emitting matched transaction", + "type", tx.Type, + "from", tx.FromAddress, + "to", tx.ToAddress, + "asset_address", tx.AssetAddress, + "amount", tx.Amount, + "fee", tx.TxFee.String(), + "txhash", tx.TxHash, + ) + if err := w.emitter.EmitTransaction(w.config.InternalCode, tx); err != nil { + log.Error("Failed to emit transaction", "tx_hash", tx.TxHash, "err", err) + } else { + log.Debug("Emitted transaction", "tx_hash", tx.TxHash) + } + } +} + +func (w *TonPollingWorker) pollAccountWithRetry( + address string, + cursor *tonIndexer.AccountCursor, + log *slog.Logger, +) ([]types.Transaction, *tonIndexer.AccountCursor, error) { + var ( + txs []types.Transaction + newCursor *tonIndexer.AccountCursor + ) + + err := retry.Exponential(func() error { + if w.ctx.Err() != nil { + return nil + } + + result, c, pollErr := w.indexer.PollAccount(w.ctx, address, cursor) + if pollErr != nil { + if w.ctx.Err() != nil { + return nil + } + return pollErr + } + txs = result + newCursor = c + return nil + }, retry.ExponentialConfig{ + InitialInterval: 2 * time.Second, + MaxElapsedTime: 30 * time.Second, + OnRetry: func(err error, next time.Duration) { + if w.ctx.Err() == nil { + log.Debug("Retrying poll", "err", err, "next_retry", next) + } + }, + }) + if w.ctx.Err() != nil { + return nil, nil, nil + } + return txs, newCursor, err +} + +// AddAddress registers a new address for tracking. +// This initializes the cursor and starts polling the address. +func (w *TonPollingWorker) AddAddress(ctx context.Context, address string) error { + normalizedAddress := tonIndexer.NormalizeTONAddressRaw(address) + if normalizedAddress == "" { + return fmt.Errorf("invalid TON address: %s", address) + } + + // Check if cursor already exists + existing, err := w.cursorStore.Get(ctx, normalizedAddress) + if err != nil { + return err + } + + if existing != nil { + // Already tracking + return nil + } + + // Create initial cursor + cursor := &tonIndexer.AccountCursor{Address: normalizedAddress} + return w.cursorStore.Save(ctx, cursor) +} + +// RemoveAddress stops tracking an address. +func (w *TonPollingWorker) RemoveAddress(ctx context.Context, address string) error { + normalizedAddress := tonIndexer.NormalizeTONAddressRaw(address) + if normalizedAddress != "" { + return w.cursorStore.Delete(ctx, normalizedAddress) + } + return w.cursorStore.Delete(ctx, address) +} + +// GetNetworkType implements the worker interface. +func (w *TonPollingWorker) GetNetworkType() enum.NetworkType { + return enum.NetworkTypeTon +} + +func (w *TonPollingWorker) GetName() string { + return w.chainName +} + +func (w *TonPollingWorker) walletListKey() string { + return fmt.Sprintf(walletListKeyFormat, w.chainName) +} + +func (w *TonPollingWorker) ensureWalletsLoaded() error { + w.walletsMutex.RLock() + initialized := w.walletsInitialized + w.walletsMutex.RUnlock() + + if initialized { + return nil + } + + if _, err := w.ReloadWalletsFromKV(w.ctx); err != nil { + return err + } + + w.walletsMutex.RLock() + hasWallets := len(w.wallets) > 0 + w.walletsMutex.RUnlock() + if hasWallets { + return nil + } + + // KV empty fallback: sync once from DB to bootstrap. + if w.db == nil { + return nil + } + _, err := w.ReloadWalletsFromDB(w.ctx) + return err +} + +func (w *TonPollingWorker) replaceWalletCache(addresses []string) int { + walletSet := make(map[string]struct{}, len(addresses)) + for _, addr := range addresses { + walletSet[addr] = struct{}{} + } + + w.walletsMutex.Lock() + oldSize := len(w.wallets) + w.wallets = addresses + w.walletSet = walletSet + w.walletsInitialized = true + w.walletsMutex.Unlock() + + w.logger.Info("Wallet cache updated", + "old_size", oldSize, + "new_size", len(addresses), + ) + return len(addresses) +} + +func (w *TonPollingWorker) isTrackedWallet(address string) bool { + normalized := tonIndexer.NormalizeTONAddressRaw(address) + if normalized == "" { + return false + } + + w.walletsMutex.RLock() + _, ok := w.walletSet[normalized] + w.walletsMutex.RUnlock() + return ok +} + +func (w *TonPollingWorker) shouldSkipTrackedInternalTransfer(polledAddress string, tx *types.Transaction) bool { + from := tonIndexer.NormalizeTONAddressRaw(tx.FromAddress) + to := tonIndexer.NormalizeTONAddressRaw(tx.ToAddress) + if from == "" || to == "" { + return false + } + + // Keep only one side when both sender and receiver are tracked. + // Sender-side poll has polledAddress == from. + if !w.isTrackedWallet(from) || !w.isTrackedWallet(to) { + return false + } + + polled := tonIndexer.NormalizeTONAddressRaw(polledAddress) + if polled == "" { + return false + } + + return polled != from +} + +// ReloadWalletsFromKV refreshes in-memory wallets from KV store. +func (w *TonPollingWorker) ReloadWalletsFromKV(_ context.Context) (int, error) { + if w.kvstore == nil { + return 0, fmt.Errorf("kvstore is not configured") + } + + var addresses []string + found, err := w.kvstore.GetAny(w.walletListKey(), &addresses) + if err != nil { + return 0, fmt.Errorf("get wallet list from kv: %w", err) + } + if !found { + w.replaceWalletCache(nil) + return 0, nil + } + + return w.replaceWalletCache(tonIndexer.NormalizeTONAddressList(addresses)), nil +} + +// ReloadWalletsFromDB reloads wallets from database and persists them to KV store. +func (w *TonPollingWorker) ReloadWalletsFromDB(_ context.Context) (int, error) { + if w.db == nil { + return 0, fmt.Errorf("database is not configured") + } + if w.kvstore == nil { + return 0, fmt.Errorf("kvstore is not configured") + } + + allAddresses, err := w.loadWalletAddressesFromDB() + if err != nil { + return 0, err + } + + cleaned := tonIndexer.NormalizeTONAddressList(allAddresses) + if err := w.kvstore.SetAny(w.walletListKey(), cleaned); err != nil { + return 0, fmt.Errorf("persist wallet list to kv: %w", err) + } + + return w.replaceWalletCache(cleaned), nil +} + +// ReloadJettons refreshes TON jetton registry from Redis (with config fallback). +func (w *TonPollingWorker) ReloadJettons(ctx context.Context) (int, error) { + count, err := w.indexer.ReloadJettons(ctx) + if err != nil { + return 0, err + } + + w.logger.Info("Jetton registry reloaded", "chain", w.chainName, "jetton_count", count) + return count, nil +} + +func (w *TonPollingWorker) loadWalletAddressesFromDB() ([]string, error) { + const batchSize = 1000 + + allAddresses := make([]string, 0, batchSize) + offset := 0 + for { + var wallets []model.WalletAddress + err := w.db.Select("address"). + Where("type = ?", enum.NetworkTypeTon). + Order("id"). + Limit(batchSize). + Offset(offset). + Find(&wallets).Error + if err != nil { + return nil, fmt.Errorf("fetch wallets from db (offset=%d): %w", offset, err) + } + if len(wallets) == 0 { + break + } + for _, wallet := range wallets { + allAddresses = append(allAddresses, wallet.Address) + } + if len(wallets) < batchSize { + break + } + offset += batchSize + } + + return allAddresses, nil +} diff --git a/internal/worker/ton/worker_test.go b/internal/worker/ton/worker_test.go new file mode 100644 index 0000000..6bf831c --- /dev/null +++ b/internal/worker/ton/worker_test.go @@ -0,0 +1,43 @@ +package ton + +import ( + "io" + "log/slog" + "testing" + + "github.com/fystack/multichain-indexer/pkg/common/types" + "github.com/stretchr/testify/assert" +) + +func TestShouldSkipTrackedInternalTransfer(t *testing.T) { + const ( + sender = "0:2942e40f94b5a2f111ea2ff98beb5f634f3a971f99f7fedafff5164c4bfa1bef" + receiver = "0:fc58a2bb35b051810bef84fce18747ac2c2cfcbe0ce3d3167193d9b2538ef33e" + external = "0:5df8318107d8988e2ab298b0190ba9f923267bc1575f4532a49f37384db6a799" + ) + + w := &TonPollingWorker{} + w.logger = slog.New(slog.NewTextHandler(io.Discard, nil)) + w.replaceWalletCache([]string{sender, receiver}) + + tx := &types.Transaction{ + FromAddress: sender, + ToAddress: receiver, + } + + t.Run("keep_sender_side_poll", func(t *testing.T) { + assert.False(t, w.shouldSkipTrackedInternalTransfer(sender, tx)) + }) + + t.Run("skip_receiver_side_poll", func(t *testing.T) { + assert.True(t, w.shouldSkipTrackedInternalTransfer(receiver, tx)) + }) + + t.Run("do_not_skip_external_to_tracked", func(t *testing.T) { + externalTx := &types.Transaction{ + FromAddress: external, + ToAddress: receiver, + } + assert.False(t, w.shouldSkipTrackedInternalTransfer(receiver, externalTx)) + }) +} diff --git a/internal/worker/ton_reload.go b/internal/worker/ton_reload.go new file mode 100644 index 0000000..8500ad0 --- /dev/null +++ b/internal/worker/ton_reload.go @@ -0,0 +1,203 @@ +package worker + +import ( + "context" + "errors" + "fmt" + "sort" + "strings" + + "github.com/fystack/multichain-indexer/pkg/common/enum" +) + +var ( + ErrNoTonWorkerConfigured = errors.New("no ton worker configured") + ErrTonWorkerNotFound = errors.New("ton worker not found") +) + +type TonJettonReloadRequest struct { + ChainFilter string +} + +type WalletReloadSource string + +const ( + WalletReloadSourceKV WalletReloadSource = "kv" + WalletReloadSourceDB WalletReloadSource = "db" +) + +func (s WalletReloadSource) Normalize() WalletReloadSource { + switch strings.ToLower(strings.TrimSpace(string(s))) { + case string(WalletReloadSourceDB): + return WalletReloadSourceDB + case string(WalletReloadSourceKV), "": + return WalletReloadSourceKV + default: + return WalletReloadSource(strings.ToLower(strings.TrimSpace(string(s)))) + } +} + +func (s WalletReloadSource) IsValid() bool { + normalized := s.Normalize() + return normalized == WalletReloadSourceKV || normalized == WalletReloadSourceDB +} + +type TonWalletReloadRequest struct { + Source WalletReloadSource + ChainFilter string +} + +type TonWalletReloadResult struct { + Chain string `json:"chain"` + ReloadedWallets int `json:"reloaded_wallets"` + Error string `json:"error,omitempty"` +} + +type TonJettonReloadResult struct { + Chain string `json:"chain"` + ReloadedJettons int `json:"reloaded_jettons"` + Error string `json:"error,omitempty"` +} + +// TonWalletReloader is implemented by TON workers that support runtime wallet reload. +type TonWalletReloader interface { + Worker + GetName() string + GetNetworkType() enum.NetworkType + ReloadWalletsFromKV(ctx context.Context) (int, error) + ReloadWalletsFromDB(ctx context.Context) (int, error) +} + +// TonJettonReloader is implemented by TON workers that support runtime jetton reload. +type TonJettonReloader interface { + Worker + GetName() string + GetNetworkType() enum.NetworkType + ReloadJettons(ctx context.Context) (int, error) +} + +// TonWalletReloadService handles runtime wallet cache reload for TON workers. +type TonWalletReloadService struct { + reloaders []TonWalletReloader +} + +// TonJettonReloadService handles runtime jetton registry reload for TON workers. +type TonJettonReloadService struct { + reloaders []TonJettonReloader +} + +func NewTonWalletReloadService(workers []Worker) *TonWalletReloadService { + reloaders := make([]TonWalletReloader, 0) + for _, w := range workers { + reloader, ok := w.(TonWalletReloader) + if !ok || reloader.GetNetworkType() != enum.NetworkTypeTon { + continue + } + reloaders = append(reloaders, reloader) + } + + return &TonWalletReloadService{reloaders: reloaders} +} + +func NewTonWalletReloadServiceFromManager(m *Manager) *TonWalletReloadService { + if m == nil { + return &TonWalletReloadService{} + } + return NewTonWalletReloadService(m.Workers()) +} + +func NewTonJettonReloadService(workers []Worker) *TonJettonReloadService { + reloaders := make([]TonJettonReloader, 0) + for _, w := range workers { + reloader, ok := w.(TonJettonReloader) + if !ok || reloader.GetNetworkType() != enum.NetworkTypeTon { + continue + } + reloaders = append(reloaders, reloader) + } + return &TonJettonReloadService{reloaders: reloaders} +} + +func NewTonJettonReloadServiceFromManager(m *Manager) *TonJettonReloadService { + if m == nil { + return &TonJettonReloadService{} + } + return NewTonJettonReloadService(m.Workers()) +} + +func (s *TonWalletReloadService) ReloadTonWallets( + ctx context.Context, + req TonWalletReloadRequest, +) ([]TonWalletReloadResult, error) { + source := req.Source.Normalize() + results := make([]TonWalletReloadResult, 0) + + for _, reloader := range s.reloaders { + chainName := reloader.GetName() + if req.ChainFilter != "" && req.ChainFilter != chainName { + continue + } + + item := TonWalletReloadResult{Chain: chainName} + var ( + count int + err error + ) + switch source { + case WalletReloadSourceDB: + count, err = reloader.ReloadWalletsFromDB(ctx) + default: + count, err = reloader.ReloadWalletsFromKV(ctx) + } + + if err != nil { + item.Error = err.Error() + } else { + item.ReloadedWallets = count + } + results = append(results, item) + } + + if len(results) == 0 { + if req.ChainFilter != "" { + return nil, fmt.Errorf("%w: %s", ErrTonWorkerNotFound, req.ChainFilter) + } + return nil, ErrNoTonWorkerConfigured + } + + sort.Slice(results, func(i, j int) bool { return results[i].Chain < results[j].Chain }) + return results, nil +} + +func (s *TonJettonReloadService) ReloadTonJettons( + ctx context.Context, + req TonJettonReloadRequest, +) ([]TonJettonReloadResult, error) { + results := make([]TonJettonReloadResult, 0) + + for _, reloader := range s.reloaders { + chainName := reloader.GetName() + if req.ChainFilter != "" && req.ChainFilter != chainName { + continue + } + + item := TonJettonReloadResult{Chain: chainName} + count, err := reloader.ReloadJettons(ctx) + if err != nil { + item.Error = err.Error() + } else { + item.ReloadedJettons = count + } + results = append(results, item) + } + + if len(results) == 0 { + if req.ChainFilter != "" { + return nil, fmt.Errorf("%w: %s", ErrTonWorkerNotFound, req.ChainFilter) + } + return nil, ErrNoTonWorkerConfigured + } + + sort.Slice(results, func(i, j int) bool { return results[i].Chain < results[j].Chain }) + return results, nil +} diff --git a/pkg/common/config/chains.go b/pkg/common/config/chains.go index a04b044..572ac14 100644 --- a/pkg/common/config/chains.go +++ b/pkg/common/config/chains.go @@ -47,6 +47,9 @@ func (c Chains) OverrideFromLatest(names []string) { // ApplyDefaults merges global defaults into all chain configs. func (c Chains) ApplyDefaults(def Defaults) error { for name, chain := range c { + if !chain.FromLatest { + chain.FromLatest = def.FromLatest + } if chain.PollInterval == 0 { chain.PollInterval = def.PollInterval } diff --git a/pkg/common/config/types.go b/pkg/common/config/types.go index c5bf40a..29118d5 100644 --- a/pkg/common/config/types.go +++ b/pkg/common/config/types.go @@ -37,7 +37,7 @@ type ChainConfig struct { Name string `yaml:"-"` NetworkId string `yaml:"network_id"` InternalCode string `yaml:"internal_code"` - Type enum.NetworkType `yaml:"type" validate:"required,oneof=tron evm btc sol sui"` + Type enum.NetworkType `yaml:"type" validate:"required"` FromLatest bool `yaml:"from_latest"` StartBlock int `yaml:"start_block" validate:"min=0"` PollInterval time.Duration `yaml:"poll_interval"` @@ -46,6 +46,13 @@ type ChainConfig struct { Client ClientConfig `yaml:"client"` Throttle Throttle `yaml:"throttle"` Nodes []NodeConfig `yaml:"nodes" validate:"required,min=1"` + Jettons []JettonConfig `yaml:"jettons"` +} + +type JettonConfig struct { + MasterAddress string `yaml:"master_address"` + Symbol string `yaml:"symbol"` + Decimals int `yaml:"decimals"` } type ClientConfig struct { diff --git a/pkg/common/enum/enum.go b/pkg/common/enum/enum.go index 068e508..c123c9f 100644 --- a/pkg/common/enum/enum.go +++ b/pkg/common/enum/enum.go @@ -25,6 +25,7 @@ const ( NetworkTypeSol NetworkType = "sol" NetworkTypeApt NetworkType = "apt" NetworkTypeSui NetworkType = "sui" + NetworkTypeTon NetworkType = "ton" ) const ( diff --git a/pkg/common/types/types.go b/pkg/common/types/types.go index 00604c7..4fc842c 100644 --- a/pkg/common/types/types.go +++ b/pkg/common/types/types.go @@ -20,18 +20,20 @@ type Block struct { } type Transaction struct { - TxHash string `json:"txHash"` - NetworkId string `json:"networkId"` - BlockNumber uint64 `json:"blockNumber"` // 0 for mempool transactions - FromAddress string `json:"fromAddress"` - ToAddress string `json:"toAddress"` - AssetAddress string `json:"assetAddress"` - Amount string `json:"amount"` - Type constant.TxType `json:"type"` - TxFee decimal.Decimal `json:"txFee"` - Timestamp uint64 `json:"timestamp"` - Confirmations uint64 `json:"confirmations"` // Number of confirmations (0 = mempool/unconfirmed) - Status string `json:"status"` // "pending" (0 conf), "confirmed" (1+ conf) + TxHash string `json:"txHash"` + NetworkId string `json:"networkId"` + BlockNumber uint64 `json:"blockNumber"` // 0 for mempool transactions + LogicalTime uint64 `json:"logicalTime,omitempty"` + MasterchainSeqno uint64 `json:"masterchainSeqno,omitempty"` + FromAddress string `json:"fromAddress"` + ToAddress string `json:"toAddress"` + AssetAddress string `json:"assetAddress"` + Amount string `json:"amount"` + Type constant.TxType `json:"type"` + TxFee decimal.Decimal `json:"txFee"` + Timestamp uint64 `json:"timestamp"` + Confirmations uint64 `json:"confirmations"` // Number of confirmations (0 = mempool/unconfirmed) + Status string `json:"status"` // "pending" (0 conf), "confirmed" (1+ conf) } func (t Transaction) MarshalBinary() ([]byte, error) { @@ -48,10 +50,12 @@ func (t *Transaction) UnmarshalBinary(data []byte) error { func (t Transaction) String() string { return fmt.Sprintf( - "{TxHash: %s, NetworkId: %s, BlockNumber: %d, FromAddress: %s, ToAddress: %s, AssetAddress: %s, Amount: %s, Type: %s, TxFee: %s, Timestamp: %d, Confirmations: %d, Status: %s}", + "{TxHash: %s, NetworkId: %s, BlockNumber: %d, LogicalTime: %d, MasterchainSeqno: %d, FromAddress: %s, ToAddress: %s, AssetAddress: %s, Amount: %s, Type: %s, TxFee: %s, Timestamp: %d, Confirmations: %d, Status: %s}", t.TxHash, t.NetworkId, t.BlockNumber, + t.LogicalTime, + t.MasterchainSeqno, t.FromAddress, t.ToAddress, t.AssetAddress, diff --git a/pkg/store/toncursorstore/store.go b/pkg/store/toncursorstore/store.go new file mode 100644 index 0000000..53eb84e --- /dev/null +++ b/pkg/store/toncursorstore/store.go @@ -0,0 +1,84 @@ +package toncursorstore + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/fystack/multichain-indexer/pkg/infra" +) + +const keyPrefix = "ton/cursor/" + +// AccountCursor tracks the polling position for a single TON account. +type AccountCursor struct { + Address string `json:"address"` + LastLT uint64 `json:"last_lt"` // Logical time of last processed tx + LastHash string `json:"last_hash"` // Hex-encoded hash of last processed tx + UpdatedAt time.Time `json:"updated_at"` +} + +type Store interface { + Get(ctx context.Context, address string) (*AccountCursor, error) + Save(ctx context.Context, cursor *AccountCursor) error + Delete(ctx context.Context, address string) error + List(ctx context.Context) ([]string, error) +} + +type kvStore struct { + kv infra.KVStore +} + +func New(kv infra.KVStore) Store { + return &kvStore{kv: kv} +} + +func (s *kvStore) cursorKey(address string) string { + return keyPrefix + address +} + +func (s *kvStore) Get(_ context.Context, address string) (*AccountCursor, error) { + var cursor AccountCursor + found, err := s.kv.GetAny(s.cursorKey(address), &cursor) + if err != nil { + return nil, fmt.Errorf("failed to get cursor for %s: %w", address, err) + } + if !found { + return nil, nil + } + return &cursor, nil +} + +func (s *kvStore) Save(_ context.Context, cursor *AccountCursor) error { + cursor.UpdatedAt = time.Now() + if err := s.kv.SetAny(s.cursorKey(cursor.Address), cursor); err != nil { + return fmt.Errorf("failed to save cursor for %s: %w", cursor.Address, err) + } + return nil +} + +func (s *kvStore) Delete(_ context.Context, address string) error { + if err := s.kv.Delete(s.cursorKey(address)); err != nil { + return fmt.Errorf("failed to delete cursor for %s: %w", address, err) + } + return nil +} + +func (s *kvStore) List(_ context.Context) ([]string, error) { + pairs, err := s.kv.List(keyPrefix) + if err != nil { + return nil, fmt.Errorf("failed to list cursors: %w", err) + } + + addresses := make([]string, 0, len(pairs)) + for _, pair := range pairs { + var cursor AccountCursor + if err := json.Unmarshal(pair.Value, &cursor); err != nil { + continue + } + addresses = append(addresses, cursor.Address) + } + + return addresses, nil +} diff --git a/sql/wallet_address.sql b/sql/wallet_address.sql index 93bb13e..5ed2f8f 100644 --- a/sql/wallet_address.sql +++ b/sql/wallet_address.sql @@ -56,4 +56,7 @@ COMMENT ON COLUMN wallet_addresses.standard IS 'The token standard (erc20, erc72 -- Insert sample data INSERT INTO wallet_addresses (address, type, standard) VALUES ('TAWdqnuYCNU3dKsi7pR8d7sDkx1Evb2giV', 'tron', 'trc20'), -('TT1j2adMBb6bF2K8C2LX1QkkmSXHjiaAfw', 'tron', 'trc20'); \ No newline at end of file +('TT1j2adMBb6bF2K8C2LX1QkkmSXHjiaAfw', 'tron', 'trc20'), +('Ef8zMzMzMzMzMzMzMzMzMzMzMzMzMzMzMzMzMzMzMzMzM0vF', 'ton', 'native'), +('EQDKHZ7e70CzqdvZCC83Z4WVR8POC_ZB0J1Y4zo88G-zCXmC', 'ton', 'native'), +('EQBeab7D38RIwypegbN7YZgQzwDbb8QfMMwY8ouJc3qPl91M', 'ton', 'native'); diff --git a/test/parse_ton_txn/main.go b/test/parse_ton_txn/main.go new file mode 100644 index 0000000..5a5761e --- /dev/null +++ b/test/parse_ton_txn/main.go @@ -0,0 +1,151 @@ +package main + +import ( + "context" + "encoding/base64" + "encoding/hex" + "encoding/json" + "fmt" + "log" + "net/http" + + indexer "github.com/fystack/multichain-indexer/internal/indexer/ton" + "github.com/xssnick/tonutils-go/address" + "github.com/xssnick/tonutils-go/liteclient" + "github.com/xssnick/tonutils-go/ton" +) + +const ( + txHashStr = "uoIXlCHFgKwOWjLEQx1E3Wu4gSbhcvFmHkNHlABx21E=" + configURL = "https://ton.org/global.config.json" +) + +type TonApiTx struct { + Hash string `json:"hash"` + Lt uint64 `json:"lt"` + Account struct { + Address string `json:"address"` + } `json:"account"` +} + +func main() { + // 1. Fetch transaction details from tonapi.io to get Account Address and LT + // This is necessary because liteservers typically require (Account, LT, Hash) to look up a Tx. + hexHashBytes, err := base64.StdEncoding.DecodeString(txHashStr) + if err != nil { + log.Fatalf("Invalid base64 hash: %v", err) + } + hexHash := hex.EncodeToString(hexHashBytes) + + fmt.Printf("Fetching details for hash: %s (hex: %s)\n", txHashStr, hexHash) + + resp, err := http.Get("https://tonapi.io/v2/blockchain/transactions/" + hexHash) + if err != nil { + log.Fatalf("Failed to query tonapi.io: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + log.Fatalf("tonapi.io returned status %d. Ensure the hash is correct and tonapi is accessible.", resp.StatusCode) + } + + var apiTx TonApiTx + if err := json.NewDecoder(resp.Body).Decode(&apiTx); err != nil { + log.Fatalf("Failed to decode tonapi response: %v", err) + } + + fmt.Printf("Found Transaction via API:\n Account: %s\n LT: %d\n", apiTx.Account.Address, apiTx.Lt) + + // 2. Initialize TON Lite Client + ctx := context.Background() + pool := liteclient.NewConnectionPool() + if err := pool.AddConnectionsFromConfigUrl(ctx, configURL); err != nil { + log.Fatalf("Failed to convert config url: %v", err) + } + + // Use the generic API client + api := ton.NewAPIClient(pool, ton.ProofCheckPolicyFast).WithRetry() + + // 3. Parse target address + fmt.Printf("Parsing address: '%s'\n", apiTx.Account.Address) + + var addr *address.Address + addr, err = address.ParseAddr(apiTx.Account.Address) + if err != nil { + fmt.Printf("address.ParseAddr failed: %v. Attempting manual raw parse...\n", err) + // Fallback: Parse raw address manually (assuming 0:hash format) + var wc int8 = 0 // Default to workchain 0 + var hashHex string + + if len(apiTx.Account.Address) > 2 && apiTx.Account.Address[1] == ':' { + // Basic check for workchain + n, _ := fmt.Sscanf(apiTx.Account.Address, "%d:%s", &wc, &hashHex) + if n != 2 { + // Try just using the string as hash if scan failed + hashHex = apiTx.Account.Address + } + } else { + hashHex = apiTx.Account.Address + } + + hashBytes, hErr := hex.DecodeString(hashHex) + if hErr == nil && len(hashBytes) == 32 { + addr = address.NewAddress(0, byte(wc), hashBytes) + fmt.Println("Manually parsed raw address.") + } else { + log.Fatalf("Could not parse address: %v. Raw parse also failed: %v", err, hErr) + } + } + + // 4. Fetch the specific transaction + // ListTransactions retrieves transactions starting from the specified LT/Hash. + // To get ONLY this transaction, we ask for limit 1 starting at its LT/Hash. + txs, err := api.ListTransactions(ctx, addr, 1, apiTx.Lt, hexHashBytes) + if err != nil { + log.Fatalf("Failed to fetch transaction from liteserver: %v", err) + } + + if len(txs) == 0 { + log.Fatalf("Transaction not found on liteserver (archival node might be needed if old).") + } + + tx := txs[0] + // Verify it's the right one + if hex.EncodeToString(tx.Hash) != hexHash { + fmt.Printf("Warning: Fetched transaction hash mismatch. Expected %s, got %s\n", hexHash, hex.EncodeToString(tx.Hash)) + } + + fmt.Println("\nSuccessfully fetched transaction from Lite Server.") + + // 5. Parse the transaction + // We use "TON_MAINNET" as network ID for display + networkID := "TON_MAINNET" + + fmt.Println("\n--- Parsing Results ---") + + // Parse Native TON Transfers + tonTransfers := indexer.ParseTonTransfer(tx, addr.String(), networkID) + if len(tonTransfers) > 0 { + fmt.Println("Native TON Transfers detected:") + for i, t := range tonTransfers { + parsedJSON, _ := json.MarshalIndent(t, "", " ") + fmt.Printf("[%d] %s\n", i, string(parsedJSON)) + } + } else { + fmt.Println("No Native TON Transfers detected.") + } + + // Parse Jetton Transfers + // Passing nil for registry means it will treat the Jetton Master address as the wallet address temporarily, + // or returns "unknown" for symbol, but parsing logic should hold. + jettonTransfers := indexer.ParseJettonTransfer(tx, addr.String(), networkID, nil) + if len(jettonTransfers) > 0 { + fmt.Println("\nJetton Transfers detected:") + for i, t := range jettonTransfers { + parsedJSON, _ := json.MarshalIndent(t, "", " ") + fmt.Printf("[%d] %s\n", i, string(parsedJSON)) + } + } else { + fmt.Println("No Jetton Transfers detected.") + } +}