From 1d3fa5ee06387a06c14447f43c2c499d2e0e52a8 Mon Sep 17 00:00:00 2001 From: anhbaysgalan1 Date: Tue, 10 Feb 2026 13:58:47 +0800 Subject: [PATCH] feat: Introduce bloom filter sync worker, optimize Bitcoin RPC prevout resolution, and persist block hashes for reorg detection. --- cmd/indexer/main.go | 28 ++++- go.mod | 4 +- internal/indexer/bitcoin.go | 89 ++++++++------ internal/rpc/bitcoin/api.go | 3 + internal/rpc/bitcoin/client.go | 114 ++++++++++++++--- internal/rpc/client.go | 10 +- internal/worker/bloom_sync.go | 217 +++++++++++++++++++++++++++++++++ internal/worker/factory.go | 11 ++ internal/worker/manager.go | 32 ++++- internal/worker/mempool.go | 21 ++-- internal/worker/regular.go | 44 +++++-- pkg/common/config/services.go | 17 ++- pkg/store/blockstore/store.go | 43 +++++++ sql/wallet_address.sql | 3 + 14 files changed, 545 insertions(+), 91 deletions(-) create mode 100644 internal/worker/bloom_sync.go diff --git a/cmd/indexer/main.go b/cmd/indexer/main.go index 08e889e..8b896df 100644 --- a/cmd/indexer/main.go +++ b/cmd/indexer/main.go @@ -182,11 +182,27 @@ func runIndexer(chains []string, configPath string, debug, manual, catchup, from logger.Info("Starting from latest block for all specified chains", "chains", chains) } + // Build bloom sync config if enabled + var bloomSyncCfg *worker.BloomSyncConfig + if services.Bloomfilter != nil && services.Bloomfilter.Sync.Enabled && db != nil { + cfg := worker.DefaultBloomSyncConfig() + if services.Bloomfilter.Sync.Interval != "" { + if interval, parseErr := time.ParseDuration(services.Bloomfilter.Sync.Interval); parseErr == nil { + cfg.Interval = interval + } + } + if services.Bloomfilter.Sync.BatchSize > 0 { + cfg.BatchSize = services.Bloomfilter.Sync.BatchSize + } + bloomSyncCfg = &cfg + } + // Create manager with all workers using factory managerCfg := worker.ManagerConfig{ Chains: chains, EnableCatchup: catchup, EnableManual: manual, + BloomSync: bloomSyncCfg, } manager := worker.CreateManagerWithWorkers( @@ 
-211,19 +227,21 @@ func runIndexer(chains []string, configPath string, debug, manual, catchup, from logger.Info("Shutting down indexer...") - // Shutdown health server + // Stop workers first so health endpoint can report during drain + manager.Stop() + + // Then shutdown health server if healthServer != nil { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if err := healthServer.Shutdown(ctx); err != nil { + if err := healthServer.Shutdown(shutdownCtx); err != nil { logger.Error("Health server shutdown failed", "error", err) } else { logger.Info("Health server stopped gracefully") } } - manager.Stop() - logger.Info("✅ Indexer stopped gracefully") + logger.Info("Indexer stopped gracefully") } type HealthResponse struct { diff --git a/go.mod b/go.mod index fef59e3..cdc6cb0 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/fystack/multichain-indexer -go 1.24.5 +go 1.24.0 + +toolchain go1.24.4 require ( dario.cat/mergo v1.0.2 diff --git a/internal/indexer/bitcoin.go b/internal/indexer/bitcoin.go index dd3654f..7df8f60 100644 --- a/internal/indexer/bitcoin.go +++ b/internal/indexer/bitcoin.go @@ -89,9 +89,10 @@ func (b *BitcoinIndexer) GetBlock(ctx context.Context, number uint64) (*types.Bl return b.convertBlockWithPrevoutResolution(ctx, btcBlock) } -// convertBlockWithPrevoutResolution converts a block and resolves prevout data for transactions +// convertBlockWithPrevoutResolution converts a block and resolves prevout data for transactions. +// Uses batch prevout resolution to avoid N+1 RPC calls per transaction. 
func (b *BitcoinIndexer) convertBlockWithPrevoutResolution(ctx context.Context, btcBlock *bitcoin.Block) (*types.Block, error) { - var allTransfers []types.Transaction + allTransfers := make([]types.Transaction, 0, len(btcBlock.Tx)*2) // Calculate latest block height from confirmations latestBlock := btcBlock.Height @@ -99,36 +100,34 @@ func (b *BitcoinIndexer) convertBlockWithPrevoutResolution(ctx context.Context, latestBlock = btcBlock.Height + btcBlock.Confirmations - 1 } - // Get a client for prevout resolution - provider, _ := b.failover.GetBestProvider() - var btcClient *bitcoin.BitcoinClient - if provider != nil { - btcClient, _ = provider.Client.(*bitcoin.BitcoinClient) - } - + // Collect all non-coinbase transactions that need prevout resolution + var needResolution []*bitcoin.Transaction for i := range btcBlock.Tx { tx := &btcBlock.Tx[i] - - // Skip coinbase transactions if tx.IsCoinbase() { continue } + if len(tx.Vin) > 0 && tx.Vin[0].PrevOut == nil { + needResolution = append(needResolution, tx) + } + } - // Try to resolve prevout data for FROM address if not already present - if btcClient != nil && len(tx.Vin) > 0 && tx.Vin[0].PrevOut == nil { - if tx.Vin[0].TxID != "" { - if resolved, err := btcClient.GetTransactionWithPrevouts(ctx, tx.TxID); err == nil { - // Copy resolved prevout data - for j := range tx.Vin { - if j < len(resolved.Vin) && resolved.Vin[j].PrevOut != nil { - tx.Vin[j].PrevOut = resolved.Vin[j].PrevOut - } - } - } - } + // Batch-resolve all prevouts in one pass using parallel fetching + if len(needResolution) > 0 { + if err := b.failover.ExecuteWithRetry(ctx, func(c bitcoin.BitcoinAPI) error { + return c.ResolvePrevouts(ctx, needResolution, b.config.Throttle.Concurrency) + }); err != nil { + // Log but don't fail — partial prevout data is acceptable + _ = err } + } - // Extract all transfers - filtering happens in worker's emitBlock + // Extract transfers from all transactions + for i := range btcBlock.Tx { + tx := &btcBlock.Tx[i] + 
if tx.IsCoinbase() { + continue + } transfers := b.extractTransfersFromTx(tx, btcBlock.Height, btcBlock.Time, latestBlock) allTransfers = append(allTransfers, transfers...) } @@ -226,13 +225,13 @@ func (b *BitcoinIndexer) extractTransfersFromTx( tx *bitcoin.Transaction, blockNumber, ts, latestBlock uint64, ) []types.Transaction { - var transfers []types.Transaction - // Skip coinbase transactions if tx.IsCoinbase() { - return transfers + return nil } + transfers := make([]types.Transaction, 0, len(tx.Vout)) + fee := tx.CalculateFee() confirmations := b.calculateConfirmations(blockNumber, latestBlock) @@ -365,21 +364,35 @@ func (b *BitcoinIndexer) GetMempoolTransactions(ctx context.Context) ([]types.Tr return nil, fmt.Errorf("unexpected mempool format") } - // Process each transaction - var allTransfers []types.Transaction + // Fetch transactions in batches and resolve prevouts currentTime := uint64(time.Now().Unix()) + allTransfers := make([]types.Transaction, 0, len(txids)) + + const batchSize = 50 + for i := 0; i < len(txids); i += batchSize { + end := min(i+batchSize, len(txids)) + batch := txids[i:end] + + // Fetch all transactions in this batch + var txs []*bitcoin.Transaction + for _, txid := range batch { + tx, err := btcClient.GetRawTransaction(ctx, txid, true) + if err != nil { + continue + } + txs = append(txs, tx) + } - for _, txid := range txids { - // Fetch transaction with prevout data resolved - // This is critical for detecting FROM addresses - tx, err := btcClient.GetTransactionWithPrevouts(ctx, txid) - if err != nil { - continue + // Batch-resolve prevouts for all fetched transactions + if len(txs) > 0 { + _ = btcClient.ResolvePrevouts(ctx, txs, 8) } - // Extract transfers (blockNumber=0 for mempool, confirmations will be 0) - transfers := b.extractTransfersFromTx(tx, 0, currentTime, latestBlock) - allTransfers = append(allTransfers, transfers...) 
+ // Extract transfers + for _, tx := range txs { + transfers := b.extractTransfersFromTx(tx, 0, currentTime, latestBlock) + allTransfers = append(allTransfers, transfers...) + } } return allTransfers, nil diff --git a/internal/rpc/bitcoin/api.go b/internal/rpc/bitcoin/api.go index a668809..f961022 100644 --- a/internal/rpc/bitcoin/api.go +++ b/internal/rpc/bitcoin/api.go @@ -24,4 +24,7 @@ type BitcoinAPI interface { GetRawTransaction(ctx context.Context, txid string, verbose bool) (*Transaction, error) GetTransactionWithPrevouts(ctx context.Context, txid string) (*Transaction, error) GetMempoolEntry(ctx context.Context, txid string) (*MempoolEntry, error) + + // Batch operations + ResolvePrevouts(ctx context.Context, txs []*Transaction, concurrency int) error } diff --git a/internal/rpc/bitcoin/client.go b/internal/rpc/bitcoin/client.go index a8142b2..3f510b3 100644 --- a/internal/rpc/bitcoin/client.go +++ b/internal/rpc/bitcoin/client.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "sync" "time" "github.com/fystack/multichain-indexer/internal/rpc" @@ -176,30 +177,113 @@ func (c *BitcoinClient) GetTransactionWithPrevouts(ctx context.Context, txid str return nil, err } - // If prevout data already present, return as-is - if len(tx.Vin) > 0 && tx.Vin[0].PrevOut != nil { - return tx, nil + if err := c.ResolvePrevouts(ctx, []*Transaction{tx}, 4); err != nil { + return nil, err } + return tx, nil +} - // Resolve prevout for each input (skip coinbase) - for i := range tx.Vin { - if tx.Vin[i].TxID == "" { - continue // Coinbase input - } +// ResolvePrevouts resolves prevout data for all inputs across multiple transactions +// using parallel fetching with deduplication. This eliminates the N+1 problem where +// each input would otherwise require a separate RPC call. 
+func (c *BitcoinClient) ResolvePrevouts(ctx context.Context, txs []*Transaction, concurrency int) error { + if concurrency <= 0 { + concurrency = 8 + } - prevTx, err := c.GetRawTransaction(ctx, tx.Vin[i].TxID, true) - if err != nil { - // Log but don't fail - some prevouts may be unavailable + // Collect all unique prevout txids we need to fetch + type prevoutRef struct { + txid string + vout uint32 + } + needed := make(map[string]struct{}) + for _, tx := range txs { + if tx.IsCoinbase() { continue } + // Skip if prevouts are already resolved + if len(tx.Vin) > 0 && tx.Vin[0].PrevOut != nil { + continue + } + for _, vin := range tx.Vin { + if vin.TxID != "" { + needed[vin.TxID] = struct{}{} + } + } + } + + if len(needed) == 0 { + return nil + } + + // Fetch all needed transactions in parallel with bounded concurrency + var mu sync.Mutex + prevoutCache := make(map[string]*Transaction, len(needed)) + + txids := make([]string, 0, len(needed)) + for txid := range needed { + txids = append(txids, txid) + } + + jobs := make(chan string, concurrency*2) + var wg sync.WaitGroup + + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for txid := range jobs { + prevTx, err := c.GetRawTransaction(ctx, txid, true) + if err != nil { + continue // Skip unavailable prevouts + } + mu.Lock() + prevoutCache[txid] = prevTx + mu.Unlock() + } + }() + } + + go func() { + defer close(jobs) + for _, txid := range txids { + select { + case <-ctx.Done(): + return + case jobs <- txid: + } + } + }() - voutIdx := tx.Vin[i].Vout - if int(voutIdx) < len(prevTx.Vout) { - tx.Vin[i].PrevOut = &prevTx.Vout[voutIdx] + wg.Wait() + if ctx.Err() != nil { + return ctx.Err() + } + + // Assign resolved prevouts back to inputs + for _, tx := range txs { + if tx.IsCoinbase() { + continue + } + if len(tx.Vin) > 0 && tx.Vin[0].PrevOut != nil { + continue + } + for i := range tx.Vin { + if tx.Vin[i].TxID == "" { + continue + } + prevTx, ok := prevoutCache[tx.Vin[i].TxID] + if !ok { 
+ continue + } + voutIdx := tx.Vin[i].Vout + if int(voutIdx) < len(prevTx.Vout) { + tx.Vin[i].PrevOut = &prevTx.Vout[voutIdx] + } } } - return tx, nil + return nil } // GetMempoolEntry returns mempool entry for a specific transaction diff --git a/internal/rpc/client.go b/internal/rpc/client.go index 30eb28a..394dd80 100644 --- a/internal/rpc/client.go +++ b/internal/rpc/client.go @@ -46,8 +46,16 @@ func NewBaseClient( timeout time.Duration, rl *ratelimiter.PooledRateLimiter, ) *BaseClient { + transport := &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 20, + IdleConnTimeout: 90 * time.Second, + } return &BaseClient{ - httpClient: &http.Client{Timeout: timeout}, + httpClient: &http.Client{ + Timeout: timeout, + Transport: transport, + }, baseURL: strings.TrimSuffix(baseURL, "/"), auth: auth, network: network, diff --git a/internal/worker/bloom_sync.go b/internal/worker/bloom_sync.go new file mode 100644 index 0000000..5473eb2 --- /dev/null +++ b/internal/worker/bloom_sync.go @@ -0,0 +1,217 @@ +package worker + +import ( + "context" + "sync" + "time" + + "github.com/fystack/multichain-indexer/pkg/addressbloomfilter" + "github.com/fystack/multichain-indexer/pkg/common/enum" + "github.com/fystack/multichain-indexer/pkg/common/logger" + "github.com/fystack/multichain-indexer/pkg/model" + "github.com/samber/lo" + "gorm.io/gorm" +) + +// BloomSyncConfig holds configuration for the bloom filter sync worker. +type BloomSyncConfig struct { + Interval time.Duration // How often to check for new addresses + BatchSize int // Max addresses to fetch per sync cycle +} + +// DefaultBloomSyncConfig returns sensible defaults for production. +func DefaultBloomSyncConfig() BloomSyncConfig { + return BloomSyncConfig{ + Interval: time.Second, + BatchSize: 500, + } +} + +// BloomSyncWorker continuously syncs new addresses from the database to the bloom filter. +// Implements the Worker interface for unified lifecycle management. 
+type BloomSyncWorker struct { + ctx context.Context + cancel context.CancelFunc + + bloomFilter addressbloomfilter.WalletAddressBloomFilter + db *gorm.DB + config BloomSyncConfig + + mu sync.RWMutex + lastSyncedTimes map[enum.NetworkType]time.Time + totalSynced map[enum.NetworkType]uint64 +} + +// NewBloomSyncWorker creates a new bloom filter sync worker. +func NewBloomSyncWorker( + ctx context.Context, + bloomFilter addressbloomfilter.WalletAddressBloomFilter, + db *gorm.DB, + config BloomSyncConfig, +) *BloomSyncWorker { + workerCtx, cancel := context.WithCancel(ctx) + return &BloomSyncWorker{ + ctx: workerCtx, + cancel: cancel, + bloomFilter: bloomFilter, + db: db, + config: config, + lastSyncedTimes: make(map[enum.NetworkType]time.Time), + totalSynced: make(map[enum.NetworkType]uint64), + } +} + +// Start begins the background sync loop. +func (w *BloomSyncWorker) Start() { + go w.run() +} + +// Stop gracefully stops the sync worker. +func (w *BloomSyncWorker) Stop() { + w.cancel() + logger.Info("Bloom filter sync worker stopped") +} + +func (w *BloomSyncWorker) run() { + ticker := time.NewTicker(w.config.Interval) + defer ticker.Stop() + + w.initLastSyncedTimes() + + logger.Info("Bloom filter sync worker started", + "interval", w.config.Interval, + "batchSize", w.config.BatchSize, + ) + + for { + select { + case <-w.ctx.Done(): + return + case <-ticker.C: + w.syncAllNetworks() + } + } +} + +func (w *BloomSyncWorker) initLastSyncedTimes() { + w.mu.Lock() + defer w.mu.Unlock() + + now := time.Now() + for _, nt := range []enum.NetworkType{ + enum.NetworkTypeEVM, + enum.NetworkTypeTron, + enum.NetworkTypeBtc, + enum.NetworkTypeSui, + } { + w.lastSyncedTimes[nt] = now + w.totalSynced[nt] = 0 + } +} + +func (w *BloomSyncWorker) syncAllNetworks() { + for _, nt := range []enum.NetworkType{ + enum.NetworkTypeEVM, + enum.NetworkTypeTron, + enum.NetworkTypeBtc, + enum.NetworkTypeSui, + } { + if err := w.syncNetwork(nt); err != nil { + logger.Error("Bloom sync failed for 
network", + "networkType", nt, + "error", err, + ) + } + } +} + +// syncNetwork fetches and adds new addresses for a specific network type. +// Loops until fully caught up to handle burst inserts efficiently. +func (w *BloomSyncWorker) syncNetwork(networkType enum.NetworkType) error { + totalSynced := 0 + + for { + select { + case <-w.ctx.Done(): + return w.ctx.Err() + default: + } + + w.mu.RLock() + lastSynced := w.lastSyncedTimes[networkType] + w.mu.RUnlock() + + // Use a small overlap to handle clock skew edge cases + queryTime := lastSynced.Add(-time.Second) + + var addresses []model.WalletAddress + err := w.db.WithContext(w.ctx). + Model(&model.WalletAddress{}). + Select("address", "created_at"). + Where("type = ? AND created_at > ?", networkType, queryTime). + Order("created_at ASC"). + Limit(w.config.BatchSize). + Find(&addresses).Error + + if err != nil { + return err + } + + if len(addresses) == 0 { + if totalSynced > 0 { + logger.Info("Bloom sync caught up", + "networkType", networkType, + "totalSynced", totalSynced, + ) + } + return nil + } + + // Add to bloom filter (idempotent) + addressStrings := lo.Map(addresses, func(a model.WalletAddress, _ int) string { + return a.Address + }) + w.bloomFilter.AddBatch(addressStrings, networkType) + + latestTime := addresses[len(addresses)-1].CreatedAt + + w.mu.Lock() + w.lastSyncedTimes[networkType] = latestTime + w.totalSynced[networkType] += uint64(len(addresses)) + w.mu.Unlock() + + totalSynced += len(addresses) + + // If we got less than a full batch, we're caught up + if len(addresses) < w.config.BatchSize { + if totalSynced > 0 { + logger.Debug("Bloom sync completed", + "networkType", networkType, + "synced", totalSynced, + ) + } + return nil + } + + logger.Debug("Bloom sync progress", + "networkType", networkType, + "batch", len(addresses), + "totalSynced", totalSynced, + ) + } +} + +// Stats returns sync worker statistics. 
+func (w *BloomSyncWorker) Stats() map[string]any { + w.mu.RLock() + defer w.mu.RUnlock() + + stats := make(map[string]any) + for nt, count := range w.totalSynced { + stats[string(nt)] = map[string]any{ + "totalSynced": count, + "lastSyncedAt": w.lastSyncedTimes[nt], + } + } + return stats +} diff --git a/internal/worker/factory.go b/internal/worker/factory.go index dcf3608..7ab328f 100644 --- a/internal/worker/factory.go +++ b/internal/worker/factory.go @@ -41,6 +41,7 @@ type ManagerConfig struct { EnableRescanner bool EnableCatchup bool EnableManual bool + BloomSync *BloomSyncConfig // nil = disabled } // BuildWorkers constructs workers for a given mode. @@ -367,5 +368,15 @@ func CreateManagerWithWorkers( } } + // Bloom filter sync worker (global, not per-chain) + if managerCfg.BloomSync != nil && db != nil && addressBF != nil { + bloomWorker := NewBloomSyncWorker(ctx, addressBF, db, *managerCfg.BloomSync) + manager.AddWorkers(bloomWorker) + logger.Info("Bloom filter sync worker enabled", + "interval", managerCfg.BloomSync.Interval, + "batchSize", managerCfg.BloomSync.BatchSize, + ) + } + return manager } diff --git a/internal/worker/manager.go b/internal/worker/manager.go index 5068485..a8aa7fc 100644 --- a/internal/worker/manager.go +++ b/internal/worker/manager.go @@ -2,6 +2,8 @@ package worker import ( "context" + "sync" + "time" "github.com/fystack/multichain-indexer/pkg/common/logger" "github.com/fystack/multichain-indexer/pkg/events" @@ -11,6 +13,8 @@ import ( "github.com/fystack/multichain-indexer/pkg/store/pubkeystore" ) +const defaultShutdownTimeout = 30 * time.Second + type Manager struct { ctx context.Context workers []Worker @@ -46,13 +50,31 @@ func (m *Manager) Start() { } } -// Stop shuts down all workers + resources +// Stop shuts down all workers concurrently with a timeout, then closes resources. 
func (m *Manager) Stop() { - // Stop all workers - for _, w := range m.workers { - if w != nil { - w.Stop() + // Stop all workers concurrently with timeout + done := make(chan struct{}) + go func() { + var wg sync.WaitGroup + for _, w := range m.workers { + if w != nil { + wg.Add(1) + go func(w Worker) { + defer wg.Done() + w.Stop() + }(w) + } } + wg.Wait() + close(done) + }() + + select { + case <-done: + logger.Info("All workers stopped") + case <-time.After(defaultShutdownTimeout): + logger.Warn("Worker shutdown timed out, proceeding with resource cleanup", + "timeout", defaultShutdownTimeout) } // Close resources diff --git a/internal/worker/mempool.go b/internal/worker/mempool.go index 587abe0..942c237 100644 --- a/internal/worker/mempool.go +++ b/internal/worker/mempool.go @@ -3,6 +3,7 @@ package worker import ( "context" "fmt" + "sync" "time" "github.com/fystack/multichain-indexer/internal/indexer" @@ -17,6 +18,7 @@ import ( // Polls the mempool for pending transactions and emits them to NATS type MempoolWorker struct { *BaseWorker + mu sync.Mutex seenTxs map[string]bool // Track seen transactions to avoid duplicates pollInterval time.Duration // How often to poll mempool btcIndexer *indexer.BitcoinIndexer @@ -95,6 +97,7 @@ func (mw *MempoolWorker) processMempool() error { newTxCount := 0 networkType := mw.chain.GetNetworkType() + mw.mu.Lock() for _, tx := range transactions { // Only emit transactions where TO address is monitored (incoming deposits) // Outgoing transactions are handled by the withdrawal flow @@ -132,16 +135,10 @@ func (mw *MempoolWorker) processMempool() error { } } - if newTxCount > 0 { - mw.logger.Info("Processed mempool transactions", - "new_txs", newTxCount, - "total_tracked", len(mw.seenTxs), - ) - } + trackedCount := len(mw.seenTxs) // Cleanup: Remove old transactions from tracking (keep last 10k) - if len(mw.seenTxs) > 10000 { - // Clear half the map (simple approach) + if trackedCount > 10000 { count := 0 for txKey := range 
mw.seenTxs { delete(mw.seenTxs, txKey) @@ -152,6 +149,14 @@ func (mw *MempoolWorker) processMempool() error { } mw.logger.Debug("Cleaned up old mempool transactions", "removed", count) } + mw.mu.Unlock() + + if newTxCount > 0 { + mw.logger.Info("Processed mempool transactions", + "new_txs", newTxCount, + "total_tracked", trackedCount, + ) + } // Sleep until next poll interval select { diff --git a/internal/worker/regular.go b/internal/worker/regular.go index 760a446..c0adca2 100644 --- a/internal/worker/regular.go +++ b/internal/worker/regular.go @@ -16,18 +16,13 @@ import ( ) const ( - MaxBlockHashSize = 20 + MaxBlockHashSize = 50 ) type RegularWorker struct { *BaseWorker currentBlock uint64 - blockHashes []BlockHashEntry -} - -type BlockHashEntry struct { - BlockNumber uint64 - Hash string + blockHashes []blockstore.BlockHashEntry } func NewRegularWorker( @@ -53,7 +48,7 @@ func NewRegularWorker( ) rw := &RegularWorker{BaseWorker: worker} rw.currentBlock = rw.determineStartingBlock() - rw.blockHashes = make([]BlockHashEntry, 0, MaxBlockHashSize) + rw.loadBlockHashes() return rw } @@ -71,7 +66,8 @@ func (rw *RegularWorker) Stop() { if rw.currentBlock > 0 { _ = rw.blockStore.SaveLatestBlock(rw.chain.GetNetworkInternalCode(), rw.currentBlock) } - rw.clearBlockHashes() + // Persist block hashes for reorg detection on next startup + rw.persistBlockHashes() // Call base worker stop to cancel context and clean up rw.BaseWorker.Stop() } @@ -286,15 +282,13 @@ func (rw *RegularWorker) isReorgCheckRequired() bool { // addBlockHash adds a block hash to the array, maintaining max size func (rw *RegularWorker) addBlockHash(blockNumber uint64, hash string) { - entry := BlockHashEntry{ + entry := blockstore.BlockHashEntry{ BlockNumber: blockNumber, Hash: hash, } - // Add to the end rw.blockHashes = append(rw.blockHashes, entry) - // Remove oldest entries if we exceed max size if len(rw.blockHashes) > MaxBlockHashSize { rw.blockHashes = 
rw.blockHashes[len(rw.blockHashes)-MaxBlockHashSize:] } @@ -312,7 +306,31 @@ func (rw *RegularWorker) getBlockHash(blockNumber uint64) string { // clearBlockHashes clears all block hashes (used on reorg) func (rw *RegularWorker) clearBlockHashes() { - rw.blockHashes = rw.blockHashes[:0] // Clear slice but keep capacity + rw.blockHashes = rw.blockHashes[:0] +} + +// loadBlockHashes loads persisted block hashes from KV store on startup +func (rw *RegularWorker) loadBlockHashes() { + hashes, err := rw.blockStore.GetBlockHashes(rw.chain.GetNetworkInternalCode()) + if err != nil || len(hashes) == 0 { + rw.blockHashes = make([]blockstore.BlockHashEntry, 0, MaxBlockHashSize) + return + } + rw.blockHashes = hashes + rw.logger.Info("Loaded persisted block hashes for reorg detection", + "chain", rw.chain.GetName(), + "count", len(hashes), + ) +} + +// persistBlockHashes saves block hashes to KV store for reorg detection across restarts +func (rw *RegularWorker) persistBlockHashes() { + if len(rw.blockHashes) == 0 { + return + } + if err := rw.blockStore.SaveBlockHashes(rw.chain.GetNetworkInternalCode(), rw.blockHashes); err != nil { + rw.logger.Error("Failed to persist block hashes", "err", err) + } } func checkContinuity(prev, curr indexer.BlockResult) bool { diff --git a/pkg/common/config/services.go b/pkg/common/config/services.go index 891fc7e..143bb43 100644 --- a/pkg/common/config/services.go +++ b/pkg/common/config/services.go @@ -74,11 +74,18 @@ type BadgerConfig struct { } type BloomfilterConfig struct { - Type enum.BFType `yaml:"type"` - WalletAddressRepo string `yaml:"wallet_address_repo"` - BatchSize int `yaml:"batch_size"` - Redis RedisBFConfig `yaml:"redis"` - InMemory InMemoryConfig `yaml:"in_memory"` + Type enum.BFType `yaml:"type"` + WalletAddressRepo string `yaml:"wallet_address_repo"` + BatchSize int `yaml:"batch_size"` + Redis RedisBFConfig `yaml:"redis"` + InMemory InMemoryConfig `yaml:"in_memory"` + Sync BloomSyncConfig `yaml:"sync"` +} + +type 
BloomSyncConfig struct { + Enabled bool `yaml:"enabled"` + Interval string `yaml:"interval"` // e.g. "1s", "5s" + BatchSize int `yaml:"batch_size"` // max addresses per sync cycle } type RedisBFConfig struct { diff --git a/pkg/store/blockstore/store.go b/pkg/store/blockstore/store.go index 18755f5..8b90cb0 100644 --- a/pkg/store/blockstore/store.go +++ b/pkg/store/blockstore/store.go @@ -1,6 +1,7 @@ package blockstore import ( + "encoding/json" "errors" "fmt" "slices" @@ -12,6 +13,12 @@ import ( "github.com/fystack/multichain-indexer/pkg/infra" ) +// BlockHashEntry represents a block number and its hash for reorg detection. +type BlockHashEntry struct { + BlockNumber uint64 `json:"block_number"` + Hash string `json:"hash"` +} + const ( BlockStates = "block_states" ) @@ -35,6 +42,10 @@ func composeCatchupKey(chain string) string { return fmt.Sprintf("%s/%s/%s/", BlockStates, chain, constant.KVPrefixProgressCatchup) } +func blockHashesKey(chainName string) string { + return fmt.Sprintf("%s/%s/%s", BlockStates, chainName, constant.KVPrefixBlockHash) +} + func catchupKey(chain string, start, end uint64) string { return fmt.Sprintf("%s/%s/%s/%d-%d", BlockStates, chain, constant.KVPrefixProgressCatchup, start, end) } @@ -56,6 +67,10 @@ type Store interface { GetCatchupProgress(chain string) ([]CatchupRange, error) DeleteCatchupRange(chain string, start, end uint64) error + // Block hash persistence for reorg detection across restarts + SaveBlockHashes(chainName string, hashes []BlockHashEntry) error + GetBlockHashes(chainName string) ([]BlockHashEntry, error) + Close() error } @@ -243,6 +258,34 @@ func (bs *blockStore) DeleteCatchupRange(chain string, start, end uint64) error return err } +// SaveBlockHashes persists block hashes for reorg detection across restarts. 
+func (bs *blockStore) SaveBlockHashes(chainName string, hashes []BlockHashEntry) error {
+	if chainName == "" {
+		return errors.New("chain name is required")
+	}
+	data, err := json.Marshal(hashes)
+	if err != nil {
+		return fmt.Errorf("marshal block hashes: %w", err)
+	}
+	return bs.store.Set(blockHashesKey(chainName), string(data))
+}
+
+// GetBlockHashes loads persisted block hashes for reorg detection. Returns (nil, nil) both when nothing is persisted and on any read error — callers must treat an empty result as "start fresh".
+func (bs *blockStore) GetBlockHashes(chainName string) ([]BlockHashEntry, error) {
+	if chainName == "" {
+		return nil, errors.New("chain name is required")
+	}
+	raw, err := bs.store.Get(blockHashesKey(chainName))
+	if err != nil {
+		return nil, nil // NOTE(review): this swallows ALL Get errors (I/O failure, corruption), not just key-not-found — TODO distinguish a not-found sentinel from real read errors
+	}
+	var hashes []BlockHashEntry
+	if err := json.Unmarshal([]byte(raw), &hashes); err != nil {
+		return nil, fmt.Errorf("unmarshal block hashes: %w", err)
+	}
+	return hashes, nil
+}
+
 func (bs *blockStore) Close() error {
 	return bs.store.Close()
 }
diff --git a/sql/wallet_address.sql b/sql/wallet_address.sql
index 93bb13e..16a55ef 100644
--- a/sql/wallet_address.sql
+++ b/sql/wallet_address.sql
@@ -17,6 +17,9 @@ CREATE INDEX IF NOT EXISTS idx_wallet_address_type ON wallet_addresses (type);
 CREATE INDEX IF NOT EXISTS idx_wallet_address_standard ON wallet_addresses (standard);
 CREATE INDEX IF NOT EXISTS idx_wallet_address_created_at ON wallet_addresses (created_at);
 
+-- Composite index for bloom filter sync worker queries (WHERE type = ? AND created_at > ?)
+CREATE INDEX IF NOT EXISTS idx_wallet_addresses_type_created ON wallet_addresses (type, created_at);
+
 -- Create enum types if they don't exist
 DO $$ BEGIN
 CREATE TYPE address_type AS ENUM (