Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions cmd/indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,27 @@ func runIndexer(chains []string, configPath string, debug, manual, catchup, from
logger.Info("Starting from latest block for all specified chains", "chains", chains)
}

// Build bloom sync config if enabled
var bloomSyncCfg *worker.BloomSyncConfig
if services.Bloomfilter != nil && services.Bloomfilter.Sync.Enabled && db != nil {
cfg := worker.DefaultBloomSyncConfig()
if services.Bloomfilter.Sync.Interval != "" {
if interval, parseErr := time.ParseDuration(services.Bloomfilter.Sync.Interval); parseErr == nil {
cfg.Interval = interval
}
}
if services.Bloomfilter.Sync.BatchSize > 0 {
cfg.BatchSize = services.Bloomfilter.Sync.BatchSize
}
bloomSyncCfg = &cfg
}

// Create manager with all workers using factory
managerCfg := worker.ManagerConfig{
Chains: chains,
EnableCatchup: catchup,
EnableManual: manual,
BloomSync: bloomSyncCfg,
}

manager := worker.CreateManagerWithWorkers(
Expand All @@ -211,19 +227,21 @@ func runIndexer(chains []string, configPath string, debug, manual, catchup, from

logger.Info("Shutting down indexer...")

// Shutdown health server
// Stop workers first so health endpoint can report during drain
manager.Stop()

// Then shutdown health server
if healthServer != nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := healthServer.Shutdown(ctx); err != nil {
if err := healthServer.Shutdown(shutdownCtx); err != nil {
logger.Error("Health server shutdown failed", "error", err)
} else {
logger.Info("Health server stopped gracefully")
}
}

manager.Stop()
logger.Info("✅ Indexer stopped gracefully")
logger.Info("Indexer stopped gracefully")
}

type HealthResponse struct {
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/fystack/multichain-indexer

go 1.24.5
go 1.24.0

toolchain go1.24.4

require (
dario.cat/mergo v1.0.2
Expand Down
89 changes: 51 additions & 38 deletions internal/indexer/bitcoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,46 +89,45 @@ func (b *BitcoinIndexer) GetBlock(ctx context.Context, number uint64) (*types.Bl
return b.convertBlockWithPrevoutResolution(ctx, btcBlock)
}

// convertBlockWithPrevoutResolution converts a block and resolves prevout data for transactions
// convertBlockWithPrevoutResolution converts a block and resolves prevout data for transactions.
// Uses batch prevout resolution to avoid N+1 RPC calls per transaction.
func (b *BitcoinIndexer) convertBlockWithPrevoutResolution(ctx context.Context, btcBlock *bitcoin.Block) (*types.Block, error) {
var allTransfers []types.Transaction
allTransfers := make([]types.Transaction, 0, len(btcBlock.Tx)*2)

// Calculate latest block height from confirmations
latestBlock := btcBlock.Height
if btcBlock.Confirmations > 0 {
latestBlock = btcBlock.Height + btcBlock.Confirmations - 1
}

// Get a client for prevout resolution
provider, _ := b.failover.GetBestProvider()
var btcClient *bitcoin.BitcoinClient
if provider != nil {
btcClient, _ = provider.Client.(*bitcoin.BitcoinClient)
}

// Collect all non-coinbase transactions that need prevout resolution
var needResolution []*bitcoin.Transaction
for i := range btcBlock.Tx {
tx := &btcBlock.Tx[i]

// Skip coinbase transactions
if tx.IsCoinbase() {
continue
}
if len(tx.Vin) > 0 && tx.Vin[0].PrevOut == nil {
needResolution = append(needResolution, tx)
}
}

// Try to resolve prevout data for FROM address if not already present
if btcClient != nil && len(tx.Vin) > 0 && tx.Vin[0].PrevOut == nil {
if tx.Vin[0].TxID != "" {
if resolved, err := btcClient.GetTransactionWithPrevouts(ctx, tx.TxID); err == nil {
// Copy resolved prevout data
for j := range tx.Vin {
if j < len(resolved.Vin) && resolved.Vin[j].PrevOut != nil {
tx.Vin[j].PrevOut = resolved.Vin[j].PrevOut
}
}
}
}
// Batch-resolve all prevouts in one pass using parallel fetching
if len(needResolution) > 0 {
if err := b.failover.ExecuteWithRetry(ctx, func(c bitcoin.BitcoinAPI) error {
return c.ResolvePrevouts(ctx, needResolution, b.config.Throttle.Concurrency)
}); err != nil {
// Log but don't fail — partial prevout data is acceptable
_ = err
}
}

// Extract all transfers - filtering happens in worker's emitBlock
// Extract transfers from all transactions
for i := range btcBlock.Tx {
tx := &btcBlock.Tx[i]
if tx.IsCoinbase() {
continue
}
transfers := b.extractTransfersFromTx(tx, btcBlock.Height, btcBlock.Time, latestBlock)
allTransfers = append(allTransfers, transfers...)
}
Expand Down Expand Up @@ -226,13 +225,13 @@ func (b *BitcoinIndexer) extractTransfersFromTx(
tx *bitcoin.Transaction,
blockNumber, ts, latestBlock uint64,
) []types.Transaction {
var transfers []types.Transaction

// Skip coinbase transactions
if tx.IsCoinbase() {
return transfers
return nil
}

transfers := make([]types.Transaction, 0, len(tx.Vout))

fee := tx.CalculateFee()

confirmations := b.calculateConfirmations(blockNumber, latestBlock)
Expand Down Expand Up @@ -365,21 +364,35 @@ func (b *BitcoinIndexer) GetMempoolTransactions(ctx context.Context) ([]types.Tr
return nil, fmt.Errorf("unexpected mempool format")
}

// Process each transaction
var allTransfers []types.Transaction
// Fetch transactions in batches and resolve prevouts
currentTime := uint64(time.Now().Unix())
allTransfers := make([]types.Transaction, 0, len(txids))

const batchSize = 50
for i := 0; i < len(txids); i += batchSize {
end := min(i+batchSize, len(txids))
batch := txids[i:end]

// Fetch all transactions in this batch
var txs []*bitcoin.Transaction
for _, txid := range batch {
tx, err := btcClient.GetRawTransaction(ctx, txid, true)
if err != nil {
continue
}
txs = append(txs, tx)
}

for _, txid := range txids {
// Fetch transaction with prevout data resolved
// This is critical for detecting FROM addresses
tx, err := btcClient.GetTransactionWithPrevouts(ctx, txid)
if err != nil {
continue
// Batch-resolve prevouts for all fetched transactions
if len(txs) > 0 {
_ = btcClient.ResolvePrevouts(ctx, txs, 8)
}

// Extract transfers (blockNumber=0 for mempool, confirmations will be 0)
transfers := b.extractTransfersFromTx(tx, 0, currentTime, latestBlock)
allTransfers = append(allTransfers, transfers...)
// Extract transfers
for _, tx := range txs {
transfers := b.extractTransfersFromTx(tx, 0, currentTime, latestBlock)
allTransfers = append(allTransfers, transfers...)
}
}

return allTransfers, nil
Expand Down
3 changes: 3 additions & 0 deletions internal/rpc/bitcoin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ type BitcoinAPI interface {
GetRawTransaction(ctx context.Context, txid string, verbose bool) (*Transaction, error)
GetTransactionWithPrevouts(ctx context.Context, txid string) (*Transaction, error)
GetMempoolEntry(ctx context.Context, txid string) (*MempoolEntry, error)

// Batch operations
ResolvePrevouts(ctx context.Context, txs []*Transaction, concurrency int) error
}
114 changes: 99 additions & 15 deletions internal/rpc/bitcoin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/fystack/multichain-indexer/internal/rpc"
Expand Down Expand Up @@ -176,30 +177,113 @@ func (c *BitcoinClient) GetTransactionWithPrevouts(ctx context.Context, txid str
return nil, err
}

// If prevout data already present, return as-is
if len(tx.Vin) > 0 && tx.Vin[0].PrevOut != nil {
return tx, nil
if err := c.ResolvePrevouts(ctx, []*Transaction{tx}, 4); err != nil {
return nil, err
}
return tx, nil
}

// Resolve prevout for each input (skip coinbase)
for i := range tx.Vin {
if tx.Vin[i].TxID == "" {
continue // Coinbase input
}
// ResolvePrevouts resolves prevout data for all inputs across multiple transactions
// using parallel fetching with deduplication. This eliminates the N+1 problem where
// each input would otherwise require a separate RPC call.
func (c *BitcoinClient) ResolvePrevouts(ctx context.Context, txs []*Transaction, concurrency int) error {
if concurrency <= 0 {
concurrency = 8
}
Comment on lines +190 to +192
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be configurable, or define a const instead of using a magic number

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll extract this to a const DefaultPrevoutConcurrency = 8 and also fix the hardcoded 8 in GetMempoolTransactions. The value itself comes from the caller, convertBlockWithPrevoutResolution already passes config.Throttle.Concurrency, so the default here is only a safety fallback.


prevTx, err := c.GetRawTransaction(ctx, tx.Vin[i].TxID, true)
if err != nil {
// Log but don't fail - some prevouts may be unavailable
// Collect all unique prevout txids we need to fetch
type prevoutRef struct {
txid string
vout uint32
}
needed := make(map[string]struct{})
for _, tx := range txs {
if tx.IsCoinbase() {
continue
}
// Skip if prevouts are already resolved
if len(tx.Vin) > 0 && tx.Vin[0].PrevOut != nil {
continue
}
for _, vin := range tx.Vin {
if vin.TxID != "" {
needed[vin.TxID] = struct{}{}
}
}
}

if len(needed) == 0 {
return nil
}

// Fetch all needed transactions in parallel with bounded concurrency
Copy link
Collaborator

@Azzurriii Azzurriii Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we only need to monitor tx where "to" addresses are monitored, why ResolvePrevouts needs fetches the entire previous transaction?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bitcoin UTXO model - inputs only have (txid, vout) reference, no address or amount. We need to fetch the full prev tx to get the from-address and calculate fees. Unfortunately getrawtransaction is the only RPC available for this. I think it should better track full UTXO record for future scaling.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agreed with you on this point, and i have a pending PR for indexing UTXO: #42

FYI, This indexer only track tx where "to" is on monitored list, but for the UTXO, is it good if we tracking both direction? Because every tx can have multiple direction of UTXO? I'd appriciate if you can take a look in this and give me some opinion,

var mu sync.Mutex
prevoutCache := make(map[string]*Transaction, len(needed))

txids := make([]string, 0, len(needed))
for txid := range needed {
txids = append(txids, txid)
}

jobs := make(chan string, concurrency*2)
var wg sync.WaitGroup

for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for txid := range jobs {
prevTx, err := c.GetRawTransaction(ctx, txid, true)
if err != nil {
continue // Skip unavailable prevouts
}
mu.Lock()
prevoutCache[txid] = prevTx
mu.Unlock()
}
}()
}

go func() {
defer close(jobs)
for _, txid := range txids {
select {
case <-ctx.Done():
return
case jobs <- txid:
}
}
}()

voutIdx := tx.Vin[i].Vout
if int(voutIdx) < len(prevTx.Vout) {
tx.Vin[i].PrevOut = &prevTx.Vout[voutIdx]
wg.Wait()
if ctx.Err() != nil {
return ctx.Err()
}

// Assign resolved prevouts back to inputs
for _, tx := range txs {
if tx.IsCoinbase() {
continue
}
if len(tx.Vin) > 0 && tx.Vin[0].PrevOut != nil {
continue
}
for i := range tx.Vin {
if tx.Vin[i].TxID == "" {
continue
}
prevTx, ok := prevoutCache[tx.Vin[i].TxID]
if !ok {
continue
}
voutIdx := tx.Vin[i].Vout
if int(voutIdx) < len(prevTx.Vout) {
tx.Vin[i].PrevOut = &prevTx.Vout[voutIdx]
}
}
}

return tx, nil
return nil
}

// GetMempoolEntry returns mempool entry for a specific transaction
Expand Down
10 changes: 9 additions & 1 deletion internal/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,16 @@ func NewBaseClient(
timeout time.Duration,
rl *ratelimiter.PooledRateLimiter,
) *BaseClient {
transport := &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 20,
IdleConnTimeout: 90 * time.Second,
}
return &BaseClient{
httpClient: &http.Client{Timeout: timeout},
httpClient: &http.Client{
Timeout: timeout,
Transport: transport,
},
baseURL: strings.TrimSuffix(baseURL, "/"),
auth: auth,
network: network,
Expand Down
Loading