diff --git a/.gitignore b/.gitignore index ce5ad76..c67c343 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,6 @@ wallet-kv-load # External dependencies sui-apis -sui-apis.backup \ No newline at end of file +sui-apis.backup + +.idea diff --git a/internal/worker/base.go b/internal/worker/base.go index 5047c17..72be3bf 100644 --- a/internal/worker/base.go +++ b/internal/worker/base.go @@ -165,10 +165,13 @@ func (bw *BaseWorker) emitBlock(block *types.Block) { addressType := bw.chain.GetNetworkType() for _, tx := range block.Transactions { - // Only check if ToAddress is monitored (incoming transfer/deposit) - // Outgoing transactions (FROM monitored addresses) are handled by withdrawal flow + // Check if ToAddress is monitored (incoming transfer/deposit) toMonitored := tx.ToAddress != "" && bw.pubkeyStore.Exist(addressType, tx.ToAddress) + // Check if FromAddress is monitored (outgoing transfer/sweep) + // Note: Withdrawals are also outgoing but initiated differently + fromMonitored := tx.FromAddress != "" && bw.pubkeyStore.Exist(addressType, tx.FromAddress) + // Emit incoming event for receiver if toMonitored { bw.logger.Info("Emitting matched transaction", "direction", "incoming", @@ -180,7 +183,28 @@ func (bw *BaseWorker) emitBlock(block *types.Block) { "status", tx.Status, "confirmations", tx.Confirmations, ) - _ = bw.emitter.EmitTransaction(bw.chain.GetName(), &tx) + txCopy := tx + txCopy.Direction = "in" + _ = bw.emitter.EmitTransactionWithKey(bw.chain.GetName(), &txCopy, tx.TxHash+"-in") + } + + // Emit outgoing event for sender (enables balance decrements for sweeps) + // This ensures custody creates transaction records for BOTH sender and receiver + // which triggers finalization (and balance updates) for both wallets + if fromMonitored { + bw.logger.Info("Emitting matched transaction", + "direction", "outgoing", + "from", tx.FromAddress, + "to", tx.ToAddress, + "chain", bw.chain.GetName(), + "addressType", addressType, + "txhash", tx.TxHash, + "status", tx.Status, + "confirmations", tx.Confirmations, + ) + txCopy := tx + txCopy.Direction = "out" + _ = bw.emitter.EmitTransactionWithKey(bw.chain.GetName(), &txCopy, tx.TxHash+"-out") } } } diff --git a/pkg/common/types/types.go b/pkg/common/types/types.go index 00604c7..05067d6 100644 --- a/pkg/common/types/types.go +++ b/pkg/common/types/types.go @@ -32,6 +32,7 @@ type Transaction struct { Timestamp uint64 `json:"timestamp"` Confirmations uint64 `json:"confirmations"` // Number of confirmations (0 = mempool/unconfirmed) Status string `json:"status"` // "pending" (0 conf), "confirmed" (1+ conf) + Direction string `json:"direction,omitempty"` // "in" or "out" - set when emitting dual events for sweep transactions } func (t Transaction) MarshalBinary() ([]byte, error) { diff --git a/pkg/events/emitter.go b/pkg/events/emitter.go index e997ba5..6c7ce6a 100644 --- a/pkg/events/emitter.go +++ b/pkg/events/emitter.go @@ -21,6 +21,7 @@ type IndexerEvent struct { type Emitter interface { EmitBlock(chain string, block *types.Block) error EmitTransaction(chain string, tx *types.Transaction) error + EmitTransactionWithKey(chain string, tx *types.Transaction, idempotentKey string) error EmitError(chain string, err error) error Emit(event IndexerEvent) error Close() @@ -44,12 +45,16 @@ func (e *emitter) EmitBlock(chain string, block *types.Block) error { } func (e *emitter) EmitTransaction(chain string, tx *types.Transaction) error { + return e.EmitTransactionWithKey(chain, tx, tx.Hash()) +} + +func (e *emitter) EmitTransactionWithKey(chain string, tx *types.Transaction, idempotentKey string) error { txBytes, err := tx.MarshalBinary() if err != nil { return err } return e.queue.Enqueue(infra.TransferEventTopicQueue, txBytes, &infra.EnqueueOptions{ - IdempotententKey: tx.Hash(), + IdempotententKey: idempotentKey, }) }