From e68c462f833c395d1b452b17c18afdfe84504beb Mon Sep 17 00:00:00 2001 From: Kien Dang Date: Sat, 7 Feb 2026 16:44:08 +0700 Subject: [PATCH 1/4] feat: emit both incoming and outgoing events for monitored addresses For wallet-to-wallet transfers (like sweeps), we need balance updates for BOTH sender and receiver. This requires custody to create transaction records for both wallets, which triggers finalization for both. Changes: - Emit incoming event when TO address is monitored (existing) - Also emit outgoing event when FROM address is monitored (NEW) - Both events processed independently by custody deposit_worker - Results in 2 transaction records + 2 finalization tasks - Finalization_worker updates both balances from on-chain state This fixes sweep balance updates: - Receiver balance increases (from incoming event) - Sender balance decreases (from outgoing event) Note: Withdrawals also emit outgoing events, but custody handles them differently (withdrawal_worker vs deposit_worker). --- .idea/.gitignore | 8 +++++++ .idea/copilot.data.migration.agent.xml | 6 +++++ .idea/copilot.data.migration.ask2agent.xml | 6 +++++ .idea/copilot.data.migration.edit.xml | 6 +++++ .idea/inspectionProfiles/Project_Default.xml | 6 +++++ .idea/multichain-indexer.iml | 4 ++++ .idea/vcs.xml | 6 +++++ internal/worker/base.go | 24 ++++++++++++++++++-- 8 files changed, 64 insertions(+), 2 deletions(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/copilot.data.migration.agent.xml create mode 100644 .idea/copilot.data.migration.ask2agent.xml create mode 100644 .idea/copilot.data.migration.edit.xml create mode 100644 .idea/inspectionProfiles/Project_Default.xml create mode 100644 .idea/multichain-indexer.iml create mode 100644 .idea/vcs.xml diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/copilot.data.migration.agent.xml b/.idea/copilot.data.migration.agent.xml new file mode 100644 index 0000000..4ea72a9 --- /dev/null +++ b/.idea/copilot.data.migration.agent.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/copilot.data.migration.ask2agent.xml b/.idea/copilot.data.migration.ask2agent.xml new file mode 100644 index 0000000..1f2ea11 --- /dev/null +++ b/.idea/copilot.data.migration.ask2agent.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/copilot.data.migration.edit.xml b/.idea/copilot.data.migration.edit.xml new file mode 100644 index 0000000..8648f94 --- /dev/null +++ b/.idea/copilot.data.migration.edit.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..03d9549 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/multichain-indexer.iml b/.idea/multichain-indexer.iml new file mode 100644 index 0000000..7ee078d --- /dev/null +++ b/.idea/multichain-indexer.iml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/internal/worker/base.go b/internal/worker/base.go index 5047c17..641f8ed 100644 --- a/internal/worker/base.go +++ b/internal/worker/base.go @@ -165,10 +165,13 @@ func (bw *BaseWorker) emitBlock(block *types.Block) { addressType := bw.chain.GetNetworkType() for _, tx := range block.Transactions { - // Only check if ToAddress is monitored (incoming transfer/deposit) - // Outgoing transactions (FROM monitored addresses) are handled by withdrawal flow + // Check if ToAddress is monitored (incoming transfer/deposit) toMonitored := tx.ToAddress != "" && bw.pubkeyStore.Exist(addressType, tx.ToAddress) + // Check if FromAddress is monitored (outgoing transfer/sweep) + // Note: Withdrawals are also outgoing but initiated differently + fromMonitored := tx.FromAddress != "" && bw.pubkeyStore.Exist(addressType, tx.FromAddress) + // Emit incoming event for receiver if toMonitored { bw.logger.Info("Emitting matched transaction", "direction", "incoming", @@ -182,5 +185,22 @@ func (bw *BaseWorker) emitBlock(block *types.Block) { ) _ = bw.emitter.EmitTransaction(bw.chain.GetName(), &tx) } + + // Emit outgoing event for sender (enables balance decrements for sweeps) + // This ensures custody creates transaction records for BOTH sender and receiver + // which triggers finalization (and balance updates) for both wallets + if fromMonitored { + bw.logger.Info("Emitting matched transaction", + "direction", "outgoing", + "from", tx.FromAddress, + "to", tx.ToAddress, + "chain", bw.chain.GetName(), + "addressType", addressType, + "txhash", tx.TxHash, + "status", tx.Status, + "confirmations", tx.Confirmations, + ) + _ = bw.emitter.EmitTransaction(bw.chain.GetName(), &tx) + } } } From df985753db92f6c79d865be1d1c8a6df3c6008cf Mon Sep 17 00:00:00 2001 From: Kien Dang Date: Sat, 7 Feb 2026 17:31:21 +0700 Subject: [PATCH 2/4] fix: use unique idempotent keys for incoming vs outgoing events Problem: When emitting both incoming and outgoing events for the same transaction, NATS was deduplicating them because they had the same idempotent key (tx hash). This caused custody to only receive one event and create one transaction record, preventing proper balance updates for both sender and receiver. Solution: - Added EmitTransactionWithKey() method to allow custom idempotent keys - Use txHash+'-in' for incoming events - Use txHash+'-out' for outgoing events - This ensures both events are processed by custody, creating two transaction records and triggering finalization for both wallets Fixes balance update issue where deposit wallet balance wasn't being updated after sweeps. --- internal/worker/base.go | 4 ++-- pkg/events/emitter.go | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/internal/worker/base.go b/internal/worker/base.go index 641f8ed..2c22d66 100644 --- a/internal/worker/base.go +++ b/internal/worker/base.go @@ -183,7 +183,7 @@ func (bw *BaseWorker) emitBlock(block *types.Block) { "status", tx.Status, "confirmations", tx.Confirmations, ) - _ = bw.emitter.EmitTransaction(bw.chain.GetName(), &tx) + _ = bw.emitter.EmitTransactionWithKey(bw.chain.GetName(), &tx, tx.TxHash+"-in") } // Emit outgoing event for sender (enables balance decrements for sweeps) @@ -200,7 +200,7 @@ func (bw *BaseWorker) emitBlock(block *types.Block) { "status", tx.Status, "confirmations", tx.Confirmations, ) - _ = bw.emitter.EmitTransaction(bw.chain.GetName(), &tx) + _ = bw.emitter.EmitTransactionWithKey(bw.chain.GetName(), &tx, tx.TxHash+"-out") } } } diff --git a/pkg/events/emitter.go b/pkg/events/emitter.go index e997ba5..6c7ce6a 100644 --- a/pkg/events/emitter.go +++ b/pkg/events/emitter.go @@ -21,6 +21,7 @@ type IndexerEvent struct { type Emitter interface { EmitBlock(chain string, block *types.Block) error EmitTransaction(chain string, tx *types.Transaction) error + EmitTransactionWithKey(chain string, tx *types.Transaction, idempotentKey string) error EmitError(chain string, err error) error Emit(event IndexerEvent) error Close() @@ -44,12 +45,16 @@ func (e *emitter) EmitBlock(chain string, block *types.Block) error { } func (e *emitter) EmitTransaction(chain string, tx *types.Transaction) error { + return e.EmitTransactionWithKey(chain, tx, tx.Hash()) +} + +func (e *emitter) EmitTransactionWithKey(chain string, tx *types.Transaction, idempotentKey string) error { txBytes, err := tx.MarshalBinary() if err != nil { return err } return e.queue.Enqueue(infra.TransferEventTopicQueue, txBytes, &infra.EnqueueOptions{ - IdempotententKey: tx.Hash(), + IdempotententKey: idempotentKey, }) } From c0cfd25f7842630a7a5c94cc976b110c13556eec Mon Sep 17 00:00:00 2001 From: Kien Dang Date: Sat, 7 Feb 2026 17:44:41 +0700 Subject: [PATCH 3/4] feat: add direction field to Transaction for dual events Problem: When emitting both incoming and outgoing events for sweep transactions, both events had identical transaction data. Custody couldn't differentiate between them, so both events tried to create a transaction record for the same wallet (receiver). Solution: - Add Direction field to Transaction struct ('in' or 'out') - Set Direction='in' for incoming events (toMonitored) - Set Direction='out' for outgoing events (fromMonitored) - Create copy of transaction before setting direction to avoid modifying shared data Now custody can: - Check Direction field to know which wallet owns this event - For 'in': lookup TO address wallet - For 'out': lookup FROM address wallet - Create transaction record with correct wallet_id This enables proper dual-wallet balance updates for sweeps. --- internal/worker/base.go | 8 ++++++-- pkg/common/types/types.go | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/internal/worker/base.go b/internal/worker/base.go index 2c22d66..72be3bf 100644 --- a/internal/worker/base.go +++ b/internal/worker/base.go @@ -183,7 +183,9 @@ func (bw *BaseWorker) emitBlock(block *types.Block) { "status", tx.Status, "confirmations", tx.Confirmations, ) - _ = bw.emitter.EmitTransactionWithKey(bw.chain.GetName(), &tx, tx.TxHash+"-in") + txCopy := tx + txCopy.Direction = "in" + _ = bw.emitter.EmitTransactionWithKey(bw.chain.GetName(), &txCopy, tx.TxHash+"-in") } // Emit outgoing event for sender (enables balance decrements for sweeps) @@ -200,7 +202,9 @@ func (bw *BaseWorker) emitBlock(block *types.Block) { "status", tx.Status, "confirmations", tx.Confirmations, ) - _ = bw.emitter.EmitTransactionWithKey(bw.chain.GetName(), &tx, tx.TxHash+"-out") + txCopy := tx + txCopy.Direction = "out" + _ = bw.emitter.EmitTransactionWithKey(bw.chain.GetName(), &txCopy, tx.TxHash+"-out") } } } diff --git a/pkg/common/types/types.go b/pkg/common/types/types.go index 00604c7..05067d6 100644 --- a/pkg/common/types/types.go +++ b/pkg/common/types/types.go @@ -32,6 +32,7 @@ type Transaction struct { Timestamp uint64 `json:"timestamp"` Confirmations uint64 `json:"confirmations"` // Number of confirmations (0 = mempool/unconfirmed) Status string `json:"status"` // "pending" (0 conf), "confirmed" (1+ conf) + Direction string `json:"direction,omitempty"` // "in" or "out" - set when emitting dual events for sweep transactions } func (t Transaction) MarshalBinary() ([]byte, error) { From 24bade595843dcd7dfe37178eee4c29a2a3d26f2 Mon Sep 17 00:00:00 2001 From: Kien Dang Date: Sun, 8 Feb 2026 17:53:40 +0700 Subject: [PATCH 4/4] chore: update .gitignore to prevent uploading .idea folder --- .gitignore | 4 +++- .idea/.gitignore | 8 -------- .idea/copilot.data.migration.agent.xml | 6 ------ .idea/copilot.data.migration.ask2agent.xml | 6 ------ .idea/copilot.data.migration.edit.xml | 6 ------ .idea/inspectionProfiles/Project_Default.xml | 6 ------ .idea/multichain-indexer.iml | 4 ---- .idea/vcs.xml | 6 ------ 8 files changed, 3 insertions(+), 43 deletions(-) delete mode 100644 .idea/.gitignore delete mode 100644 .idea/copilot.data.migration.agent.xml delete mode 100644 .idea/copilot.data.migration.ask2agent.xml delete mode 100644 .idea/copilot.data.migration.edit.xml delete mode 100644 .idea/inspectionProfiles/Project_Default.xml delete mode 100644 .idea/multichain-indexer.iml delete mode 100644 .idea/vcs.xml diff --git a/.gitignore b/.gitignore index ce5ad76..c67c343 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,6 @@ wallet-kv-load # External dependencies sui-apis -sui-apis.backup \ No newline at end of file +sui-apis.backup + +.idea diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index 13566b8..0000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,8 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Editor-based HTTP Client requests -/httpRequests/ -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml diff --git a/.idea/copilot.data.migration.agent.xml b/.idea/copilot.data.migration.agent.xml deleted file mode 100644 index 4ea72a9..0000000 --- a/.idea/copilot.data.migration.agent.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - \ No newline at end of file diff --git a/.idea/copilot.data.migration.ask2agent.xml b/.idea/copilot.data.migration.ask2agent.xml deleted file mode 100644 index 1f2ea11..0000000 --- a/.idea/copilot.data.migration.ask2agent.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - \ No newline at end of file diff --git a/.idea/copilot.data.migration.edit.xml b/.idea/copilot.data.migration.edit.xml deleted file mode 100644 index 8648f94..0000000 --- a/.idea/copilot.data.migration.edit.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml deleted file mode 100644 index 03d9549..0000000 --- a/.idea/inspectionProfiles/Project_Default.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - \ No newline at end of file diff --git a/.idea/multichain-indexer.iml b/.idea/multichain-indexer.iml deleted file mode 100644 index 7ee078d..0000000 --- a/.idea/multichain-indexer.iml +++ /dev/null @@ -1,4 +0,0 @@ - - - - \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml deleted file mode 100644 index 35eb1dd..0000000 --- a/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file