Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ wallet-kv-load

# External dependencies
sui-apis
sui-apis.backup
sui-apis.backup

.idea
30 changes: 27 additions & 3 deletions internal/worker/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,13 @@ func (bw *BaseWorker) emitBlock(block *types.Block) {

addressType := bw.chain.GetNetworkType()
for _, tx := range block.Transactions {
// Only check if ToAddress is monitored (incoming transfer/deposit)
// Outgoing transactions (FROM monitored addresses) are handled by withdrawal flow
// Check if ToAddress is monitored (incoming transfer/deposit)
toMonitored := tx.ToAddress != "" && bw.pubkeyStore.Exist(addressType, tx.ToAddress)
// Check if FromAddress is monitored (outgoing transfer/sweep)
// Note: Withdrawals are also outgoing but initiated differently
fromMonitored := tx.FromAddress != "" && bw.pubkeyStore.Exist(addressType, tx.FromAddress)

// Emit incoming event for receiver
if toMonitored {
bw.logger.Info("Emitting matched transaction",
"direction", "incoming",
Expand All @@ -180,7 +183,28 @@ func (bw *BaseWorker) emitBlock(block *types.Block) {
"status", tx.Status,
"confirmations", tx.Confirmations,
)
_ = bw.emitter.EmitTransaction(bw.chain.GetName(), &tx)
txCopy := tx
txCopy.Direction = "in"
_ = bw.emitter.EmitTransactionWithKey(bw.chain.GetName(), &txCopy, tx.TxHash+"-in")
}

// Emit outgoing event for sender (enables balance decrements for sweeps)
// This ensures custody creates transaction records for BOTH sender and receiver
// which triggers finalization (and balance updates) for both wallets
if fromMonitored {
bw.logger.Info("Emitting matched transaction",
"direction", "outgoing",
"from", tx.FromAddress,
"to", tx.ToAddress,
"chain", bw.chain.GetName(),
"addressType", addressType,
"txhash", tx.TxHash,
"status", tx.Status,
"confirmations", tx.Confirmations,
)
txCopy := tx
txCopy.Direction = "out"
_ = bw.emitter.EmitTransactionWithKey(bw.chain.GetName(), &txCopy, tx.TxHash+"-out")
}
}
}
1 change: 1 addition & 0 deletions pkg/common/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Transaction struct {
Timestamp uint64 `json:"timestamp"`
Confirmations uint64 `json:"confirmations"` // Number of confirmations (0 = mempool/unconfirmed)
Status string `json:"status"` // "pending" (0 conf), "confirmed" (1+ conf)
Direction string `json:"direction,omitempty"` // "in" or "out" - set when emitting dual events for sweep transactions
}

func (t Transaction) MarshalBinary() ([]byte, error) {
Expand Down
7 changes: 6 additions & 1 deletion pkg/events/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type IndexerEvent struct {
type Emitter interface {
EmitBlock(chain string, block *types.Block) error
EmitTransaction(chain string, tx *types.Transaction) error
EmitTransactionWithKey(chain string, tx *types.Transaction, idempotentKey string) error
EmitError(chain string, err error) error
Emit(event IndexerEvent) error
Close()
Expand All @@ -44,12 +45,16 @@ func (e *emitter) EmitBlock(chain string, block *types.Block) error {
}

func (e *emitter) EmitTransaction(chain string, tx *types.Transaction) error {
return e.EmitTransactionWithKey(chain, tx, tx.Hash())
}

func (e *emitter) EmitTransactionWithKey(chain string, tx *types.Transaction, idempotentKey string) error {
txBytes, err := tx.MarshalBinary()
if err != nil {
return err
}
return e.queue.Enqueue(infra.TransferEventTopicQueue, txBytes, &infra.EnqueueOptions{
IdempotententKey: tx.Hash(),
IdempotententKey: idempotentKey,
})
}

Expand Down