diff --git a/cmd/wallet-kv-load/main.go b/cmd/wallet-kv-load/main.go index 9a81a3a..00ab644 100644 --- a/cmd/wallet-kv-load/main.go +++ b/cmd/wallet-kv-load/main.go @@ -74,6 +74,7 @@ func (c *RunCmd) Run() error { enum.NetworkTypeTron, enum.NetworkTypeEVM, enum.NetworkTypeBtc, + enum.NetworkTypeApt, } for _, t := range candidateTypes { diff --git a/configs/config.example.yaml b/configs/config.example.yaml index 583075f..bfc17c4 100644 --- a/configs/config.example.yaml +++ b/configs/config.example.yaml @@ -160,6 +160,38 @@ chains: nodes: - url: "fullnode.mainnet.sui.io:443" # e.g. 127.0.0.1:9000 - url: "sui-mainnet.nodeinfra.com:443" + aptos_mainnet: + network_id: "APTOS_MAINNET" + internal_code: "APTOS_MAINNET" + type: "apt" + start_block: 596926179 + poll_interval: "3s" # Poll every 3s (blocks are ~1s, this reduces API load) + throttle: + batch_size: 25 + rps: 5 + burst: 8 + nodes: + - url: "https://aptos-rest.publicnode.com/v1" + - url: "https://fullnode.mainnet.aptoslabs.com/v1" + client: + max_retries: 5 # override global max retries + retry_delay: "10s" + + aptos_testnet: + network_id: "APTOS_TESTNET" + internal_code: "APTOS_TESTNET" + type: "apt" + start_block: 642939715 + poll_interval: "3s" # Poll every 3s (blocks are ~1s, this reduces API load) + throttle: + batch_size: 25 + rps: 5 + burst: 8 + nodes: + - url: "https://fullnode.testnet.aptoslabs.com/v1" + client: + max_retries: 5 # override global max retries + retry_delay: "10s" # Infrastructure services services: diff --git a/internal/indexer/aptos.go b/internal/indexer/aptos.go new file mode 100644 index 0000000..4ab9519 --- /dev/null +++ b/internal/indexer/aptos.go @@ -0,0 +1,327 @@ +package indexer + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/fystack/multichain-indexer/internal/rpc" + "github.com/fystack/multichain-indexer/internal/rpc/aptos" + "github.com/fystack/multichain-indexer/pkg/common/config" + "github.com/fystack/multichain-indexer/pkg/common/enum" + "github.com/fystack/multichain-indexer/pkg/common/types" + "github.com/fystack/multichain-indexer/pkg/common/utils" +) + +type AptosIndexer struct { + chainName string + config config.ChainConfig + failover *rpc.Failover[aptos.AptosAPI] +} + +func NewAptosIndexer(chainName string, cfg config.ChainConfig, f *rpc.Failover[aptos.AptosAPI]) *AptosIndexer { + return &AptosIndexer{ + chainName: chainName, + config: cfg, + failover: f, + } +} + +func (a *AptosIndexer) GetName() string { return strings.ToUpper(a.chainName) } +func (a *AptosIndexer) GetNetworkType() enum.NetworkType { return enum.NetworkTypeApt } +func (a *AptosIndexer) GetNetworkInternalCode() string { return a.config.InternalCode } +func (a *AptosIndexer) GetNetworkId() string { return a.config.NetworkId } + +func (a *AptosIndexer) GetLatestBlockNumber(ctx context.Context) (uint64, error) { + var latest uint64 + err := a.failover.ExecuteWithRetry(ctx, func(c aptos.AptosAPI) error { + info, err := c.GetLedgerInfo(ctx) + if err != nil { + return err + } + blockHeight, err := aptos.ParseVersion(info.BlockHeight) + if err != nil { + return err + } + latest = blockHeight + return nil + }) + return latest, err +} + +func (a *AptosIndexer) GetBlock(ctx context.Context, blockNumber uint64) (*types.Block, error) { + var aptosBlock *aptos.Block + + err := a.failover.ExecuteWithRetry(ctx, func(c aptos.AptosAPI) error { + block, err := c.GetBlockByHeight(ctx, blockNumber, false) + if err != nil { + return err + } + aptosBlock = block + return nil + }) + + if err != nil { + return nil, err + } + + return a.processBlockWithTransactions(ctx, aptosBlock) +} + +// processBlockWithTransactions fetches and processes transactions for a block +func (a *AptosIndexer) processBlockWithTransactions(ctx context.Context, aptosBlock *aptos.Block) (*types.Block, error) { + firstVersion, err := aptos.ParseVersion(aptosBlock.FirstVersion) + if err != nil { + return nil, fmt.Errorf("invalid first_version: %w", err) + } + + lastVersion, err := aptos.ParseVersion(aptosBlock.LastVersion) + if err != nil { + return nil, fmt.Errorf("invalid last_version: %w", err) + } + + if lastVersion < firstVersion { + return nil, fmt.Errorf("invalid version range: last_version %d < first_version %d", lastVersion, firstVersion) + } + + totalTxs := lastVersion - firstVersion + 1 + txBatchSize := uint64(100) + + var allTxs []aptos.Transaction + + if totalTxs <= txBatchSize { + var txs []aptos.Transaction + err := a.failover.ExecuteWithRetry(ctx, func(c aptos.AptosAPI) error { + batch, err := c.GetTransactionsByVersion(ctx, firstVersion, totalTxs) + if err != nil { + return err + } + txs = batch + return nil + }) + if err != nil { + return nil, fmt.Errorf("failed to fetch transactions: %w", err) + } + allTxs = txs + } else { + currentVersion := firstVersion + for currentVersion <= lastVersion { + limit := txBatchSize + remaining := lastVersion - currentVersion + 1 + if remaining < limit { + limit = remaining + } + + var txs []aptos.Transaction + err := a.failover.ExecuteWithRetry(ctx, func(c aptos.AptosAPI) error { + batch, err := c.GetTransactionsByVersion(ctx, currentVersion, limit) + if err != nil { + return err + } + txs = batch + return nil + }) + + if err != nil { + return nil, fmt.Errorf("failed to fetch transactions at version %d: %w", currentVersion, err) + } + + allTxs = append(allTxs, txs...) + currentVersion += limit + } + } + + return a.processBlock(aptosBlock, allTxs) +} + +func (a *AptosIndexer) processBlock( + aptosBlock *aptos.Block, + txs []aptos.Transaction, +) (*types.Block, error) { + blockHeight, err := aptos.ParseVersion(aptosBlock.BlockHeight) + if err != nil { + return nil, fmt.Errorf("invalid block_height: %w", err) + } + + timestamp, err := aptos.ParseVersion(aptosBlock.BlockTimestamp) + if err != nil { + return nil, fmt.Errorf("invalid block_timestamp: %w", err) + } + + block := &types.Block{ + Number: blockHeight, + Hash: aptosBlock.BlockHash, + ParentHash: "", + Timestamp: timestamp / 1000000, + Transactions: []types.Transaction{}, + } + + for _, tx := range txs { + transfers := tx.ExtractTransfers(a.GetNetworkId(), blockHeight) + block.Transactions = append(block.Transactions, transfers...) + } + + block.Transactions = utils.DedupTransfers(block.Transactions) + return block, nil +} + +func (a *AptosIndexer) GetBlocks( + ctx context.Context, + start, end uint64, + isParallel bool, +) ([]BlockResult, error) { + var nums []uint64 + for i := start; i <= end; i++ { + nums = append(nums, i) + } + return a.GetBlocksByNumbers(ctx, nums) +} + +func (a *AptosIndexer) GetBlocksByNumbers( + ctx context.Context, + nums []uint64, +) ([]BlockResult, error) { + return a.getBlocksOptimized(ctx, nums) +} + +// getBlocksOptimized fetches blocks with optimized parallel processing: +// 1. Fetches all block metadata in parallel (small, fast) +// 2. Processes blocks with transactions in parallel +func (a *AptosIndexer) getBlocksOptimized(ctx context.Context, nums []uint64) ([]BlockResult, error) { + if len(nums) == 0 { + return nil, nil + } + + results := make([]BlockResult, len(nums)) + + aptosBlocks := make([]*aptos.Block, len(nums)) + + type metaJob struct { + num uint64 + index int + } + + metaWorkers := min(len(nums), a.config.Throttle.Concurrency*2) + metaJobs := make(chan metaJob, metaWorkers*2) + var metaWg sync.WaitGroup + + for i := 0; i < metaWorkers; i++ { + metaWg.Add(1) + go func() { + defer metaWg.Done() + for j := range metaJobs { + var block *aptos.Block + err := a.failover.ExecuteWithRetry(ctx, func(c aptos.AptosAPI) error { + b, err := c.GetBlockByHeight(ctx, j.num, false) + if err != nil { + return err + } + block = b + return nil + }) + + if err != nil { + results[j.index] = BlockResult{ + Number: j.num, + Error: &Error{ErrorType: ErrorTypeUnknown, Message: err.Error()}, + } + } else { + aptosBlocks[j.index] = block + } + } + }() + } + + go func() { + defer close(metaJobs) + for i, num := range nums { + select { + case <-ctx.Done(): + return + case metaJobs <- metaJob{num: num, index: i}: + } + } + }() + + metaWg.Wait() + + if ctx.Err() != nil { + return nil, ctx.Err() + } + + var firstErr error + for _, r := range results { + if r.Error != nil { + firstErr = fmt.Errorf("block %d: %s", r.Number, r.Error.Message) + return results, firstErr + } + } + + type txJob struct { + aptosBlock *aptos.Block + index int + } + + txWorkers := min(len(nums), a.config.Throttle.Concurrency) + txJobs := make(chan txJob, txWorkers*2) + var txWg sync.WaitGroup + + for i := 0; i < txWorkers; i++ { + txWg.Add(1) + go func() { + defer txWg.Done() + for j := range txJobs { + blk, err := a.processBlockWithTransactions(ctx, j.aptosBlock) + blockHeight, _ := aptos.ParseVersion(j.aptosBlock.BlockHeight) + results[j.index] = BlockResult{ + Number: blockHeight, + Block: blk, + } + if err != nil { + results[j.index].Error = &Error{ + ErrorType: ErrorTypeUnknown, + Message: err.Error(), + } + } + } + }() + } + + go func() { + defer close(txJobs) + for i, block := range aptosBlocks { + if block == nil { + continue + } + select { + case <-ctx.Done(): + return + case txJobs <- txJob{aptosBlock: block, index: i}: + } + } + }() + + txWg.Wait() + + if ctx.Err() != nil { + return nil, ctx.Err() + } + + for _, r := range results { + if r.Error != nil { + firstErr = fmt.Errorf("block %d: %s", r.Number, r.Error.Message) + break + } + } + + return results, firstErr +} + +func (a *AptosIndexer) IsHealthy() bool { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err := a.GetLatestBlockNumber(ctx) + return err == nil +} diff --git a/internal/rpc/aptos/address.go b/internal/rpc/aptos/address.go new file mode 100644 index 0000000..719f77a --- /dev/null +++ b/internal/rpc/aptos/address.go @@ -0,0 +1,47 @@ +package aptos + +import ( + "fmt" + "strings" +) + +func NormalizeAddress(addr string) string { + addr = strings.ToLower(strings.TrimSpace(addr)) + + if !strings.HasPrefix(addr, "0x") { + addr = "0x" + addr + } + + addr = strings.TrimPrefix(addr, "0x") + + if len(addr) < 64 { + addr = strings.Repeat("0", 64-len(addr)) + addr + } + + return "0x" + addr +} + +func ValidateAddress(addr string) error { + normalized := NormalizeAddress(addr) + + if len(normalized) != 66 { + return fmt.Errorf("invalid address length: expected 66 characters (0x + 64 hex), got %d", len(normalized)) + } + + for i, c := range normalized[2:] { + if !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')) { + return fmt.Errorf("invalid character at position %d: %c", i+2, c) + } + } + + return nil +} + +func ShortAddress(addr string) string { + normalized := NormalizeAddress(addr) + trimmed := strings.TrimLeft(normalized[2:], "0") + if trimmed == "" { + return "0x0" + } + return "0x" + trimmed +} diff --git a/internal/rpc/aptos/api.go b/internal/rpc/aptos/api.go new file mode 100644 index 0000000..c93b6cc --- /dev/null +++ b/internal/rpc/aptos/api.go @@ -0,0 +1,15 @@ +package aptos + +import ( + "context" + + "github.com/fystack/multichain-indexer/internal/rpc" +) + +type AptosAPI interface { + rpc.NetworkClient + GetLedgerInfo(ctx context.Context) (*LedgerInfo, error) + GetBlockByVersion(ctx context.Context, version uint64, withTransactions bool) (*Block, error) + GetBlockByHeight(ctx context.Context, height uint64, withTransactions bool) (*Block, error) + GetTransactionsByVersion(ctx context.Context, start, limit uint64) ([]Transaction, error) +} diff --git a/internal/rpc/aptos/client.go b/internal/rpc/aptos/client.go new file mode 100644 index 0000000..1847955 --- /dev/null +++ b/internal/rpc/aptos/client.go @@ -0,0 +1,100 @@ +package aptos + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/fystack/multichain-indexer/internal/rpc" + "github.com/fystack/multichain-indexer/pkg/ratelimiter" +) + +type Client struct { + *rpc.BaseClient +} + +func NewAptosClient( + url string, + auth *rpc.AuthConfig, + timeout time.Duration, + rateLimiter *ratelimiter.PooledRateLimiter, +) *Client { + return &Client{ + BaseClient: rpc.NewBaseClient( + url, + rpc.NetworkAptos, + rpc.ClientTypeREST, + auth, + timeout, + rateLimiter, + ), + } +} + +func (c *Client) GetLedgerInfo(ctx context.Context) (*LedgerInfo, error) { + data, err := c.Do(ctx, http.MethodGet, "/", nil, nil) + if err != nil { + return nil, fmt.Errorf("getLedgerInfo failed: %w", err) + } + + var info LedgerInfo + if err := json.Unmarshal(data, &info); err != nil { + return nil, fmt.Errorf("failed to unmarshal ledger info: %w", err) + } + return &info, nil +} + +func (c *Client) GetBlockByVersion(ctx context.Context, version uint64, withTransactions bool) (*Block, error) { + path := fmt.Sprintf("/blocks/by_version/%d", version) + query := map[string]string{} + if withTransactions { + query["with_transactions"] = "true" + } + + data, err := c.Do(ctx, http.MethodGet, path, nil, query) + if err != nil { + return nil, fmt.Errorf("getBlockByVersion failed: %w", err) + } + + var block Block + if err := json.Unmarshal(data, &block); err != nil { + return nil, fmt.Errorf("failed to unmarshal block: %w", err) + } + return &block, nil +} + +func (c *Client) GetBlockByHeight(ctx context.Context, height uint64, withTransactions bool) (*Block, error) { + path := fmt.Sprintf("/blocks/by_height/%d", height) + query := map[string]string{} + if withTransactions { + query["with_transactions"] = "true" + } + + data, err := c.Do(ctx, http.MethodGet, path, nil, query) + if err != nil { + return nil, fmt.Errorf("getBlockByHeight failed: %w", err) + } + + var block Block + if err := json.Unmarshal(data, &block); err != nil { + return nil, fmt.Errorf("failed to unmarshal block: %w", err) + } + return &block, nil +} + +func (c *Client) GetTransactionsByVersion(ctx context.Context, start, limit uint64) ([]Transaction, error) { + path := fmt.Sprintf("/transactions?start=%d&limit=%d", start, limit) + + data, err := c.Do(ctx, http.MethodGet, path, nil, nil) + if err != nil { + return nil, fmt.Errorf("getTransactionsByVersion failed: %w", err) + } + + var txs []Transaction + if err := json.Unmarshal(data, &txs); err != nil { + return nil, fmt.Errorf("failed to unmarshal transactions: %w", err) + } + return txs, nil +} diff --git a/internal/rpc/aptos/tx.go b/internal/rpc/aptos/tx.go new file mode 100644 index 0000000..49851ae --- /dev/null +++ b/internal/rpc/aptos/tx.go @@ -0,0 +1,294 @@ +package aptos + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + + "github.com/fystack/multichain-indexer/pkg/common/constant" + "github.com/fystack/multichain-indexer/pkg/common/types" + "github.com/shopspring/decimal" +) + +const ( + APTDecimals = 8 + NativeAPTType = "0x1::aptos_coin::AptosCoin" + // NativeAptosFA is the object address for native APT as a Fungible Asset (FA standard) + NativeAptosFA = "0x000000000000000000000000000000000000000000000000000000000000000a" +) + +func (tx *Transaction) ExtractTransfers(networkId string, blockNumber uint64) []types.Transaction { + if !tx.Success || tx.Type != TxTypeUser { + return nil + } + + var transfers []types.Transaction + fee := tx.calculateFee() + timestamp := parseTimestamp(tx.Timestamp) + + storeOwners := tx.buildStoreOwnerMap() + storeMetadata := tx.buildStoreMetadataMap() + + // Key: address -> assetType -> amount + withdrawMap := make(map[string]map[string]string) + depositMap := make(map[string]map[string]string) + + for _, event := range tx.Events { + switch { + // Legacy Coin events + case strings.Contains(event.Type, "::coin::WithdrawEvent"): + coinType := extractCoinType(event.Type) + addr := NormalizeAddress(event.GUID.AccountAddress) + amount := event.Data.Amount + addToMap(withdrawMap, addr, coinType, amount) + + case strings.Contains(event.Type, "::coin::DepositEvent"): + coinType := extractCoinType(event.Type) + addr := NormalizeAddress(event.GUID.AccountAddress) + amount := event.Data.Amount + addToMap(depositMap, addr, coinType, amount) + + // FA Events + case strings.Contains(event.Type, "::fungible_asset::Withdraw"): + storeAddr := NormalizeAddress(event.Data.Store) + ownerAddr := storeOwners[storeAddr] + if ownerAddr == "" { + ownerAddr = tx.Sender + } + faType := extractFATypeFromMetadata(storeMetadata[storeAddr]) + amount := extractFAAmount(event.Data) + addToMap(withdrawMap, ownerAddr, faType, amount) + + case strings.Contains(event.Type, "::fungible_asset::Deposit"): + storeAddr := NormalizeAddress(event.Data.Store) + ownerAddr := storeOwners[storeAddr] + if ownerAddr == "" { + ownerAddr = tx.Sender + } + faType := extractFATypeFromMetadata(storeMetadata[storeAddr]) + amount := extractFAAmount(event.Data) + addToMap(depositMap, ownerAddr, faType, amount) + } + } + + for toAddr, assets := range depositMap { + for assetType, amount := range assets { + fromAddr := tx.Sender + if fromAddr == "" { + fromAddr = findWithdrawAddress(withdrawMap, assetType, amount) + } + + transfer := types.Transaction{ + TxHash: tx.Hash, + NetworkId: networkId, + BlockNumber: blockNumber, + FromAddress: NormalizeAddress(fromAddr), + ToAddress: NormalizeAddress(toAddr), + Amount: amount, + TxFee: fee, + Timestamp: timestamp, + } + + if isNativeAPT(assetType) || isNativeAptosFA(assetType) { + transfer.AssetAddress = "" + transfer.Type = constant.TxTypeNativeTransfer + } else { + transfer.AssetAddress = assetType + transfer.Type = constant.TxTypeTokenTransfer + } + + transfers = append(transfers, transfer) + } + } + + return transfers +} + +// buildStoreOwnerMap builds a map of store addresses to owner addresses +func (tx *Transaction) buildStoreOwnerMap() map[string]string { + storeOwners := make(map[string]string) + + for _, change := range tx.Changes { + var changeMap map[string]interface{} + if err := json.Unmarshal(change, &changeMap); err != nil { + continue + } + + changeType, _ := changeMap["type"].(string) + if changeType != "write_resource" { + continue + } + + data, ok := changeMap["data"].(map[string]interface{}) + if !ok { + continue + } + + dataType, _ := data["type"].(string) + if dataType != "0x1::object::ObjectCore" { + continue + } + + storeAddr, _ := changeMap["address"].(string) + if storeAddr == "" { + continue + } + + innerData, ok := data["data"].(map[string]interface{}) + if !ok { + continue + } + + owner, _ := innerData["owner"].(string) + if owner != "" { + storeOwners[NormalizeAddress(storeAddr)] = NormalizeAddress(owner) + } + } + + return storeOwners +} + +// buildStoreMetadataMap builds a map of store addresses to metadata addresses +func (tx *Transaction) buildStoreMetadataMap() map[string]string { + storeMetadata := make(map[string]string) + + for _, change := range tx.Changes { + var changeMap map[string]interface{} + if err := json.Unmarshal(change, &changeMap); err != nil { + continue + } + + changeType, _ := changeMap["type"].(string) + if changeType != "write_resource" { + continue + } + + data, ok := changeMap["data"].(map[string]interface{}) + if !ok { + continue + } + + dataType, _ := data["type"].(string) + if dataType != "0x1::fungible_asset::FungibleStore" { + continue + } + + storeAddr, _ := changeMap["address"].(string) + if storeAddr == "" { + continue + } + + innerData, ok := data["data"].(map[string]interface{}) + if !ok { + continue + } + + metadata, ok := innerData["metadata"].(map[string]interface{}) + if !ok { + continue + } + + metadataInner, _ := metadata["inner"].(string) + if metadataInner != "" { + storeMetadata[NormalizeAddress(storeAddr)] = NormalizeAddress(metadataInner) + } + } + + return storeMetadata +} + +func addToMap(m map[string]map[string]string, addr, assetType, amount string) { + if m[addr] == nil { + m[addr] = make(map[string]string) + } + if existing, ok := m[addr][assetType]; ok { + existingDec, _ := decimal.NewFromString(existing) + newDec, _ := decimal.NewFromString(amount) + m[addr][assetType] = existingDec.Add(newDec).String() + } else { + m[addr][assetType] = amount + } +} + +// extractFATypeFromMetadata extracts the FA type from the store metadata address +func extractFATypeFromMetadata(metadata string) string { + if metadata == "" { + return NativeAptosFA + } + normalized := NormalizeAddress(metadata) + if normalized == NativeAptosFA || normalized == "0x000000000000000000000000000000000000000000000000000000000000000a" { + return NativeAPTType // Return legacy type for native APT + } + return normalized +} + +// extractFAAmount extracts amount from FA event data +// FA events may have different data structures than Coin events +func extractFAAmount(data EventData) string { + if data.Amount != "" { + return data.Amount + } + return "0" +} + +func isNativeAptosFA(assetType string) bool { + // Native APT as FA has metadata at 0xa + return assetType == NativeAptosFA || + assetType == "0x0a" || + strings.HasSuffix(assetType, "::aptos_coin::AptosCoin") +} + +func (tx *Transaction) calculateFee() decimal.Decimal { + gasUsed, _ := strconv.ParseInt(tx.GasUsed, 10, 64) + gasPrice, _ := strconv.ParseInt(tx.GasUnitPrice, 10, 64) + + feeOctas := gasUsed * gasPrice + feeAPT := decimal.NewFromInt(feeOctas).Div(decimal.NewFromInt(100000000)) + + return feeAPT +} + +func parseTimestamp(ts string) uint64 { + microseconds, err := strconv.ParseUint(ts, 10, 64) + if err != nil { + return 0 + } + return microseconds / 1000000 +} + +func extractCoinType(eventType string) string { + start := strings.Index(eventType, "<") + end := strings.LastIndex(eventType, ">") + if start == -1 || end == -1 || start >= end { + return NativeAPTType + } + return strings.TrimSpace(eventType[start+1 : end]) +} + +func isNativeAPT(coinType string) bool { + // Native APT can be represented as: + // 1. Legacy: "0x1::aptos_coin::AptosCoin" + // 2. FA: "0x0000...000a" (the metadata address) + return coinType == NativeAPTType || + coinType == NativeAptosFA || + coinType == "0x000000000000000000000000000000000000000000000000000000000000000a" || + coinType == "" +} + +func findWithdrawAddress(withdrawMap map[string]map[string]string, coinType, amount string) string { + for addr, coins := range withdrawMap { + if withdrawAmount, ok := coins[coinType]; ok && withdrawAmount == amount { + return addr + } + } + return "" +} + +func ParseVersion(versionStr string) (uint64, error) { + version, err := strconv.ParseUint(versionStr, 10, 64) + if err != nil { + return 0, fmt.Errorf("invalid version: %w", err) + } + return version, nil +} diff --git a/internal/rpc/aptos/types.go b/internal/rpc/aptos/types.go new file mode 100644 index 0000000..97db9e5 --- /dev/null +++ b/internal/rpc/aptos/types.go @@ -0,0 +1,93 @@ +package aptos + +import "encoding/json" + +type LedgerInfo struct { + ChainID int `json:"chain_id"` + Epoch string `json:"epoch"` + LedgerVersion string `json:"ledger_version"` + OldestLedgerVersion string `json:"oldest_ledger_version"` + LedgerTimestamp string `json:"ledger_timestamp"` + NodeRole string `json:"node_role"` + OldestBlockHeight string `json:"oldest_block_height"` + BlockHeight string `json:"block_height"` + GitHash string `json:"git_hash"` +} + +type Block struct { + BlockHeight string `json:"block_height"` + BlockHash string `json:"block_hash"` + BlockTimestamp string `json:"block_timestamp"` + FirstVersion string `json:"first_version"` + LastVersion string `json:"last_version"` + Transactions []Transaction `json:"transactions,omitempty"` +} + +type Transaction struct { + Version string `json:"version"` + Hash string `json:"hash"` + StateChangeHash string `json:"state_change_hash"` + EventRootHash string `json:"event_root_hash"` + StateCheckpointHash string `json:"state_checkpoint_hash,omitempty"` + GasUsed string `json:"gas_used"` + Success bool `json:"success"` + VMStatus string `json:"vm_status"` + AccumulatorRootHash string `json:"accumulator_root_hash"` + Changes []json.RawMessage `json:"changes"` + Sender string `json:"sender,omitempty"` + SequenceNumber string `json:"sequence_number,omitempty"` + MaxGasAmount string `json:"max_gas_amount,omitempty"` + GasUnitPrice string `json:"gas_unit_price,omitempty"` + ExpirationTimestampSecs string `json:"expiration_timestamp_secs,omitempty"` + Payload *TransactionPayload `json:"payload,omitempty"` + Events []Event `json:"events"` + Timestamp string `json:"timestamp"` + Type string `json:"type"` +} + +type TransactionPayload struct { + Type string `json:"type"` + Function string `json:"function,omitempty"` + TypeArguments []string `json:"type_arguments,omitempty"` + Arguments []interface{} `json:"arguments,omitempty"` + Code json.RawMessage `json:"code,omitempty"` +} + +type Event struct { + GUID EventGUID `json:"guid"` + SequenceNumber string `json:"sequence_number"` + Type string `json:"type"` + Data EventData `json:"data"` +} + +type EventGUID struct { + CreationNumber string `json:"creation_number"` + AccountAddress string `json:"account_address"` +} + +type EventData struct { + Amount string `json:"amount,omitempty"` + Store string `json:"store,omitempty"` +} + +type ObjectCore struct { + AllowUngatedTransfer bool `json:"allow_ungated_transfer"` + GUIDCreationNum string `json:"guid_creation_num"` + Owner string `json:"owner"` +} + +const ( + EventTypeWithdraw = "0x1::coin::WithdrawEvent" + EventTypeDeposit = "0x1::coin::DepositEvent" + + EventTypeFAWithdraw = "0x1::fungible_asset::Withdraw" + EventTypeFADeposit = "0x1::fungible_asset::Deposit" + + PayloadTypeEntryFunction = "entry_function_payload" + PayloadTypeScript = "script_payload" + + TxTypeUser = "user_transaction" + TxTypeBlockMetadata = "block_metadata_transaction" + TxTypeStateCheckpoint = "state_checkpoint_transaction" + TxTypeGenesisTransaction = "genesis_transaction" +) diff --git a/internal/rpc/types.go b/internal/rpc/types.go index 729f5e6..01e10d7 100644 --- a/internal/rpc/types.go +++ b/internal/rpc/types.go @@ -20,6 +20,7 @@ const ( NetworkSolana = "solana" // Solana blockchain NetworkTron = "tron" // Tron blockchain NetworkBitcoin = "bitcoin" // Bitcoin blockchain + NetworkAptos = "aptos" // Aptos blockchain NetworkGeneric = "generic" // Generic/unknown blockchain type ) diff --git a/internal/worker/factory.go b/internal/worker/factory.go index dcf3608..f3987b6 100644 --- a/internal/worker/factory.go +++ b/internal/worker/factory.go @@ -6,6 +6,7 @@ import ( "github.com/fystack/multichain-indexer/internal/indexer" "github.com/fystack/multichain-indexer/internal/rpc" + "github.com/fystack/multichain-indexer/internal/rpc/aptos" "github.com/fystack/multichain-indexer/internal/rpc/bitcoin" "github.com/fystack/multichain-indexer/internal/rpc/evm" "github.com/fystack/multichain-indexer/internal/rpc/solana" @@ -263,6 +264,39 @@ func buildSolanaIndexer(chainName string, chainCfg config.ChainConfig, mode Work return indexer.NewSolanaIndexer(chainName, chainCfg, failover, pubkeyStore) } +// buildAptosIndexer constructs an Aptos indexer with failover and providers. +func buildAptosIndexer(chainName string, chainCfg config.ChainConfig, mode WorkerMode) indexer.Indexer { + failover := rpc.NewFailover[aptos.AptosAPI](nil) + + rl := ratelimiter.GetOrCreateSharedPooledRateLimiter( + chainName, chainCfg.Throttle.RPS, chainCfg.Throttle.Burst, + ) + + for i, node := range chainCfg.Nodes { + client := aptos.NewAptosClient( + node.URL, + &rpc.AuthConfig{ + Type: rpc.AuthType(node.Auth.Type), + Key: node.Auth.Key, + Value: node.Auth.Value, + }, + chainCfg.Client.Timeout, + rl, + ) + + failover.AddProvider(&rpc.Provider{ + Name: chainName + "-" + strconv.Itoa(i+1), + URL: node.URL, + Network: chainName, + ClientType: "rest", + Client: client, + State: rpc.StateHealthy, + }) + } + + return indexer.NewAptosIndexer(chainName, chainCfg, failover) +} + // buildSuiIndexer constructs a Sui indexer with failover and providers. func buildSuiIndexer( chainName string, @@ -325,6 +359,8 @@ func CreateManagerWithWorkers( idxr = buildBitcoinIndexer(chainName, chainCfg, ModeRegular, pubkeyStore) case enum.NetworkTypeSol: idxr = buildSolanaIndexer(chainName, chainCfg, ModeRegular, pubkeyStore) + case enum.NetworkTypeApt: + idxr = buildAptosIndexer(chainName, chainCfg, ModeRegular) case enum.NetworkTypeSui: idxr = buildSuiIndexer(chainName, chainCfg, ModeRegular, pubkeyStore) default: diff --git a/pkg/addressbloomfilter/inmemory.go b/pkg/addressbloomfilter/inmemory.go index 507556f..414f986 100644 --- a/pkg/addressbloomfilter/inmemory.go +++ b/pkg/addressbloomfilter/inmemory.go @@ -47,6 +47,7 @@ func (abf *addressBloomFilter) Initialize(ctx context.Context) error { enum.NetworkTypeTron, enum.NetworkTypeBtc, enum.NetworkTypeSui, + enum.NetworkTypeApt, } for _, addrType := range types { diff --git a/pkg/addressbloomfilter/redis.go b/pkg/addressbloomfilter/redis.go index 32e4f3c..0bd24e3 100644 --- a/pkg/addressbloomfilter/redis.go +++ b/pkg/addressbloomfilter/redis.go @@ -73,6 +73,7 @@ func (rbf *redisBloomFilter) Initialize(ctx context.Context) error { enum.NetworkTypeBtc, enum.NetworkTypeSol, enum.NetworkTypeSui, + enum.NetworkTypeApt, } for _, addrType := range types { diff --git a/pkg/common/config/types.go b/pkg/common/config/types.go index c5bf40a..b667406 100644 --- a/pkg/common/config/types.go +++ b/pkg/common/config/types.go @@ -37,7 +37,7 @@ type ChainConfig struct { Name string `yaml:"-"` NetworkId string `yaml:"network_id"` InternalCode string `yaml:"internal_code"` - Type enum.NetworkType `yaml:"type" validate:"required,oneof=tron evm btc sol sui"` + Type enum.NetworkType `yaml:"type" validate:"required,oneof=tron evm btc sol sui apt"` FromLatest bool `yaml:"from_latest"` StartBlock int `yaml:"start_block" validate:"min=0"` PollInterval time.Duration `yaml:"poll_interval"` diff --git a/sql/wallet_address.sql b/sql/wallet_address.sql index 93bb13e..376fa07 100644 --- a/sql/wallet_address.sql +++ b/sql/wallet_address.sql @@ -23,7 +23,7 @@ DO $$ BEGIN 'evm', 'btc', 'sol', - 'aptos', + 'apt', 'tron' ); EXCEPTION