diff --git a/cmd/indexer/main.go b/cmd/indexer/main.go index 08e889e..fdc4a08 100644 --- a/cmd/indexer/main.go +++ b/cmd/indexer/main.go @@ -138,13 +138,17 @@ func runIndexer(chains []string, configPath string, debug, manual, catchup, from defer natsConn.Close() logger.Info("NATS connection established") - transferEventQueueManager := infra.NewNATsMessageQueueManager("transfer", []string{ + eventQueueManager := infra.NewNATsMessageQueueManager("transfer", []string{ "transfer.event.*", }, natsConn) + eventQueue := eventQueueManager.NewMessageQueue("dispatch") - transferQueue := transferEventQueueManager.NewMessageQueue("dispatch") + utxoQueueManager := infra.NewNATsMessageQueueManager("utxo", []string{ + "utxo.event.*", + }, natsConn) + utxoQueue := utxoQueueManager.NewMessageQueue("dispatch") - emitter := events.NewEmitter(transferQueue, services.Nats.SubjectPrefix) + emitter := events.NewEmitter(eventQueue, utxoQueue, services.Nats.SubjectPrefix) defer emitter.Close() // start address bloom filter (Initialize is optional) diff --git a/configs/config.example.yaml b/configs/config.example.yaml index 453b899..83168a7 100644 --- a/configs/config.example.yaml +++ b/configs/config.example.yaml @@ -68,6 +68,8 @@ chains: poll_interval: "60s" # Bitcoin blocks ~10 minutes confirmations: 1 reorg_rollback_window: 100 + index_change_output: false + index_utxo: true # Enable UTXO event extraction and emission (Bitcoin only) nodes: - url: "https://bitcoin-testnet-rpc.publicnode.com" - url: "https://blockstream.info/testnet/api" @@ -89,6 +91,8 @@ chains: poll_interval: "60s" confirmations: 1 reorg_rollback_window: 100 + index_change_output: false + index_utxo: true # Enable UTXO event extraction and emission (Bitcoin only) nodes: - url: "https://bitcoin-rpc.publicnode.com" - url: "https://blockstream.info/api" diff --git a/internal/indexer/bitcoin.go b/internal/indexer/bitcoin.go index 4473a8e..4ceacdf 100644 --- a/internal/indexer/bitcoin.go +++ b/internal/indexer/bitcoin.go @@ -95,14 +95,13 @@ func (b *BitcoinIndexer) GetBlock(ctx context.Context, number uint64) (*types.Bl // convertBlockWithPrevoutResolution converts a block and resolves prevout data for transactions func (b *BitcoinIndexer) convertBlockWithPrevoutResolution(ctx context.Context, btcBlock *bitcoin.Block) (*types.Block, error) { var allTransfers []types.Transaction + var allUTXOEvents []types.UTXOEvent - // Calculate latest block height from confirmations latestBlock := btcBlock.Height if btcBlock.Confirmations > 0 { latestBlock = btcBlock.Height + btcBlock.Confirmations - 1 } - // Get a client for prevout resolution provider, _ := b.failover.GetBestProvider() var btcClient *bitcoin.BitcoinClient if provider != nil { @@ -112,16 +111,13 @@ func (b *BitcoinIndexer) convertBlockWithPrevoutResolution(ctx context.Context, for i := range btcBlock.Tx { tx := &btcBlock.Tx[i] - // Skip coinbase transactions if tx.IsCoinbase() { continue } - // Try to resolve prevout data for FROM address if not already present if btcClient != nil && len(tx.Vin) > 0 && tx.Vin[0].PrevOut == nil { if tx.Vin[0].TxID != "" { if resolved, err := btcClient.GetTransactionWithPrevouts(ctx, tx.TxID); err == nil { - // Copy resolved prevout data for j := range tx.Vin { if j < len(resolved.Vin) && resolved.Vin[j].PrevOut != nil { tx.Vin[j].PrevOut = resolved.Vin[j].PrevOut @@ -131,18 +127,28 @@ func (b *BitcoinIndexer) convertBlockWithPrevoutResolution(ctx context.Context, } } - // Extract all transfers - filtering happens in worker's emitBlock transfers := b.extractTransfersFromTx(tx, btcBlock.Height, btcBlock.Time, latestBlock) allTransfers = append(allTransfers, transfers...) + + if b.config.IndexUTXO { + utxoEvent := b.extractUTXOEvent(tx, btcBlock.Height, btcBlock.Hash, btcBlock.Time, latestBlock) + if utxoEvent != nil { + allUTXOEvents = append(allUTXOEvents, *utxoEvent) + } + } } - return &types.Block{ + block := &types.Block{ Number: btcBlock.Height, Hash: btcBlock.Hash, ParentHash: btcBlock.PreviousBlockHash, Timestamp: btcBlock.Time, Transactions: allTransfers, - }, nil + } + + block.SetMetadata("utxo_events", allUTXOEvents) + + return block, nil } func (b *BitcoinIndexer) GetBlocks( @@ -250,14 +256,13 @@ func (b *BitcoinIndexer) extractTransfersFromTx( continue // Skip unspendable outputs (OP_RETURN, etc.) } - // Normalize Bitcoin address (bech32 -> lowercase, base58 -> validated) if normalized, err := bitcoin.NormalizeBTCAddress(toAddr); err == nil { toAddr = normalized } - // Skip self-transfers (change outputs back to sender) - // This prevents confusing transfers where from=to - if fromAddr != "" && fromAddr == toAddr { + // For Transfer events, respect index_change_output config + // (This filters what goes to transfer.event.dispatch) + if !b.config.IndexChangeOutput && fromAddr != "" && fromAddr == toAddr { continue } @@ -291,7 +296,93 @@ func (b *BitcoinIndexer) extractTransfersFromTx( return transfers } -// getFirstInputAddress returns the address of the first input with prevout data +func (b *BitcoinIndexer) extractUTXOEvent( + tx *bitcoin.Transaction, + blockNumber uint64, + blockHash string, + timestamp, latestBlock uint64, +) *types.UTXOEvent { + if tx.IsCoinbase() { + return nil + } + + var created []types.UTXO + var spent []types.SpentUTXO + + // Extract ALL created UTXOs (vouts) without filtering + // Filtering happens at emission level based on monitored addresses + for i, vout := range tx.Vout { + addr := bitcoin.GetOutputAddress(&vout) + if addr == "" { + continue + } + + if normalized, err := bitcoin.NormalizeBTCAddress(addr); err == nil { + addr = normalized + } + + amountSat := int64(vout.Value * 1e8) + + created = append(created, types.UTXO{ + TxHash: tx.TxID, + Vout: uint32(i), + Address: addr, + Amount: strconv.FormatInt(amountSat, 10), + ScriptPubKey: vout.ScriptPubKey.Hex, + }) + } + + // Extract ALL spent UTXOs (vins) without filtering + // Filtering happens at emission level based on monitored addresses + for i, vin := range tx.Vin { + if vin.PrevOut == nil { + continue + } + + addr := bitcoin.GetInputAddress(&vin) + if addr == "" { + continue + } + + if normalized, err := bitcoin.NormalizeBTCAddress(addr); err == nil { + addr = normalized + } + + amountSat := int64(vin.PrevOut.Value * 1e8) + + spent = append(spent, types.SpentUTXO{ + TxHash: vin.TxID, + Vout: vin.Vout, + Vin: uint32(i), + Address: addr, + Amount: strconv.FormatInt(amountSat, 10), + }) + } + + // Return event even if no monitored addresses + // Filtering happens at emission level + if len(created) == 0 && len(spent) == 0 { + return nil + } + + confirmations := b.calculateConfirmations(blockNumber, latestBlock) + status := utils.CalculateTransactionStatus(confirmations, b.confirmations) + fee := tx.CalculateFee() + + return &types.UTXOEvent{ + TxHash: tx.TxID, + NetworkId: b.config.NetworkId, + BlockNumber: blockNumber, + BlockHash: blockHash, + Timestamp: timestamp, + Created: created, + Spent: spent, + TxFee: fee.String(), + Status: status, + Confirmations: confirmations, + } +} + func (b *BitcoinIndexer) getFirstInputAddress(tx *bitcoin.Transaction) string { for _, vin := range tx.Vin { if addr := bitcoin.GetInputAddress(&vin); addr != "" { @@ -338,52 +429,53 @@ func (b *BitcoinIndexer) GetConfirmedHeight(ctx context.Context) (uint64, error) } // GetMempoolTransactions fetches and processes transactions from the mempool -// Returns transactions involving monitored addresses with 0 confirmations -func (b *BitcoinIndexer) GetMempoolTransactions(ctx context.Context) ([]types.Transaction, error) { - // Get Bitcoin client from failover +// Returns transactions and UTXO events involving monitored addresses with 0 confirmations +func (b *BitcoinIndexer) GetMempoolTransactions(ctx context.Context) ([]types.Transaction, []types.UTXOEvent, error) { provider, err := b.failover.GetBestProvider() if err != nil { - return nil, fmt.Errorf("failed to get bitcoin provider: %w", err) + return nil, nil, fmt.Errorf("failed to get bitcoin provider: %w", err) } btcClient, ok := provider.Client.(*bitcoin.BitcoinClient) if !ok { - return nil, fmt.Errorf("invalid client type") + return nil, nil, fmt.Errorf("invalid client type") } - // Get latest block height for context latestBlock, err := b.GetLatestBlockNumber(ctx) if err != nil { - return nil, fmt.Errorf("failed to get latest block: %w", err) + return nil, nil, fmt.Errorf("failed to get latest block: %w", err) } - // Get mempool transaction IDs result, err := btcClient.GetRawMempool(ctx, false) if err != nil { - return nil, fmt.Errorf("failed to get mempool: %w", err) + return nil, nil, fmt.Errorf("failed to get mempool: %w", err) } txids, ok := result.([]string) if !ok { - return nil, fmt.Errorf("unexpected mempool format") + return nil, nil, fmt.Errorf("unexpected mempool format") } - // Process each transaction var allTransfers []types.Transaction + var allUTXOEvents []types.UTXOEvent currentTime := uint64(time.Now().Unix()) for _, txid := range txids { - // Fetch transaction with prevout data resolved - // This is critical for detecting FROM addresses tx, err := btcClient.GetTransactionWithPrevouts(ctx, txid) if err != nil { continue } - // Extract transfers (blockNumber=0 for mempool, confirmations will be 0) transfers := b.extractTransfersFromTx(tx, 0, currentTime, latestBlock) allTransfers = append(allTransfers, transfers...) + + if b.config.IndexUTXO { + utxoEvent := b.extractUTXOEvent(tx, 0, "", currentTime, latestBlock) + if utxoEvent != nil { + allUTXOEvents = append(allUTXOEvents, *utxoEvent) + } + } } - return allTransfers, nil + return allTransfers, allUTXOEvents, nil } diff --git a/internal/worker/base.go b/internal/worker/base.go index 5047c17..5944528 100644 --- a/internal/worker/base.go +++ b/internal/worker/base.go @@ -18,6 +18,9 @@ import ( "github.com/fystack/multichain-indexer/pkg/store/pubkeystore" ) +// maxBlockCacheSize is the maximum number of blocks to cache for orphan event emission during reorgs +const maxBlockCacheSize = 100 + // BaseWorker holds the common state and logic shared by all worker types. type BaseWorker struct { ctx context.Context @@ -32,6 +35,9 @@ type BaseWorker struct { pubkeyStore pubkeystore.Store emitter events.Emitter failedChan chan FailedBlockEvent + + // blockCache stores recent blocks for orphan event emission during reorgs + blockCache map[uint64]*types.Block } // Stop stops the worker and cleans up internal resources @@ -70,6 +76,7 @@ func newWorkerWithMode( pubkeyStore: pubkeyStore, emitter: emitter, failedChan: failedChan, + blockCache: make(map[uint64]*types.Block), } } @@ -183,4 +190,148 @@ func (bw *BaseWorker) emitBlock(block *types.Block) { _ = bw.emitter.EmitTransaction(bw.chain.GetName(), &tx) } } + + bw.emitUTXOs(block) +} + +// emitUTXOs emits UTXO events for monitored addresses. +func (bw *BaseWorker) emitUTXOs(block *types.Block) { + if block == nil || bw.pubkeyStore == nil { + return + } + + if !bw.config.IndexUTXO { + return + } + + utxoEvents, ok := block.GetMetadata("utxo_events") + if !ok { + return + } + + events, ok := utxoEvents.([]types.UTXOEvent) + if !ok { + return + } + + addressType := bw.chain.GetNetworkType() + for i := range events { + event := &events[i] + isRelevant := false + + for _, utxo := range event.Created { + if bw.pubkeyStore.Exist(addressType, utxo.Address) { + isRelevant = true + break + } + } + + if !isRelevant { + for _, spent := range event.Spent { + if bw.pubkeyStore.Exist(addressType, spent.Address) { + isRelevant = true + break + } + } + } + + if isRelevant { + bw.logger.Info("Emitting UTXO event", + "chain", bw.chain.GetName(), + "txhash", event.TxHash, + "created", len(event.Created), + "spent", len(event.Spent), + "status", event.Status, + "confirmations", event.Confirmations, + ) + _ = bw.emitter.EmitUTXO(bw.chain.GetName(), event) + } + } +} + +// CacheBlock adds a block to the cache, maintaining size limit. +// Should be called by all workers after successfully processing a block. +func (bw *BaseWorker) CacheBlock(block *types.Block) { + if block == nil { + return + } + + bw.blockCache[block.Number] = block + + // Clean up old entries if cache exceeds max size + if uint64(len(bw.blockCache)) > maxBlockCacheSize { + var minBlock uint64 = ^uint64(0) // Max uint64 + for num := range bw.blockCache { + if num < minBlock { + minBlock = num + } + } + if minBlock != ^uint64(0) { + delete(bw.blockCache, minBlock) + } + } +} + +// ClearBlockCache clears all cached blocks. +// Should be called after a reorg is handled. +func (bw *BaseWorker) ClearBlockCache() { + for k := range bw.blockCache { + delete(bw.blockCache, k) + } +} + +// EmitOrphanUTXOs emits UTXO events with orphaned status for blocks in the rollback range. +// This allows consumers to rollback their UTXO state. +func (bw *BaseWorker) EmitOrphanUTXOs(startBlock, endBlock uint64) { + if !bw.config.IndexUTXO { + return + } + + for blockNum := startBlock; blockNum <= endBlock; blockNum++ { + block, ok := bw.blockCache[blockNum] + if !ok { + bw.logger.Debug("Block not in cache, skipping orphan emission", + "chain", bw.chain.GetName(), + "block", blockNum, + ) + continue + } + + utxoEvents, ok := block.GetMetadata("utxo_events") + if !ok { + continue + } + + events, ok := utxoEvents.([]types.UTXOEvent) + if !ok { + bw.logger.Error("Invalid UTXO events type in block metadata", + "chain", bw.chain.GetName(), + "block", blockNum, + ) + continue + } + + for i := range events { + event := &events[i] + event.Status = types.StatusOrphaned + event.Confirmations = 0 + + bw.logger.Info("Emitting orphaned UTXO event", + "chain", bw.chain.GetName(), + "block", blockNum, + "txHash", event.TxHash, + "created", len(event.Created), + "spent", len(event.Spent), + ) + + if err := bw.emitter.EmitUTXO(bw.chain.GetName(), event); err != nil { + bw.logger.Error("Failed to emit orphaned UTXO event", + "chain", bw.chain.GetName(), + "block", blockNum, + "txHash", event.TxHash, + "err", err, + ) + } + } + } } diff --git a/internal/worker/catchup.go b/internal/worker/catchup.go index d5421d6..ad20a4f 100644 --- a/internal/worker/catchup.go +++ b/internal/worker/catchup.go @@ -264,6 +264,7 @@ func (cw *CatchupWorker) processRange(r blockstore.CatchupRange, workerID int) e } if cw.handleBlockResult(res) && res.Number > batchSuccess { batchSuccess = res.Number + cw.CacheBlock(res.Block) } } diff --git a/internal/worker/manual.go b/internal/worker/manual.go index e7a443c..6a70d36 100644 --- a/internal/worker/manual.go +++ b/internal/worker/manual.go @@ -140,6 +140,7 @@ func (mw *ManualWorker) handleRange(ctx context.Context, start, end uint64) { for _, res := range results { if mw.handleBlockResult(res) { lastSuccess = res.Number + mw.CacheBlock(res.Block) } } diff --git a/internal/worker/mempool.go b/internal/worker/mempool.go index 587abe0..c4e177f 100644 --- a/internal/worker/mempool.go +++ b/internal/worker/mempool.go @@ -84,36 +84,30 @@ func (mw *MempoolWorker) Stop() { func (mw *MempoolWorker) processMempool() error { mw.logger.Debug("Polling mempool", "chain", mw.chain.GetName()) - // Get mempool transactions (already filtered by monitored addresses in indexer) - transactions, err := mw.btcIndexer.GetMempoolTransactions(mw.ctx) + transactions, utxoEvents, err := mw.btcIndexer.GetMempoolTransactions(mw.ctx) if err != nil { mw.logger.Error("Failed to get mempool transactions", "err", err) return err } - // Track and emit new transactions (only TO monitored addresses - deposits) newTxCount := 0 + newUTXOCount := 0 networkType := mw.chain.GetNetworkType() for _, tx := range transactions { - // Only emit transactions where TO address is monitored (incoming deposits) - // Outgoing transactions are handled by the withdrawal flow toMonitored := tx.ToAddress != "" && mw.pubkeyStore.Exist(networkType, tx.ToAddress) if !toMonitored { continue } - // Create unique key for deduplication (txHash + toAddress for UTXO model) txKey := tx.TxHash + ":" + tx.ToAddress if mw.seenTxs[txKey] { continue } - // Mark as seen mw.seenTxs[txKey] = true newTxCount++ - // Emit transaction to NATS if err := mw.emitter.EmitTransaction(mw.chain.GetName(), &tx); err != nil { mw.logger.Error("Failed to emit mempool transaction", "txHash", tx.TxHash, @@ -132,9 +126,56 @@ func (mw *MempoolWorker) processMempool() error { } } - if newTxCount > 0 { + if mw.config.IndexUTXO { + for i := range utxoEvents { + event := &utxoEvents[i] + + if mw.seenTxs[event.TxHash+":utxo"] { + continue + } + + isRelevant := false + for _, utxo := range event.Created { + if mw.pubkeyStore.Exist(networkType, utxo.Address) { + isRelevant = true + break + } + } + + if !isRelevant { + for _, spent := range event.Spent { + if mw.pubkeyStore.Exist(networkType, spent.Address) { + isRelevant = true + break + } + } + } + + if isRelevant { + mw.seenTxs[event.TxHash+":utxo"] = true + newUTXOCount++ + + if err := mw.emitter.EmitUTXO(mw.chain.GetName(), event); err != nil { + mw.logger.Error("Failed to emit mempool UTXO", + "txHash", event.TxHash, + "err", err, + ) + } else { + mw.logger.Debug("Emitted mempool UTXO", + "txHash", event.TxHash, + "created", len(event.Created), + "spent", len(event.Spent), + "status", event.Status, + ) + } + } + } + } + + if newTxCount > 0 || newUTXOCount > 0 { mw.logger.Info("Processed mempool transactions", "new_txs", newTxCount, + "new_utxos", newUTXOCount, "total_tracked", len(mw.seenTxs), ) } diff --git a/internal/worker/regular.go b/internal/worker/regular.go index 760a446..9f08c8a 100644 --- a/internal/worker/regular.go +++ b/internal/worker/regular.go @@ -72,6 +72,7 @@ func (rw *RegularWorker) Stop() { _ = rw.blockStore.SaveLatestBlock(rw.chain.GetNetworkInternalCode(), rw.currentBlock) } rw.clearBlockHashes() + rw.ClearBlockCache() // Call base worker stop to cancel context and clean up rw.BaseWorker.Stop() } @@ -169,6 +170,7 @@ func (rw *RegularWorker) processRegularBlocks() error { if rw.handleBlockResult(res) { lastSuccess = res.Number lastSuccessHash = res.Block.Hash + rw.CacheBlock(res.Block) } } @@ -267,8 +269,12 @@ func (rw *RegularWorker) detectAndHandleReorg(res *indexer.BlockResult) (bool, e "rollback_end", prevNum, ) - // Clear all block hashes on reorg + // Emit orphan UTXO events for blocks being rolled back + rw.EmitOrphanUTXOs(reorgStart, prevNum) + + // Clear all block hashes and cache on reorg rw.clearBlockHashes() + rw.ClearBlockCache() if err := rw.blockStore.SaveLatestBlock(rw.chain.GetNetworkInternalCode(), reorgStart-1); err != nil { return true, fmt.Errorf("save latest block: %w", err) diff --git a/internal/worker/rescanner.go b/internal/worker/rescanner.go index f829390..a81044e 100644 --- a/internal/worker/rescanner.go +++ b/internal/worker/rescanner.go @@ -205,6 +205,7 @@ func (rw *RescannerWorker) processBatch(blocks []uint64) error { if rw.handleBlockResult(res) { success++ toRemove = append(toRemove, res.Number) + rw.CacheBlock(res.Block) } else { rw.incrementRetry(res.Number) } diff --git a/pkg/common/config/types.go b/pkg/common/config/types.go index c5bf40a..bbf3492 100644 --- a/pkg/common/config/types.go +++ b/pkg/common/config/types.go @@ -43,6 +43,8 @@ type ChainConfig struct { PollInterval time.Duration `yaml:"poll_interval"` ReorgRollbackWindow int `yaml:"reorg_rollback_window"` Confirmations uint64 `yaml:"confirmations"` + IndexChangeOutput bool `yaml:"index_change_output"` + IndexUTXO bool `yaml:"index_utxo"` Client ClientConfig `yaml:"client"` Throttle Throttle `yaml:"throttle"` Nodes []NodeConfig `yaml:"nodes" validate:"required,min=1"` diff --git a/pkg/common/types/types.go b/pkg/common/types/types.go index 00604c7..a309d16 100644 --- a/pkg/common/types/types.go +++ b/pkg/common/types/types.go @@ -12,11 +12,27 @@ import ( ) type Block struct { - Number uint64 `json:"number"` - Hash string `json:"hash"` - ParentHash string `json:"parent_hash"` - Timestamp uint64 `json:"timestamp"` - Transactions []Transaction `json:"transactions"` + Number uint64 `json:"number"` + Hash string `json:"hash"` + ParentHash string `json:"parent_hash"` + Timestamp uint64 `json:"timestamp"` + Transactions []Transaction `json:"transactions"` + Metadata map[string]interface{} `json:"metadata,omitempty"` +} + +func (b *Block) SetMetadata(key string, value interface{}) { + if b.Metadata == nil { + b.Metadata = make(map[string]interface{}) + } + b.Metadata[key] = value +} + +func (b *Block) GetMetadata(key string) (interface{}, bool) { + if b.Metadata == nil { + return nil, false + } + val, ok := b.Metadata[key] + return val, ok } type Transaction struct { diff --git a/pkg/common/types/utxo.go b/pkg/common/types/utxo.go new file mode 100644 index 0000000..b06f257 --- /dev/null +++ b/pkg/common/types/utxo.go @@ -0,0 +1,64 @@ +package types + +import ( + "crypto/sha256" + "encoding/json" + "fmt" + "strings" +) + +type UTXOEvent struct { + TxHash string `json:"txHash"` + NetworkId string `json:"networkId"` + BlockNumber uint64 `json:"blockNumber"` + BlockHash string `json:"blockHash"` + Timestamp uint64 `json:"timestamp"` + Created []UTXO `json:"created"` + Spent []SpentUTXO `json:"spent"` + TxFee string `json:"txFee"` + Status string `json:"status"` + Confirmations uint64 `json:"confirmations"` +} + +type UTXO struct { + TxHash string `json:"txHash"` + Vout uint32 `json:"vout"` + Address string `json:"address"` + Amount string `json:"amount"` + ScriptPubKey string `json:"scriptPubKey"` +} + +type SpentUTXO struct { + TxHash string `json:"txHash"` + Vout uint32 `json:"vout"` + Vin uint32 `json:"vin"` + Address string `json:"address"` + Amount string `json:"amount"` +} + +func (u UTXOEvent) MarshalBinary() ([]byte, error) { + return json.Marshal(u) +} + +func (u *UTXOEvent) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, &u) +} + +func (u UTXOEvent) Hash() string { + var builder strings.Builder + builder.WriteString(u.NetworkId) + builder.WriteByte('|') + builder.WriteString(u.TxHash) + builder.WriteByte('|') + builder.WriteString(u.BlockHash) + hash := sha256.Sum256([]byte(builder.String())) + return fmt.Sprintf("%x", hash) +} + +func (u UTXO) Key() string { + return fmt.Sprintf("%s:%d", u.TxHash, u.Vout) +} + +func (s SpentUTXO) Key() string { + return fmt.Sprintf("%s:%d", s.TxHash, s.Vout) +} diff --git a/pkg/events/emitter.go b/pkg/events/emitter.go index e997ba5..954bfb1 100644 --- a/pkg/events/emitter.go +++ b/pkg/events/emitter.go @@ -21,6 +21,7 @@ type IndexerEvent struct { type Emitter interface { EmitBlock(chain string, block *types.Block) error EmitTransaction(chain string, tx *types.Transaction) error + EmitUTXO(chain string, utxo *types.UTXOEvent) error EmitError(chain string, err error) error Emit(event IndexerEvent) error Close() @@ -28,12 +29,14 @@ type Emitter interface { type emitter struct { queue infra.MessageQueue + utxoQueue infra.MessageQueue subjectPrefix string } -func NewEmitter(queue infra.MessageQueue, subjectPrefix string) Emitter { +func NewEmitter(queue infra.MessageQueue, utxoQueue infra.MessageQueue, subjectPrefix string) Emitter { return &emitter{ queue: queue, + utxoQueue: utxoQueue, subjectPrefix: subjectPrefix, } } @@ -53,6 +56,16 @@ func (e *emitter) EmitTransaction(chain string, tx *types.Transaction) error { }) } +func (e *emitter) EmitUTXO(chain string, utxo *types.UTXOEvent) error { + utxoBytes, err := utxo.MarshalBinary() + if err != nil { + return err + } + return e.utxoQueue.Enqueue(infra.UTXOEventTopicQueue, utxoBytes, &infra.EnqueueOptions{ + IdempotententKey: utxo.Hash(), + }) +} + func (e *emitter) EmitError(chain string, err error) error { // TODO: implement return nil @@ -71,4 +84,7 @@ func (e *emitter) Close() { if e.queue != nil { e.queue.Close() } + if e.utxoQueue != nil { + e.utxoQueue.Close() + } } diff --git a/pkg/infra/message_queue.go b/pkg/infra/message_queue.go index 38fee48..72f8efa 100644 --- a/pkg/infra/message_queue.go +++ b/pkg/infra/message_queue.go @@ -13,6 +13,7 @@ import ( const ( TransferEventTopicQueue = "transfer.event.dispatch" + UTXOEventTopicQueue = "utxo.event.dispatch" ) var (