Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions cmd/indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,17 @@ func runIndexer(chains []string, configPath string, debug, manual, catchup, from
defer natsConn.Close()
logger.Info("NATS connection established")

transferEventQueueManager := infra.NewNATsMessageQueueManager("transfer", []string{
eventQueueManager := infra.NewNATsMessageQueueManager("transfer", []string{
"transfer.event.*",
}, natsConn)
eventQueue := eventQueueManager.NewMessageQueue("dispatch")

transferQueue := transferEventQueueManager.NewMessageQueue("dispatch")
utxoQueueManager := infra.NewNATsMessageQueueManager("utxo", []string{
"utxo.event.*",
}, natsConn)
utxoQueue := utxoQueueManager.NewMessageQueue("dispatch")

emitter := events.NewEmitter(transferQueue, services.Nats.SubjectPrefix)
emitter := events.NewEmitter(eventQueue, utxoQueue, services.Nats.SubjectPrefix)
defer emitter.Close()

// start address bloom filter (Initialize is optional)
Expand Down
4 changes: 4 additions & 0 deletions configs/config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ chains:
poll_interval: "60s" # Bitcoin blocks ~10 minutes
confirmations: 1
reorg_rollback_window: 100
index_change_output: false
index_utxo: true # Enable UTXO event extraction and emission (Bitcoin only)
nodes:
- url: "https://bitcoin-testnet-rpc.publicnode.com"
- url: "https://blockstream.info/testnet/api"
Expand All @@ -89,6 +91,8 @@ chains:
poll_interval: "60s"
confirmations: 1
reorg_rollback_window: 100
index_change_output: false
index_utxo: true # Enable UTXO event extraction and emission (Bitcoin only)
nodes:
- url: "https://bitcoin-rpc.publicnode.com"
- url: "https://blockstream.info/api"
Expand Down
148 changes: 120 additions & 28 deletions internal/indexer/bitcoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,13 @@ func (b *BitcoinIndexer) GetBlock(ctx context.Context, number uint64) (*types.Bl
// convertBlockWithPrevoutResolution converts a block and resolves prevout data for transactions
func (b *BitcoinIndexer) convertBlockWithPrevoutResolution(ctx context.Context, btcBlock *bitcoin.Block) (*types.Block, error) {
var allTransfers []types.Transaction
var allUTXOEvents []types.UTXOEvent

// Calculate latest block height from confirmations
latestBlock := btcBlock.Height
if btcBlock.Confirmations > 0 {
latestBlock = btcBlock.Height + btcBlock.Confirmations - 1
}

// Get a client for prevout resolution
provider, _ := b.failover.GetBestProvider()
var btcClient *bitcoin.BitcoinClient
if provider != nil {
Expand All @@ -112,16 +111,13 @@ func (b *BitcoinIndexer) convertBlockWithPrevoutResolution(ctx context.Context,
for i := range btcBlock.Tx {
tx := &btcBlock.Tx[i]

// Skip coinbase transactions
if tx.IsCoinbase() {
continue
}

// Try to resolve prevout data for FROM address if not already present
if btcClient != nil && len(tx.Vin) > 0 && tx.Vin[0].PrevOut == nil {
if tx.Vin[0].TxID != "" {
if resolved, err := btcClient.GetTransactionWithPrevouts(ctx, tx.TxID); err == nil {
// Copy resolved prevout data
for j := range tx.Vin {
if j < len(resolved.Vin) && resolved.Vin[j].PrevOut != nil {
tx.Vin[j].PrevOut = resolved.Vin[j].PrevOut
Expand All @@ -131,18 +127,28 @@ func (b *BitcoinIndexer) convertBlockWithPrevoutResolution(ctx context.Context,
}
}

// Extract all transfers - filtering happens in worker's emitBlock
transfers := b.extractTransfersFromTx(tx, btcBlock.Height, btcBlock.Time, latestBlock)
allTransfers = append(allTransfers, transfers...)

if b.config.IndexUTXO {
utxoEvent := b.extractUTXOEvent(tx, btcBlock.Height, btcBlock.Hash, btcBlock.Time, latestBlock)
if utxoEvent != nil {
allUTXOEvents = append(allUTXOEvents, *utxoEvent)
}
}
}

return &types.Block{
block := &types.Block{
Number: btcBlock.Height,
Hash: btcBlock.Hash,
ParentHash: btcBlock.PreviousBlockHash,
Timestamp: btcBlock.Time,
Transactions: allTransfers,
}, nil
}

block.SetMetadata("utxo_events", allUTXOEvents)

return block, nil
}

func (b *BitcoinIndexer) GetBlocks(
Expand Down Expand Up @@ -250,14 +256,13 @@ func (b *BitcoinIndexer) extractTransfersFromTx(
continue // Skip unspendable outputs (OP_RETURN, etc.)
}

// Normalize Bitcoin address (bech32 -> lowercase, base58 -> validated)
if normalized, err := bitcoin.NormalizeBTCAddress(toAddr); err == nil {
toAddr = normalized
}

// Skip self-transfers (change outputs back to sender)
// This prevents confusing transfers where from=to
if fromAddr != "" && fromAddr == toAddr {
// For Transfer events, respect index_change_output config
// (This filters what goes to transfer.event.dispatch)
if !b.config.IndexChangeOutput && fromAddr != "" && fromAddr == toAddr {
continue
}

Expand Down Expand Up @@ -291,7 +296,93 @@ func (b *BitcoinIndexer) extractTransfersFromTx(
return transfers
}

// getFirstInputAddress returns the address of the first input with prevout data
func (b *BitcoinIndexer) extractUTXOEvent(
tx *bitcoin.Transaction,
blockNumber uint64,
blockHash string,
timestamp, latestBlock uint64,
) *types.UTXOEvent {
if tx.IsCoinbase() {
return nil
}

var created []types.UTXO
var spent []types.SpentUTXO

// Extract ALL created UTXOs (vouts) without filtering
// Filtering happens at emission level based on monitored addresses
for i, vout := range tx.Vout {
addr := bitcoin.GetOutputAddress(&vout)
if addr == "" {
continue
}

if normalized, err := bitcoin.NormalizeBTCAddress(addr); err == nil {
addr = normalized
}

amountSat := int64(vout.Value * 1e8)

created = append(created, types.UTXO{
TxHash: tx.TxID,
Vout: uint32(i),
Address: addr,
Amount: strconv.FormatInt(amountSat, 10),
ScriptPubKey: vout.ScriptPubKey.Hex,
})
}

// Extract ALL spent UTXOs (vins) without filtering
// Filtering happens at emission level based on monitored addresses
for i, vin := range tx.Vin {
if vin.PrevOut == nil {
continue
}

addr := bitcoin.GetInputAddress(&vin)
if addr == "" {
continue
}

if normalized, err := bitcoin.NormalizeBTCAddress(addr); err == nil {
addr = normalized
}

amountSat := int64(vin.PrevOut.Value * 1e8)

spent = append(spent, types.SpentUTXO{
TxHash: vin.TxID,
Vout: vin.Vout,
Vin: uint32(i),
Address: addr,
Amount: strconv.FormatInt(amountSat, 10),
})
}

// Return event even if no monitored addresses
// Filtering happens at emission level
if len(created) == 0 && len(spent) == 0 {
return nil
}

confirmations := b.calculateConfirmations(blockNumber, latestBlock)
status := utils.CalculateTransactionStatus(confirmations, b.confirmations)
fee := tx.CalculateFee()

return &types.UTXOEvent{
TxHash: tx.TxID,
NetworkId: b.config.NetworkId,
BlockNumber: blockNumber,
BlockHash: blockHash,
Timestamp: timestamp,
Created: created,
Spent: spent,
TxFee: fee.String(),
Status: status,
Confirmations: confirmations,
}
}

func (b *BitcoinIndexer) getFirstInputAddress(tx *bitcoin.Transaction) string {
for _, vin := range tx.Vin {
if addr := bitcoin.GetInputAddress(&vin); addr != "" {
Expand Down Expand Up @@ -338,52 +429,53 @@ func (b *BitcoinIndexer) GetConfirmedHeight(ctx context.Context) (uint64, error)
}

// GetMempoolTransactions fetches and processes transactions from the mempool
// Returns transactions involving monitored addresses with 0 confirmations
func (b *BitcoinIndexer) GetMempoolTransactions(ctx context.Context) ([]types.Transaction, error) {
// Get Bitcoin client from failover
// Returns transactions and UTXO events involving monitored addresses with 0 confirmations
func (b *BitcoinIndexer) GetMempoolTransactions(ctx context.Context) ([]types.Transaction, []types.UTXOEvent, error) {
provider, err := b.failover.GetBestProvider()
if err != nil {
return nil, fmt.Errorf("failed to get bitcoin provider: %w", err)
return nil, nil, fmt.Errorf("failed to get bitcoin provider: %w", err)
}

btcClient, ok := provider.Client.(*bitcoin.BitcoinClient)
if !ok {
return nil, fmt.Errorf("invalid client type")
return nil, nil, fmt.Errorf("invalid client type")
}

// Get latest block height for context
latestBlock, err := b.GetLatestBlockNumber(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get latest block: %w", err)
return nil, nil, fmt.Errorf("failed to get latest block: %w", err)
}

// Get mempool transaction IDs
result, err := btcClient.GetRawMempool(ctx, false)
if err != nil {
return nil, fmt.Errorf("failed to get mempool: %w", err)
return nil, nil, fmt.Errorf("failed to get mempool: %w", err)
}

txids, ok := result.([]string)
if !ok {
return nil, fmt.Errorf("unexpected mempool format")
return nil, nil, fmt.Errorf("unexpected mempool format")
}

// Process each transaction
var allTransfers []types.Transaction
var allUTXOEvents []types.UTXOEvent
currentTime := uint64(time.Now().Unix())

for _, txid := range txids {
// Fetch transaction with prevout data resolved
// This is critical for detecting FROM addresses
tx, err := btcClient.GetTransactionWithPrevouts(ctx, txid)
if err != nil {
continue
}

// Extract transfers (blockNumber=0 for mempool, confirmations will be 0)
transfers := b.extractTransfersFromTx(tx, 0, currentTime, latestBlock)
allTransfers = append(allTransfers, transfers...)

if b.config.IndexUTXO {
utxoEvent := b.extractUTXOEvent(tx, 0, "", currentTime, latestBlock)
if utxoEvent != nil {
allUTXOEvents = append(allUTXOEvents, *utxoEvent)
}
}
}

return allTransfers, nil
return allTransfers, allUTXOEvents, nil
}
Loading