From b4591968ec96d8b70f7439ad01d3b5f7a7493095 Mon Sep 17 00:00:00 2001 From: Vo Tuan Thanh Date: Sat, 31 Jan 2026 22:20:43 +0700 Subject: [PATCH 1/3] feat(aptos): Add Aptos rpc support --- configs/config.example.yaml | 26 +++++++ internal/rpc/aptos/address.go | 47 +++++++++++ internal/rpc/aptos/api.go | 14 ++++ internal/rpc/aptos/client.go | 81 +++++++++++++++++++ internal/rpc/aptos/tx.go | 143 ++++++++++++++++++++++++++++++++++ internal/rpc/aptos/types.go | 83 ++++++++++++++++++++ internal/rpc/types.go | 1 + internal/worker/factory.go | 36 +++++++++ pkg/common/config/types.go | 2 +- 9 files changed, 432 insertions(+), 1 deletion(-) create mode 100644 internal/rpc/aptos/address.go create mode 100644 internal/rpc/aptos/api.go create mode 100644 internal/rpc/aptos/client.go create mode 100644 internal/rpc/aptos/tx.go create mode 100644 internal/rpc/aptos/types.go diff --git a/configs/config.example.yaml b/configs/config.example.yaml index 583075f..cda246e 100644 --- a/configs/config.example.yaml +++ b/configs/config.example.yaml @@ -160,6 +160,32 @@ chains: nodes: - url: "fullnode.mainnet.sui.io:443" # e.g. 127.0.0.1:9000 - url: "sui-mainnet.nodeinfra.com:443" + aptos_mainnet: + type: "aptos" + start_block: 596926179 + poll_interval: "1s" # override default poll interval + nodes: + - url: "https://aptos-rest.publicnode.com" + - url: "https://fullnode.mainnet.aptoslabs.com/v1" + client: + max_retries: 5 # override global max retries + retry_delay: "10s" + throttle: + rps: 5 + burst: 8 + + aptos_testnet: + type: "aptos" + start_block: 638572579 + poll_interval: "1s" # override default poll interval + nodes: + - url: "https://fullnode.testnet.aptoslabs.com/v1" + client: + max_retries: 5 # override global max retries + retry_delay: "10s" + throttle: + rps: 5 + burst: 8 # Infrastructure services services: diff --git a/internal/rpc/aptos/address.go b/internal/rpc/aptos/address.go new file mode 100644 index 0000000..719f77a --- /dev/null +++ b/internal/rpc/aptos/address.go @@ -0,0 +1,47 @@ +package aptos + +import ( + "fmt" + "strings" +) + +func NormalizeAddress(addr string) string { + addr = strings.ToLower(strings.TrimSpace(addr)) + + if !strings.HasPrefix(addr, "0x") { + addr = "0x" + addr + } + + addr = strings.TrimPrefix(addr, "0x") + + if len(addr) < 64 { + addr = strings.Repeat("0", 64-len(addr)) + addr + } + + return "0x" + addr +} + +func ValidateAddress(addr string) error { + normalized := NormalizeAddress(addr) + + if len(normalized) != 66 { + return fmt.Errorf("invalid address length: expected 66 characters (0x + 64 hex), got %d", len(normalized)) + } + + for i, c := range normalized[2:] { + if !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')) { + return fmt.Errorf("invalid character at position %d: %c", i+2, c) + } + } + + return nil +} + +func ShortAddress(addr string) string { + normalized := NormalizeAddress(addr) + trimmed := strings.TrimLeft(normalized[2:], "0") + if trimmed == "" { + return "0x0" + } + return "0x" + trimmed +} diff --git a/internal/rpc/aptos/api.go b/internal/rpc/aptos/api.go new file mode 100644 index 0000000..a7a6945 --- /dev/null +++ b/internal/rpc/aptos/api.go @@ -0,0 +1,14 @@ +package aptos + +import ( + "context" + + "github.com/fystack/multichain-indexer/internal/rpc" +) + +type AptosAPI interface { + rpc.NetworkClient + GetLedgerInfo(ctx context.Context) (*LedgerInfo, error) + GetBlockByVersion(ctx context.Context, version uint64, withTransactions bool) (*Block, error) + GetTransactionsByVersion(ctx context.Context, start, limit uint64) ([]Transaction, error) +} diff --git a/internal/rpc/aptos/client.go b/internal/rpc/aptos/client.go new file mode 100644 index 0000000..bb9a8f3 --- /dev/null +++ b/internal/rpc/aptos/client.go @@ -0,0 +1,81 @@ +package aptos + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/fystack/multichain-indexer/internal/rpc" + "github.com/fystack/multichain-indexer/pkg/ratelimiter" +) + +type Client struct { + *rpc.BaseClient +} + +func NewAptosClient( + url string, + auth *rpc.AuthConfig, + timeout time.Duration, + rateLimiter *ratelimiter.PooledRateLimiter, +) *Client { + return &Client{ + BaseClient: rpc.NewBaseClient( + url, + rpc.NetworkAptos, + rpc.ClientTypeREST, + auth, + timeout, + rateLimiter, + ), + } +} + +func (c *Client) GetLedgerInfo(ctx context.Context) (*LedgerInfo, error) { + data, err := c.Do(ctx, http.MethodGet, "/v1", nil, nil) + if err != nil { + return nil, fmt.Errorf("getLedgerInfo failed: %w", err) + } + + var info LedgerInfo + if err := json.Unmarshal(data, &info); err != nil { + return nil, fmt.Errorf("failed to unmarshal ledger info: %w", err) + } + return &info, nil +} + +func (c *Client) GetBlockByVersion(ctx context.Context, version uint64, withTransactions bool) (*Block, error) { + path := fmt.Sprintf("/v1/blocks/by_version/%d", version) + query := map[string]string{} + if withTransactions { + query["with_transactions"] = "true" + } + + data, err := c.Do(ctx, http.MethodGet, path, nil, query) + if err != nil { + return nil, fmt.Errorf("getBlockByVersion failed: %w", err) + } + + var block Block + if err := json.Unmarshal(data, &block); err != nil { + return nil, fmt.Errorf("failed to unmarshal block: %w", err) + } + return &block, nil +} + +func (c *Client) GetTransactionsByVersion(ctx context.Context, start, limit uint64) ([]Transaction, error) { + path := fmt.Sprintf("/v1/transactions?start=%d&limit=%d", start, limit) + + data, err := c.Do(ctx, http.MethodGet, path, nil, nil) + if err != nil { + return nil, fmt.Errorf("getTransactionsByVersion failed: %w", err) + } + + var txs []Transaction + if err := json.Unmarshal(data, &txs); err != nil { + return nil, fmt.Errorf("failed to unmarshal transactions: %w", err) + } + return txs, nil +} diff --git a/internal/rpc/aptos/tx.go b/internal/rpc/aptos/tx.go new file mode 100644 index 0000000..d19a216 --- /dev/null +++ b/internal/rpc/aptos/tx.go @@ -0,0 +1,143 @@ +package aptos + +import ( + "fmt" + "strconv" + "strings" + + "github.com/fystack/multichain-indexer/pkg/common/constant" + "github.com/fystack/multichain-indexer/pkg/common/types" + "github.com/shopspring/decimal" +) + +const ( + APTDecimals = 8 + NativeAPTType = "0x1::aptos_coin::AptosCoin" +) + +func (tx *Transaction) ExtractTransfers(networkId string, blockNumber uint64) []types.Transaction { + if !tx.Success || tx.Type != TxTypeUser { + return nil + } + + var transfers []types.Transaction + fee := tx.calculateFee() + timestamp := parseTimestamp(tx.Timestamp) + + withdrawMap := make(map[string]map[string]string) + depositMap := make(map[string]map[string]string) + + for _, event := range tx.Events { + switch { + case strings.Contains(event.Type, "WithdrawEvent"): + coinType := extractCoinType(event.Type) + addr := NormalizeAddress(event.GUID.AccountAddress) + if withdrawMap[addr] == nil { + withdrawMap[addr] = make(map[string]string) + } + amount := event.Data.Amount + if existing, ok := withdrawMap[addr][coinType]; ok { + existingDec, _ := decimal.NewFromString(existing) + newDec, _ := decimal.NewFromString(amount) + withdrawMap[addr][coinType] = existingDec.Add(newDec).String() + } else { + withdrawMap[addr][coinType] = amount + } + + case strings.Contains(event.Type, "DepositEvent"): + coinType := extractCoinType(event.Type) + addr := NormalizeAddress(event.GUID.AccountAddress) + if depositMap[addr] == nil { + depositMap[addr] = make(map[string]string) + } + amount := event.Data.Amount + if existing, ok := depositMap[addr][coinType]; ok { + existingDec, _ := decimal.NewFromString(existing) + newDec, _ := decimal.NewFromString(amount) + depositMap[addr][coinType] = existingDec.Add(newDec).String() + } else { + depositMap[addr][coinType] = amount + } + } + } + + for toAddr, coins := range depositMap { + for coinType, amount := range coins { + fromAddr := tx.Sender + if fromAddr == "" { + fromAddr = findWithdrawAddress(withdrawMap, coinType, amount) + } + + transfer := types.Transaction{ + TxHash: tx.Hash, + NetworkId: networkId, + BlockNumber: blockNumber, + FromAddress: NormalizeAddress(fromAddr), + ToAddress: NormalizeAddress(toAddr), + Amount: amount, + TxFee: fee, + Timestamp: timestamp, + } + + if isNativeAPT(coinType) { + transfer.AssetAddress = "" + transfer.Type = constant.TxTypeNativeTransfer + } else { + transfer.AssetAddress = coinType + transfer.Type = constant.TxTypeTokenTransfer + } + + transfers = append(transfers, transfer) + } + } + + return transfers +} + +func (tx *Transaction) calculateFee() decimal.Decimal { + gasUsed, _ := strconv.ParseInt(tx.GasUsed, 10, 64) + gasPrice, _ := strconv.ParseInt(tx.GasUnitPrice, 10, 64) + + feeOctas := gasUsed * gasPrice + feeAPT := decimal.NewFromInt(feeOctas).Div(decimal.NewFromInt(100000000)) + + return feeAPT +} + +func parseTimestamp(ts string) uint64 { + microseconds, err := strconv.ParseUint(ts, 10, 64) + if err != nil { + return 0 + } + return microseconds / 1000000 +} + +func extractCoinType(eventType string) string { + start := strings.Index(eventType, "<") + end := strings.LastIndex(eventType, ">") + if start == -1 || end == -1 || start >= end { + return NativeAPTType + } + return strings.TrimSpace(eventType[start+1 : end]) +} + +func isNativeAPT(coinType string) bool { + return coinType == NativeAPTType || coinType == "" +} + +func findWithdrawAddress(withdrawMap map[string]map[string]string, coinType, amount string) string { + for addr, coins := range withdrawMap { + if withdrawAmount, ok := coins[coinType]; ok && withdrawAmount == amount { + return addr + } + } + return "" +} + +func ParseVersion(versionStr string) (uint64, error) { + version, err := strconv.ParseUint(versionStr, 10, 64) + if err != nil { + return 0, fmt.Errorf("invalid version: %w", err) + } + return version, nil +} diff --git a/internal/rpc/aptos/types.go b/internal/rpc/aptos/types.go new file mode 100644 index 0000000..cfd0b87 --- /dev/null +++ b/internal/rpc/aptos/types.go @@ -0,0 +1,83 @@ +package aptos + +import "encoding/json" + +type LedgerInfo struct { + ChainID int `json:"chain_id"` + Epoch string `json:"epoch"` + LedgerVersion string `json:"ledger_version"` + OldestLedgerVersion string `json:"oldest_ledger_version"` + LedgerTimestamp string `json:"ledger_timestamp"` + NodeRole string `json:"node_role"` + OldestBlockHeight string `json:"oldest_block_height"` + BlockHeight string `json:"block_height"` + GitHash string `json:"git_hash"` +} + +type Block struct { + BlockHeight string `json:"block_height"` + BlockHash string `json:"block_hash"` + BlockTimestamp string `json:"block_timestamp"` + FirstVersion string `json:"first_version"` + LastVersion string `json:"last_version"` + Transactions []Transaction `json:"transactions,omitempty"` +} + +type Transaction struct { + Version string `json:"version"` + Hash string `json:"hash"` + StateChangeHash string `json:"state_change_hash"` + EventRootHash string `json:"event_root_hash"` + StateCheckpointHash string `json:"state_checkpoint_hash,omitempty"` + GasUsed string `json:"gas_used"` + Success bool `json:"success"` + VMStatus string `json:"vm_status"` + AccumulatorRootHash string `json:"accumulator_root_hash"` + Changes []json.RawMessage `json:"changes"` + Sender string `json:"sender,omitempty"` + SequenceNumber string `json:"sequence_number,omitempty"` + MaxGasAmount string `json:"max_gas_amount,omitempty"` + GasUnitPrice string `json:"gas_unit_price,omitempty"` + ExpirationTimestampSecs string `json:"expiration_timestamp_secs,omitempty"` + Payload *TransactionPayload `json:"payload,omitempty"` + Events []Event `json:"events"` + Timestamp string `json:"timestamp"` + Type string `json:"type"` +} + +type TransactionPayload struct { + Type string `json:"type"` + Function string `json:"function,omitempty"` + TypeArguments []string `json:"type_arguments,omitempty"` + Arguments []interface{} `json:"arguments,omitempty"` + Code json.RawMessage `json:"code,omitempty"` +} + +type Event struct { + GUID EventGUID `json:"guid"` + SequenceNumber string `json:"sequence_number"` + Type string `json:"type"` + Data EventData `json:"data"` +} + +type EventGUID struct { + CreationNumber string `json:"creation_number"` + AccountAddress string `json:"account_address"` +} + +type EventData struct { + Amount string `json:"amount,omitempty"` +} + +const ( + EventTypeWithdraw = "0x1::coin::WithdrawEvent" + EventTypeDeposit = "0x1::coin::DepositEvent" + + PayloadTypeEntryFunction = "entry_function_payload" + PayloadTypeScript = "script_payload" + + TxTypeUser = "user_transaction" + TxTypeBlockMetadata = "block_metadata_transaction" + TxTypeStateCheckpoint = "state_checkpoint_transaction" + TxTypeGenesisTransaction = "genesis_transaction" +) diff --git a/internal/rpc/types.go b/internal/rpc/types.go index 729f5e6..01e10d7 100644 --- a/internal/rpc/types.go +++ b/internal/rpc/types.go @@ -20,6 +20,7 @@ const ( NetworkSolana = "solana" // Solana blockchain NetworkTron = "tron" // Tron blockchain NetworkBitcoin = "bitcoin" // Bitcoin blockchain + NetworkAptos = "aptos" // Aptos blockchain NetworkGeneric = "generic" // Generic/unknown blockchain type ) diff --git a/internal/worker/factory.go b/internal/worker/factory.go index dcf3608..f3987b6 100644 --- a/internal/worker/factory.go +++ b/internal/worker/factory.go @@ -6,6 +6,7 @@ import ( "github.com/fystack/multichain-indexer/internal/indexer" "github.com/fystack/multichain-indexer/internal/rpc" + "github.com/fystack/multichain-indexer/internal/rpc/aptos" "github.com/fystack/multichain-indexer/internal/rpc/bitcoin" "github.com/fystack/multichain-indexer/internal/rpc/evm" "github.com/fystack/multichain-indexer/internal/rpc/solana" @@ -263,6 +264,39 @@ func buildSolanaIndexer(chainName string, chainCfg config.ChainConfig, mode Work return indexer.NewSolanaIndexer(chainName, chainCfg, failover, pubkeyStore) } +// buildAptosIndexer constructs an Aptos indexer with failover and providers. +func buildAptosIndexer(chainName string, chainCfg config.ChainConfig, mode WorkerMode) indexer.Indexer { + failover := rpc.NewFailover[aptos.AptosAPI](nil) + + rl := ratelimiter.GetOrCreateSharedPooledRateLimiter( + chainName, chainCfg.Throttle.RPS, chainCfg.Throttle.Burst, + ) + + for i, node := range chainCfg.Nodes { + client := aptos.NewAptosClient( + node.URL, + &rpc.AuthConfig{ + Type: rpc.AuthType(node.Auth.Type), + Key: node.Auth.Key, + Value: node.Auth.Value, + }, + chainCfg.Client.Timeout, + rl, + ) + + failover.AddProvider(&rpc.Provider{ + Name: chainName + "-" + strconv.Itoa(i+1), + URL: node.URL, + Network: chainName, + ClientType: "rest", + Client: client, + State: rpc.StateHealthy, + }) + } + + return indexer.NewAptosIndexer(chainName, chainCfg, failover) +} + // buildSuiIndexer constructs a Sui indexer with failover and providers. func buildSuiIndexer( chainName string, @@ -325,6 +359,8 @@ func CreateManagerWithWorkers( idxr = buildBitcoinIndexer(chainName, chainCfg, ModeRegular, pubkeyStore) case enum.NetworkTypeSol: idxr = buildSolanaIndexer(chainName, chainCfg, ModeRegular, pubkeyStore) + case enum.NetworkTypeApt: + idxr = buildAptosIndexer(chainName, chainCfg, ModeRegular) case enum.NetworkTypeSui: idxr = buildSuiIndexer(chainName, chainCfg, ModeRegular, pubkeyStore) default: diff --git a/pkg/common/config/types.go b/pkg/common/config/types.go index c5bf40a..b667406 100644 --- a/pkg/common/config/types.go +++ b/pkg/common/config/types.go @@ -37,7 +37,7 @@ type ChainConfig struct { Name string `yaml:"-"` NetworkId string `yaml:"network_id"` InternalCode string `yaml:"internal_code"` - Type enum.NetworkType `yaml:"type" validate:"required,oneof=tron evm btc sol sui"` + Type enum.NetworkType `yaml:"type" validate:"required,oneof=tron evm btc sol sui apt"` FromLatest bool `yaml:"from_latest"` StartBlock int `yaml:"start_block" validate:"min=0"` PollInterval time.Duration `yaml:"poll_interval"` From 7e1548f7661abdf0a0876e63e295f3f1164ad3f0 Mon Sep 17 00:00:00 2001 From: Vo Tuan Thanh Date: Sun, 1 Feb 2026 00:57:36 +0700 Subject: [PATCH 2/3] feat(aptos): Implement Aptos indexer and integrate with address bloom filters --- internal/indexer/aptos.go | 221 +++++++++++++++++++++++++++++ pkg/addressbloomfilter/inmemory.go | 1 + pkg/addressbloomfilter/redis.go | 1 + 3 files changed, 223 insertions(+) create mode 100644 internal/indexer/aptos.go diff --git a/internal/indexer/aptos.go b/internal/indexer/aptos.go new file mode 100644 index 0000000..5e5369b --- /dev/null +++ b/internal/indexer/aptos.go @@ -0,0 +1,221 @@ +package indexer + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/fystack/multichain-indexer/internal/rpc" + "github.com/fystack/multichain-indexer/internal/rpc/aptos" + "github.com/fystack/multichain-indexer/pkg/common/config" + "github.com/fystack/multichain-indexer/pkg/common/enum" + "github.com/fystack/multichain-indexer/pkg/common/types" + "github.com/fystack/multichain-indexer/pkg/common/utils" +) + +type AptosIndexer struct { + chainName string + config config.ChainConfig + failover *rpc.Failover[aptos.AptosAPI] +} + +func NewAptosIndexer(chainName string, cfg config.ChainConfig, f *rpc.Failover[aptos.AptosAPI]) *AptosIndexer { + return &AptosIndexer{ + chainName: chainName, + config: cfg, + failover: f, + } +} + +func (a *AptosIndexer) GetName() string { return strings.ToUpper(a.chainName) } +func (a *AptosIndexer) GetNetworkType() enum.NetworkType { return enum.NetworkTypeApt } +func (a *AptosIndexer) GetNetworkInternalCode() string { return a.config.InternalCode } +func (a *AptosIndexer) GetNetworkId() string { return a.config.NetworkId } + +func (a *AptosIndexer) GetLatestBlockNumber(ctx context.Context) (uint64, error) { + var latest uint64 + err := a.failover.ExecuteWithRetry(ctx, func(c aptos.AptosAPI) error { + info, err := c.GetLedgerInfo(ctx) + if err != nil { + return err + } + version, err := aptos.ParseVersion(info.BlockHeight) + if err != nil { + return err + } + latest = version + return nil + }) + return latest, err +} + +func (a *AptosIndexer) GetBlock(ctx context.Context, blockNumber uint64) (*types.Block, error) { + var aptosBlock *aptos.Block + + err := a.failover.ExecuteWithRetry(ctx, func(c aptos.AptosAPI) error { + block, err := c.GetBlockByVersion(ctx, blockNumber, false) + if err != nil { + return err + } + aptosBlock = block + return nil + }) + + if err != nil { + return nil, err + } + + firstVersion, err := aptos.ParseVersion(aptosBlock.FirstVersion) + if err != nil { + return nil, fmt.Errorf("invalid first_version: %w", err) + } + + lastVersion, err := aptos.ParseVersion(aptosBlock.LastVersion) + if err != nil { + return nil, fmt.Errorf("invalid last_version: %w", err) + } + + if lastVersion < firstVersion { + return nil, fmt.Errorf("invalid version range: last_version %d < first_version %d", lastVersion, firstVersion) + } + + limit := lastVersion - firstVersion + 1 + if limit > 100 { + limit = 100 + } + + var txs []aptos.Transaction + err = a.failover.ExecuteWithRetry(ctx, func(c aptos.AptosAPI) error { + transactions, err := c.GetTransactionsByVersion(ctx, firstVersion, limit) + if err != nil { + return err + } + txs = transactions + return nil + }) + + if err != nil { + return nil, err + } + + return a.processBlock(aptosBlock, txs) +} + +func (a *AptosIndexer) processBlock( + aptosBlock *aptos.Block, + txs []aptos.Transaction, +) (*types.Block, error) { + blockHeight, err := aptos.ParseVersion(aptosBlock.BlockHeight) + if err != nil { + return nil, fmt.Errorf("invalid block_height: %w", err) + } + + timestamp, err := aptos.ParseVersion(aptosBlock.BlockTimestamp) + if err != nil { + return nil, fmt.Errorf("invalid block_timestamp: %w", err) + } + + block := &types.Block{ + Number: blockHeight, + Hash: aptosBlock.BlockHash, + ParentHash: "", + Timestamp: timestamp / 1000000, + Transactions: []types.Transaction{}, + } + + for _, tx := range txs { + transfers := tx.ExtractTransfers(a.GetNetworkId(), blockHeight) + block.Transactions = append(block.Transactions, transfers...) + } + + block.Transactions = utils.DedupTransfers(block.Transactions) + return block, nil +} + +func (a *AptosIndexer) GetBlocks( + ctx context.Context, + start, end uint64, + isParallel bool, +) ([]BlockResult, error) { + var nums []uint64 + for i := start; i <= end; i++ { + nums = append(nums, i) + } + return a.GetBlocksByNumbers(ctx, nums) +} + +func (a *AptosIndexer) GetBlocksByNumbers( + ctx context.Context, + nums []uint64, +) ([]BlockResult, error) { + return a.getBlocks(ctx, nums) +} + +func (a *AptosIndexer) getBlocks(ctx context.Context, nums []uint64) ([]BlockResult, error) { + if len(nums) == 0 { + return nil, nil + } + blocks := make([]BlockResult, len(nums)) + + workers := len(nums) + workers = min(workers, a.config.Throttle.Concurrency) + + type job struct { + num uint64 + index int + } + + jobs := make(chan job, workers*2) + var wg sync.WaitGroup + + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := range jobs { + blk, err := a.GetBlock(ctx, j.num) + blocks[j.index] = BlockResult{Number: j.num, Block: blk} + if err != nil { + blocks[j.index].Error = &Error{ + ErrorType: ErrorTypeUnknown, + Message: err.Error(), + } + } + } + }() + } + + go func() { + defer close(jobs) + for i, num := range nums { + select { + case <-ctx.Done(): + return + case jobs <- job{num: num, index: i}: + } + } + }() + + wg.Wait() + if ctx.Err() != nil { + return nil, ctx.Err() + } + + var firstErr error + for _, b := range blocks { + if b.Error != nil { + firstErr = fmt.Errorf("block %d: %s", b.Number, b.Error.Message) + break + } + } + return blocks, firstErr +} + +func (a *AptosIndexer) IsHealthy() bool { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err := a.GetLatestBlockNumber(ctx) + return err == nil +} diff --git a/pkg/addressbloomfilter/inmemory.go b/pkg/addressbloomfilter/inmemory.go index 507556f..414f986 100644 --- a/pkg/addressbloomfilter/inmemory.go +++ b/pkg/addressbloomfilter/inmemory.go @@ -47,6 +47,7 @@ func (abf *addressBloomFilter) Initialize(ctx context.Context) error { enum.NetworkTypeTron, enum.NetworkTypeBtc, enum.NetworkTypeSui, + enum.NetworkTypeApt, } for _, addrType := range types { diff --git a/pkg/addressbloomfilter/redis.go b/pkg/addressbloomfilter/redis.go index 32e4f3c..0bd24e3 100644 --- a/pkg/addressbloomfilter/redis.go +++ b/pkg/addressbloomfilter/redis.go @@ -73,6 +73,7 @@ func (rbf *redisBloomFilter) Initialize(ctx context.Context) error { enum.NetworkTypeBtc, enum.NetworkTypeSol, enum.NetworkTypeSui, + enum.NetworkTypeApt, } for _, addrType := range types { From 995f8904d64510023e1fed5af91b268e8d4dc155 Mon Sep 17 00:00:00 2001 From: Vo Tuan Thanh Date: Thu, 5 Feb 2026 00:59:04 +0700 Subject: [PATCH 3/3] feat(aptos): Enhance Aptos integration with new block fetching methods and configuration updates - Added support for fetching blocks by height in the Aptos API. - Updated Aptos indexer to process transactions more efficiently with optimized parallel processing. - Revised configuration for Aptos mainnet and testnet, including network IDs and polling intervals. - Improved transaction extraction logic to handle fungible asset events and store metadata. - Adjusted wallet address SQL to reflect updated Aptos type naming. --- cmd/wallet-kv-load/main.go | 1 + configs/config.example.yaml | 28 +++-- internal/indexer/aptos.go | 188 +++++++++++++++++++++++++------- internal/rpc/aptos/api.go | 1 + internal/rpc/aptos/client.go | 25 ++++- internal/rpc/aptos/tx.go | 205 ++++++++++++++++++++++++++++++----- internal/rpc/aptos/types.go | 10 ++ sql/wallet_address.sql | 2 +- 8 files changed, 377 insertions(+), 83 deletions(-) diff --git a/cmd/wallet-kv-load/main.go b/cmd/wallet-kv-load/main.go index 9a81a3a..00ab644 100644 --- a/cmd/wallet-kv-load/main.go +++ b/cmd/wallet-kv-load/main.go @@ -74,6 +74,7 @@ func (c *RunCmd) Run() error { enum.NetworkTypeTron, enum.NetworkTypeEVM, enum.NetworkTypeBtc, + enum.NetworkTypeApt, } for _, t := range candidateTypes { diff --git a/configs/config.example.yaml b/configs/config.example.yaml index cda246e..bfc17c4 100644 --- a/configs/config.example.yaml +++ b/configs/config.example.yaml @@ -161,31 +161,37 @@ chains: - url: "fullnode.mainnet.sui.io:443" # e.g. 127.0.0.1:9000 - url: "sui-mainnet.nodeinfra.com:443" aptos_mainnet: - type: "aptos" + network_id: "APTOS_MAINNET" + internal_code: "APTOS_MAINNET" + type: "apt" start_block: 596926179 - poll_interval: "1s" # override default poll interval + poll_interval: "3s" # Poll every 3s (blocks are ~1s, this reduces API load) + throttle: + batch_size: 25 + rps: 5 + burst: 8 nodes: - - url: "https://aptos-rest.publicnode.com" + - url: "https://aptos-rest.publicnode.com/v1" - url: "https://fullnode.mainnet.aptoslabs.com/v1" client: max_retries: 5 # override global max retries retry_delay: "10s" + + aptos_testnet: + network_id: "APTOS_TESTNET" + internal_code: "APTOS_TESTNET" + type: "apt" + start_block: 642939715 + poll_interval: "3s" # Poll every 3s (blocks are ~1s, this reduces API load) throttle: + batch_size: 25 rps: 5 burst: 8 - - aptos_testnet: - type: "aptos" - start_block: 638572579 - poll_interval: "1s" # override default poll interval nodes: - url: "https://fullnode.testnet.aptoslabs.com/v1" client: max_retries: 5 # override global max retries retry_delay: "10s" - throttle: - rps: 5 - burst: 8 # Infrastructure services services: diff --git a/internal/indexer/aptos.go b/internal/indexer/aptos.go index 5e5369b..4ab9519 100644 --- a/internal/indexer/aptos.go +++ b/internal/indexer/aptos.go @@ -41,11 +41,11 @@ func (a *AptosIndexer) GetLatestBlockNumber(ctx context.Context) (uint64, error) if err != nil { return err } - version, err := aptos.ParseVersion(info.BlockHeight) + blockHeight, err := aptos.ParseVersion(info.BlockHeight) if err != nil { return err } - latest = version + latest = blockHeight return nil }) return latest, err @@ -55,7 +55,7 @@ func (a *AptosIndexer) GetBlock(ctx context.Context, blockNumber uint64) (*types var aptosBlock *aptos.Block err := a.failover.ExecuteWithRetry(ctx, func(c aptos.AptosAPI) error { - block, err := c.GetBlockByVersion(ctx, blockNumber, false) + block, err := c.GetBlockByHeight(ctx, blockNumber, false) if err != nil { return err } @@ -67,6 +67,11 @@ func (a *AptosIndexer) GetBlock(ctx context.Context, blockNumber uint64) (*types return nil, err } + return a.processBlockWithTransactions(ctx, aptosBlock) +} + +// processBlockWithTransactions fetches and processes transactions for a block +func (a *AptosIndexer) processBlockWithTransactions(ctx context.Context, aptosBlock *aptos.Block) (*types.Block, error) { firstVersion, err := aptos.ParseVersion(aptosBlock.FirstVersion) if err != nil { return nil, fmt.Errorf("invalid first_version: %w", err) @@ -81,26 +86,54 @@ func (a *AptosIndexer) GetBlock(ctx context.Context, blockNumber uint64) (*types return nil, fmt.Errorf("invalid version range: last_version %d < first_version %d", lastVersion, firstVersion) } - limit := lastVersion - firstVersion + 1 - if limit > 100 { - limit = 100 - } + totalTxs := lastVersion - firstVersion + 1 + txBatchSize := uint64(100) + + var allTxs []aptos.Transaction - var txs []aptos.Transaction - err = a.failover.ExecuteWithRetry(ctx, func(c aptos.AptosAPI) error { - transactions, err := c.GetTransactionsByVersion(ctx, firstVersion, limit) + if totalTxs <= txBatchSize { + var txs []aptos.Transaction + err := a.failover.ExecuteWithRetry(ctx, func(c aptos.AptosAPI) error { + batch, err := c.GetTransactionsByVersion(ctx, firstVersion, totalTxs) + if err != nil { + return err + } + txs = batch + return nil + }) if err != nil { - return err + return nil, fmt.Errorf("failed to fetch transactions: %w", err) } - txs = transactions - return nil - }) + allTxs = txs + } else { + currentVersion := firstVersion + for currentVersion <= lastVersion { + limit := txBatchSize + remaining := lastVersion - currentVersion + 1 + if remaining < limit { + limit = remaining + } - if err != nil { - return nil, err + var txs []aptos.Transaction + err := a.failover.ExecuteWithRetry(ctx, func(c aptos.AptosAPI) error { + batch, err := c.GetTransactionsByVersion(ctx, currentVersion, limit) + if err != nil { + return err + } + txs = batch + return nil + }) + + if err != nil { + return nil, fmt.Errorf("failed to fetch transactions at version %d: %w", currentVersion, err) + } + + allTxs = append(allTxs, txs...) + currentVersion += limit + } } - return a.processBlock(aptosBlock, txs) + return a.processBlock(aptosBlock, allTxs) } func (a *AptosIndexer) processBlock( @@ -150,67 +183,140 @@ func (a *AptosIndexer) GetBlocksByNumbers( ctx context.Context, nums []uint64, ) ([]BlockResult, error) { - return a.getBlocks(ctx, nums) + return a.getBlocksOptimized(ctx, nums) } -func (a *AptosIndexer) getBlocks(ctx context.Context, nums []uint64) ([]BlockResult, error) { +// getBlocksOptimized fetches blocks with optimized parallel processing: +// 1. Fetches all block metadata in parallel (small, fast) +// 2. Processes blocks with transactions in parallel +func (a *AptosIndexer) getBlocksOptimized(ctx context.Context, nums []uint64) ([]BlockResult, error) { if len(nums) == 0 { return nil, nil } - blocks := make([]BlockResult, len(nums)) - workers := len(nums) - workers = min(workers, a.config.Throttle.Concurrency) + results := make([]BlockResult, len(nums)) + + aptosBlocks := make([]*aptos.Block, len(nums)) - type job struct { + type metaJob struct { num uint64 index int } - jobs := make(chan job, workers*2) - var wg sync.WaitGroup + metaWorkers := min(len(nums), a.config.Throttle.Concurrency*2) + metaJobs := make(chan metaJob, metaWorkers*2) + var metaWg sync.WaitGroup - for i := 0; i < workers; i++ { - wg.Add(1) + for i := 0; i < metaWorkers; i++ { + metaWg.Add(1) go func() { - defer wg.Done() - for j := range jobs { - blk, err := a.GetBlock(ctx, j.num) - blocks[j.index] = BlockResult{Number: j.num, Block: blk} + defer metaWg.Done() + for j := range metaJobs { + var block *aptos.Block + err := a.failover.ExecuteWithRetry(ctx, func(c aptos.AptosAPI) error { + b, err := c.GetBlockByHeight(ctx, j.num, false) + if err != nil { + return err + } + block = b + return nil + }) + if err != nil { - blocks[j.index].Error = &Error{ - ErrorType: ErrorTypeUnknown, - Message: err.Error(), + results[j.index] = BlockResult{ + Number: j.num, + Error: &Error{ErrorType: ErrorTypeUnknown, Message: err.Error()}, } + } else { + aptosBlocks[j.index] = block } } }() } go func() { - defer close(jobs) + defer close(metaJobs) for i, num := range nums { select { case <-ctx.Done(): return - case jobs <- job{num: num, index: i}: + case metaJobs <- metaJob{num: num, index: i}: } } }() - wg.Wait() + metaWg.Wait() + if ctx.Err() != nil { return nil, ctx.Err() } var firstErr error - for _, b := range blocks { - if b.Error != nil { - firstErr = fmt.Errorf("block %d: %s", b.Number, b.Error.Message) + for _, r := range results { + if r.Error != nil { + firstErr = fmt.Errorf("block %d: %s", r.Number, r.Error.Message) + return results, firstErr + } + } + + type txJob struct { + aptosBlock *aptos.Block + index int + } + + txWorkers := min(len(nums), a.config.Throttle.Concurrency) + txJobs := make(chan txJob, txWorkers*2) + var txWg sync.WaitGroup + + for i := 0; i < txWorkers; i++ { + txWg.Add(1) + go func() { + defer txWg.Done() + for j := range txJobs { + blk, err := a.processBlockWithTransactions(ctx, j.aptosBlock) + blockHeight, _ := aptos.ParseVersion(j.aptosBlock.BlockHeight) + results[j.index] = BlockResult{ + Number: blockHeight, + Block: blk, + } + if err != nil { + results[j.index].Error = &Error{ + ErrorType: ErrorTypeUnknown, + Message: err.Error(), + } + } + } + }() + } + + go func() { + defer close(txJobs) + for i, block := range aptosBlocks { + if block == nil { + continue + } + select { + case <-ctx.Done(): + return + case txJobs <- txJob{aptosBlock: block, index: i}: + } + } + }() + + txWg.Wait() + + if ctx.Err() != nil { + return nil, ctx.Err() + } + + for _, r := range results { + if r.Error != nil { + firstErr = fmt.Errorf("block %d: %s", r.Number, r.Error.Message) break } } - return blocks, firstErr + + return results, firstErr } func (a *AptosIndexer) IsHealthy() bool { diff --git a/internal/rpc/aptos/api.go b/internal/rpc/aptos/api.go index a7a6945..c93b6cc 100644 --- a/internal/rpc/aptos/api.go +++ b/internal/rpc/aptos/api.go @@ -10,5 +10,6 @@ type AptosAPI interface { rpc.NetworkClient GetLedgerInfo(ctx context.Context) (*LedgerInfo, error) GetBlockByVersion(ctx context.Context, version uint64, withTransactions bool) (*Block, error) + GetBlockByHeight(ctx context.Context, height uint64, withTransactions bool) (*Block, error) GetTransactionsByVersion(ctx context.Context, start, limit uint64) ([]Transaction, error) } diff --git a/internal/rpc/aptos/client.go b/internal/rpc/aptos/client.go index bb9a8f3..1847955 100644 --- a/internal/rpc/aptos/client.go +++ b/internal/rpc/aptos/client.go @@ -34,7 +34,7 @@ func NewAptosClient( } func (c *Client) GetLedgerInfo(ctx context.Context) (*LedgerInfo, error) { - data, err := c.Do(ctx, http.MethodGet, "/v1", nil, nil) + data, err := c.Do(ctx, http.MethodGet, "/", nil, nil) if err != nil { return nil, fmt.Errorf("getLedgerInfo failed: %w", err) } @@ -47,7 +47,7 @@ func (c *Client) GetLedgerInfo(ctx context.Context) (*LedgerInfo, error) { } func (c *Client) GetBlockByVersion(ctx context.Context, version uint64, withTransactions bool) (*Block, error) { - path := fmt.Sprintf("/v1/blocks/by_version/%d", version) + path := fmt.Sprintf("/blocks/by_version/%d", version) query := map[string]string{} if withTransactions { query["with_transactions"] = "true" @@ -65,8 +65,27 @@ func (c *Client) GetBlockByVersion(ctx context.Context, version uint64, withTran return &block, nil } +func (c *Client) GetBlockByHeight(ctx context.Context, height uint64, withTransactions bool) (*Block, error) { + path := fmt.Sprintf("/blocks/by_height/%d", height) + query := map[string]string{} + if withTransactions { + query["with_transactions"] = "true" + } + + data, err := c.Do(ctx, http.MethodGet, path, nil, query) + if err != nil { + return nil, fmt.Errorf("getBlockByHeight failed: %w", err) + } + + var block Block + if err := json.Unmarshal(data, &block); err != nil { + return nil, fmt.Errorf("failed to unmarshal block: %w", err) + } + return &block, nil +} + func (c *Client) GetTransactionsByVersion(ctx context.Context, start, limit uint64) ([]Transaction, error) { - path := fmt.Sprintf("/v1/transactions?start=%d&limit=%d", start, limit) + path := fmt.Sprintf("/transactions?start=%d&limit=%d", start, limit) data, err := c.Do(ctx, http.MethodGet, path, nil, nil) if err != nil { diff --git a/internal/rpc/aptos/tx.go b/internal/rpc/aptos/tx.go index d19a216..49851ae 100644 --- a/internal/rpc/aptos/tx.go +++ b/internal/rpc/aptos/tx.go @@ -1,6 +1,7 @@ package aptos import ( + "encoding/json" "fmt" "strconv" "strings" @@ -13,6 +14,8 @@ import ( const ( APTDecimals = 8 NativeAPTType = "0x1::aptos_coin::AptosCoin" + // NativeAptosFA is the object address for native APT as a Fungible Asset (FA standard) + NativeAptosFA = "0x000000000000000000000000000000000000000000000000000000000000000a" ) func (tx *Transaction) ExtractTransfers(networkId string, blockNumber uint64) []types.Transaction { @@ -24,48 +27,56 @@ func (tx *Transaction) ExtractTransfers(networkId string, blockNumber uint64) [] fee := tx.calculateFee() timestamp := parseTimestamp(tx.Timestamp) + storeOwners := tx.buildStoreOwnerMap() + storeMetadata := tx.buildStoreMetadataMap() + + // Key: address -> assetType -> amount withdrawMap := make(map[string]map[string]string) depositMap := make(map[string]map[string]string) for _, event := range tx.Events { switch { - case strings.Contains(event.Type, "WithdrawEvent"): + // Legacy Coin events + case strings.Contains(event.Type, "::coin::WithdrawEvent"): coinType := extractCoinType(event.Type) addr := NormalizeAddress(event.GUID.AccountAddress) - if withdrawMap[addr] == nil { - withdrawMap[addr] = make(map[string]string) - } amount := event.Data.Amount - if existing, ok := withdrawMap[addr][coinType]; ok { - existingDec, _ := decimal.NewFromString(existing) - newDec, _ := decimal.NewFromString(amount) - withdrawMap[addr][coinType] = existingDec.Add(newDec).String() - } else { - withdrawMap[addr][coinType] = amount - } + addToMap(withdrawMap, addr, coinType, amount) - case strings.Contains(event.Type, "DepositEvent"): + case strings.Contains(event.Type, "::coin::DepositEvent"): coinType := extractCoinType(event.Type) addr := NormalizeAddress(event.GUID.AccountAddress) - if depositMap[addr] == nil { - depositMap[addr] = make(map[string]string) - } amount := event.Data.Amount - if existing, ok := depositMap[addr][coinType]; ok { - existingDec, _ := decimal.NewFromString(existing) - newDec, _ := decimal.NewFromString(amount) - depositMap[addr][coinType] = existingDec.Add(newDec).String() - } else { - depositMap[addr][coinType] = amount + addToMap(depositMap, addr, coinType, amount) + + // FA Events + case strings.Contains(event.Type, "::fungible_asset::Withdraw"): + storeAddr := NormalizeAddress(event.Data.Store) + ownerAddr := storeOwners[storeAddr] + if ownerAddr == "" { + ownerAddr = tx.Sender } + faType := extractFATypeFromMetadata(storeMetadata[storeAddr]) + amount := extractFAAmount(event.Data) + addToMap(withdrawMap, ownerAddr, faType, amount) + + case strings.Contains(event.Type, "::fungible_asset::Deposit"): + storeAddr := NormalizeAddress(event.Data.Store) + ownerAddr := storeOwners[storeAddr] + if ownerAddr == "" { + ownerAddr = tx.Sender + } + faType := extractFATypeFromMetadata(storeMetadata[storeAddr]) + amount := extractFAAmount(event.Data) + addToMap(depositMap, ownerAddr, faType, amount) } } - for toAddr, coins := range depositMap { - for coinType, amount := range coins { + for toAddr, assets := range depositMap { + for assetType, amount := range assets { fromAddr := tx.Sender if fromAddr == "" { - fromAddr = findWithdrawAddress(withdrawMap, coinType, amount) + fromAddr = findWithdrawAddress(withdrawMap, assetType, amount) } transfer := types.Transaction{ @@ -79,11 +90,11 @@ func (tx *Transaction) ExtractTransfers(networkId string, blockNumber uint64) [] Timestamp: timestamp, } - if isNativeAPT(coinType) { + if isNativeAPT(assetType) || isNativeAptosFA(assetType) { transfer.AssetAddress = "" transfer.Type = constant.TxTypeNativeTransfer } else { - transfer.AssetAddress = coinType + transfer.AssetAddress = assetType transfer.Type = constant.TxTypeTokenTransfer } @@ -94,6 +105,140 @@ func (tx *Transaction) ExtractTransfers(networkId string, blockNumber uint64) [] return transfers } +// buildStoreOwnerMap builds a map of store addresses to owner addresses +func (tx *Transaction) buildStoreOwnerMap() map[string]string { + storeOwners := make(map[string]string) + + for _, change := range tx.Changes { + var changeMap map[string]interface{} + if err := json.Unmarshal(change, &changeMap); err != nil { + continue + } + + changeType, _ := changeMap["type"].(string) + if changeType != "write_resource" { + continue + } + + data, ok := changeMap["data"].(map[string]interface{}) + if !ok { + continue + } + + dataType, _ := data["type"].(string) + if dataType != "0x1::object::ObjectCore" { + continue + } + + storeAddr, _ := changeMap["address"].(string) + if storeAddr == "" { + continue + } + + innerData, ok := data["data"].(map[string]interface{}) + if !ok { + continue + } + + owner, _ := innerData["owner"].(string) + if owner != "" { + storeOwners[NormalizeAddress(storeAddr)] = NormalizeAddress(owner) + } + } + + return storeOwners +} + +// buildStoreMetadataMap builds a map of store addresses to metadata addresses +func (tx *Transaction) buildStoreMetadataMap() map[string]string { + storeMetadata := make(map[string]string) + + for _, change := range tx.Changes { + var changeMap map[string]interface{} + if err := json.Unmarshal(change, &changeMap); err != nil { + continue + } + + changeType, _ := changeMap["type"].(string) + if changeType != "write_resource" { + continue + } + + data, ok := changeMap["data"].(map[string]interface{}) + if !ok { + continue + } + + dataType, _ := data["type"].(string) + if dataType != "0x1::fungible_asset::FungibleStore" { + continue + } + + storeAddr, _ := changeMap["address"].(string) + if storeAddr == "" { + continue + } + + innerData, ok := data["data"].(map[string]interface{}) + if !ok { + continue + } + + metadata, ok := innerData["metadata"].(map[string]interface{}) + if !ok { + continue + } + + metadataInner, _ := metadata["inner"].(string) + if metadataInner != "" { + storeMetadata[NormalizeAddress(storeAddr)] = NormalizeAddress(metadataInner) + } + } + + return storeMetadata +} + +func addToMap(m map[string]map[string]string, addr, assetType, amount string) { + if m[addr] == nil { + m[addr] = make(map[string]string) + } + if existing, ok := m[addr][assetType]; ok { + existingDec, _ := decimal.NewFromString(existing) + newDec, _ := decimal.NewFromString(amount) + m[addr][assetType] = existingDec.Add(newDec).String() + } else { + m[addr][assetType] = amount + } +} + +// extractFATypeFromMetadata extracts the FA type from the store metadata address +func extractFATypeFromMetadata(metadata string) string { + if metadata == "" { + return NativeAptosFA + } + normalized := NormalizeAddress(metadata) + if normalized == NativeAptosFA || normalized == "0x000000000000000000000000000000000000000000000000000000000000000a" { + return NativeAPTType // Return legacy type for native APT + } + return normalized +} + +// extractFAAmount extracts amount from FA event data +// FA events may have different data structures than Coin events +func extractFAAmount(data EventData) string { + if data.Amount != "" { + return data.Amount + } + return "0" +} + +func isNativeAptosFA(assetType string) bool { + // Native APT as FA has metadata at 0xa + return assetType == NativeAptosFA || + assetType == "0x0a" || + strings.HasSuffix(assetType, "::aptos_coin::AptosCoin") +} + func (tx *Transaction) calculateFee() decimal.Decimal { gasUsed, _ := strconv.ParseInt(tx.GasUsed, 10, 64) gasPrice, _ := strconv.ParseInt(tx.GasUnitPrice, 10, 64) @@ -122,7 +267,13 @@ func extractCoinType(eventType string) string { } func isNativeAPT(coinType string) bool { - return coinType == NativeAPTType || coinType == "" + // Native APT can be represented as: + // 1. Legacy: "0x1::aptos_coin::AptosCoin" + // 2. FA: "0x0000...000a" (the metadata address) + return coinType == NativeAPTType || + coinType == NativeAptosFA || + coinType == "0x000000000000000000000000000000000000000000000000000000000000000a" || + coinType == "" } func findWithdrawAddress(withdrawMap map[string]map[string]string, coinType, amount string) string { diff --git a/internal/rpc/aptos/types.go b/internal/rpc/aptos/types.go index cfd0b87..97db9e5 100644 --- a/internal/rpc/aptos/types.go +++ b/internal/rpc/aptos/types.go @@ -67,12 +67,22 @@ type EventGUID struct { type EventData struct { Amount string `json:"amount,omitempty"` + Store string `json:"store,omitempty"` +} + +type ObjectCore struct { + AllowUngatedTransfer bool `json:"allow_ungated_transfer"` + GUIDCreationNum string `json:"guid_creation_num"` + Owner string `json:"owner"` } const ( EventTypeWithdraw = "0x1::coin::WithdrawEvent" EventTypeDeposit = "0x1::coin::DepositEvent" + EventTypeFAWithdraw = "0x1::fungible_asset::Withdraw" + EventTypeFADeposit = "0x1::fungible_asset::Deposit" + PayloadTypeEntryFunction = "entry_function_payload" PayloadTypeScript = "script_payload" diff --git a/sql/wallet_address.sql b/sql/wallet_address.sql index 93bb13e..376fa07 100644 --- a/sql/wallet_address.sql +++ b/sql/wallet_address.sql @@ -23,7 +23,7 @@ DO $$ BEGIN 'evm', 'btc', 'sol', - 'aptos', + 'apt', 'tron' ); EXCEPTION