From b3a2469c47fc9d38b9801830eb4822bec7f8374c Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Fri, 22 May 2026 17:01:45 -0700 Subject: [PATCH 1/3] feat(eth-indexer): periodic stale-refresh sweep MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The live WS subscription only learns about wallets that move AUDIO. To recover from drift, missed events during disconnects, and multi-wallet backfill placeholders (where user_balances.associated_wallets_balance couldn't be decomposed per-wallet), add a background sweep that re-reads the K oldest rows in eth_wallet_balances by updated_at every N seconds, calls totalAudioBalance, and upserts. Reuses the existing fan-out: extracts processLogs's "fan-out totalAudioBalance + upsert" tail into refreshAddresses(ctx, addrs, blockByAddr) and calls it from both the event path and the new sweep. blocknumber handling: - Event path (block > 0): GREATEST(existing, new). Already worked. - Stale-refresh path (no block): preserve existing. Pass NULL via NULLIF(0) so we don't write 0 over a real block. Smoke-tested: initial insert with block 12345, then NULL update keeps 12345, then a lower block (100) also keeps 12345. Config (defaults give a ~22-day full sweep over 3.15M wallets): ethStaleRefreshIntervalSecs default 30 ethStaleRefreshBatchSize default 50 Sustained at defaults: ~1.7 wallets/sec, ~5 RPC/sec total (each wallet runs balanceOf + totalStakedFor + getTotalDelegatorStake in parallel via the existing errgroup path). Well under any Alchemy tier ceiling. Bounded by design — the ticker drops a tick if the previous run is still in flight, so a slow upstream can't pile up work. Panic-safe via deferred recover so an unexpected error in the sweep won't crash the pod and take the WS subscription down with it. Smoke-tested locally: pre-seeded two rows with updated_at='1970-01-01', ran with ethStaleRefreshIntervalSecs=5 against the live Alchemy endpoint. Both rows refreshed on the first tick (balance read and upserted, updated_at advanced), and the sweep continued ticking every 5s without errors. --- eth/indexer/eth_indexer.go | 183 ++++++++++++++++++++++++++++++++----- 1 file changed, 158 insertions(+), 25 deletions(-) diff --git a/eth/indexer/eth_indexer.go b/eth/indexer/eth_indexer.go index e33d8b26..555baac9 100644 --- a/eth/indexer/eth_indexer.go +++ b/eth/indexer/eth_indexer.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "math/big" + "os" + "strconv" "sync/atomic" "time" @@ -50,6 +52,20 @@ const ( reconnectMaxBackoff = 60 * time.Second ) +// Stale-refresh defaults. The sweep complements the live WS subscription: +// it picks the K oldest rows in eth_wallet_balances by updated_at, re-reads +// their balances, and upserts. This recovers from drift, missed events +// during disconnects, and multi-wallet user backfills where we couldn't +// decompose user_balances.associated_wallets_balance per-wallet. +// +// Default cadence: 50 wallets / 30s ≈ 1.7 wallets/sec ≈ 144K/day. With +// ~3.15M tracked wallets a full sweep takes ~22 days. Tune via the env +// vars below. +const ( + ethStaleRefreshDefaultInterval = 30 * time.Second + ethStaleRefreshDefaultBatchSize = 50 +) + type EthIndexer struct { config config.Config pool database.DbPool @@ -63,12 +79,26 @@ type EthIndexer struct { httpClient *ethclient.Client + staleRefreshInterval time.Duration + staleRefreshBatchSize int + // State surfaced via /eth/health connected atomic.Bool lastBlockSeen atomic.Uint64 lastEventAt atomic.Pointer[time.Time] } +// envIntDefault reads an env var as int, falling back to def on missing, +// empty, or unparseable values. +func envIntDefault(name string, def int) int { + if s := os.Getenv(name); s != "" { + if n, err := strconv.Atoi(s); err == nil && n > 0 { + return n + } + } + return def +} + func New(cfg config.Config) *EthIndexer { logger := logging.NewZapLogger(cfg).Named("EthIndexer") @@ -82,15 +112,17 @@ func New(cfg config.Config) *EthIndexer { } return &EthIndexer{ - config: cfg, - pool: pool, - logger: logger, - httpURL: cfg.EthRpcUrl, - wsURL: cfg.EthWsUrl, - audioContract: common.HexToAddress(cfg.EthAudioContractAddress), - stakingContract: common.HexToAddress(cfg.EthStakingContractAddress), - delegateManager: common.HexToAddress(cfg.EthDelegateManagerContractAddress), - transferTopic: crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)")), + config: cfg, + pool: pool, + logger: logger, + httpURL: cfg.EthRpcUrl, + wsURL: cfg.EthWsUrl, + audioContract: common.HexToAddress(cfg.EthAudioContractAddress), + stakingContract: common.HexToAddress(cfg.EthStakingContractAddress), + delegateManager: common.HexToAddress(cfg.EthDelegateManagerContractAddress), + transferTopic: crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)")), + staleRefreshInterval: time.Duration(envIntDefault("ethStaleRefreshIntervalSecs", int(ethStaleRefreshDefaultInterval.Seconds()))) * time.Second, + staleRefreshBatchSize: envIntDefault("ethStaleRefreshBatchSize", ethStaleRefreshDefaultBatchSize), } } @@ -111,6 +143,8 @@ func (e *EthIndexer) Start(ctx context.Context) error { e.httpClient = httpClient defer httpClient.Close() + go e.ScheduleStaleRefresh(ctx) + e.runSubscriptionLoop(ctx) return nil } @@ -285,12 +319,28 @@ func (e *EthIndexer) processLogs(ctx context.Context, logs []types.Log) { return } - // Fan out balanceOf calls. - jobs := make(chan common.Address, len(tracked)) - results := make(chan balanceUpdate, len(tracked)) + updated := e.refreshAddresses(ctx, tracked, candidates) + if updated > 0 { + e.logger.Info("refreshed balances from events", zap.Int("updated", updated)) + } +} + +// refreshAddresses fans out totalAudioBalance for each address (up to +// balanceFetchWorkers in flight at once) and upserts the results in one +// batch. blockByAddr is optional per-address block context: for live +// Transfer events it's the block the event was mined in; for stale-refresh +// sweeps it's nil so the existing blocknumber column is preserved. Returns +// the number of addresses that were actually upserted (omitting failures). +func (e *EthIndexer) refreshAddresses(ctx context.Context, addrs []common.Address, blockByAddr map[common.Address]uint64) int { + if len(addrs) == 0 { + return 0 + } + + jobs := make(chan common.Address, len(addrs)) + results := make(chan balanceUpdate, len(addrs)) workers := balanceFetchWorkers - if workers > len(tracked) { - workers = len(tracked) + if workers > len(addrs) { + workers = len(addrs) } for w := 0; w < workers; w++ { go func() { @@ -304,20 +354,24 @@ func (e *EthIndexer) processLogs(ctx context.Context, logs []types.Log) { results <- balanceUpdate{} // sentinel so receiver count matches continue } - results <- balanceUpdate{addr: addr, bal: bal, block: candidates[addr]} + block := uint64(0) + if blockByAddr != nil { + block = blockByAddr[addr] + } + results <- balanceUpdate{addr: addr, bal: bal, block: block} } }() } - for _, addr := range tracked { + for _, addr := range addrs { jobs <- addr } close(jobs) - updates := make([]balanceUpdate, 0, len(tracked)) - for i := 0; i < len(tracked); i++ { + updates := make([]balanceUpdate, 0, len(addrs)) + for i := 0; i < len(addrs); i++ { select { case <-ctx.Done(): - return + return len(updates) case r := <-results: if r.bal == nil { continue @@ -327,11 +381,9 @@ func (e *EthIndexer) processLogs(ctx context.Context, logs []types.Log) { } if err := e.upsertBalanceUpdates(ctx, updates); err != nil { e.logger.Error("failed to upsert balances", zap.Error(err)) - } else if len(updates) > 0 { - e.logger.Info("refreshed balances from events", - zap.Int("updated", len(updates)), - ) + return 0 } + return len(updates) } func (e *EthIndexer) filterTracked(ctx context.Context, candidates map[common.Address]uint64) ([]common.Address, error) { @@ -446,16 +498,23 @@ func (e *EthIndexer) upsertBalanceUpdates(ctx context.Context, updates []balance weis = append(weis, u.bal.String()) blocks = append(blocks, int64(u.block)) } + // blocknumber semantics: + // - new block > 0 (Transfer-event path): take GREATEST with existing + // - new block = 0 (stale-refresh sweep): preserve existing column, + // don't downgrade a real block to 0 just because we re-read latest _, err := e.pool.Exec(ctx, ` INSERT INTO eth_wallet_balances (wallet, balance, blocknumber, updated_at) SELECT unnest(@wallets::text[]), unnest(@balances::text[])::numeric, - unnest(@blocks::bigint[]), + NULLIF(unnest(@blocks::bigint[]), 0), NOW() ON CONFLICT (wallet) DO UPDATE SET balance = EXCLUDED.balance, - blocknumber = GREATEST(eth_wallet_balances.blocknumber, EXCLUDED.blocknumber), + blocknumber = CASE + WHEN EXCLUDED.blocknumber IS NULL THEN eth_wallet_balances.blocknumber + ELSE GREATEST(COALESCE(eth_wallet_balances.blocknumber, 0), EXCLUDED.blocknumber) + END, updated_at = NOW() `, pgx.NamedArgs{ "wallets": wallets, @@ -465,6 +524,80 @@ func (e *EthIndexer) upsertBalanceUpdates(ctx context.Context, updates []balance return err } +// ScheduleStaleRefresh runs a background sweep that re-reads the oldest +// rows in eth_wallet_balances by updated_at and upserts the fresh values. +// Complements the live WS subscription: it recovers from drift, fills in +// rows that were never touched by a Transfer event (multi-wallet backfill +// placeholders), and re-reads anything that went stale while the WS was +// disconnected. Bounded throughput by design (batchSize per tick). +func (e *EthIndexer) ScheduleStaleRefresh(ctx context.Context) { + defer func() { + if r := recover(); r != nil { + e.logger.Error("stale-refresh goroutine panicked, will not restart", + zap.Any("panic", r), + ) + } + }() + + e.logger.Info("starting stale-refresh sweep", + zap.Duration("interval", e.staleRefreshInterval), + zap.Int("batch_size", e.staleRefreshBatchSize), + ) + ticker := time.NewTicker(e.staleRefreshInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + e.runStaleRefresh(ctx) + } + } +} + +func (e *EthIndexer) runStaleRefresh(ctx context.Context) { + addrs, err := e.selectStaleWallets(ctx) + if err != nil { + e.logger.Warn("stale refresh: select failed", zap.Error(err)) + return + } + if len(addrs) == 0 { + return + } + updated := e.refreshAddresses(ctx, addrs, nil) + if updated > 0 { + e.logger.Info("stale refresh: tick complete", + zap.Int("requested", len(addrs)), + zap.Int("updated", updated), + ) + } +} + +// selectStaleWallets returns the K rows from eth_wallet_balances with the +// oldest updated_at. Indexed by eth_wallet_balances_updated_at_idx. +func (e *EthIndexer) selectStaleWallets(ctx context.Context) ([]common.Address, error) { + rows, err := e.pool.Query(ctx, ` + SELECT wallet + FROM eth_wallet_balances + ORDER BY updated_at ASC + LIMIT $1 + `, e.staleRefreshBatchSize) + if err != nil { + return nil, err + } + defer rows.Close() + + out := make([]common.Address, 0, e.staleRefreshBatchSize) + for rows.Next() { + var w string + if err := rows.Scan(&w); err != nil { + return nil, err + } + out = append(out, common.HexToAddress(w)) + } + return out, rows.Err() +} + func (e *EthIndexer) loadCheckpoint(ctx context.Context) (uint64, error) { var last int64 err := e.pool.QueryRow(ctx, From c3e4a2ff538823b5ac5e05735ca90e27b9d21cda Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Fri, 22 May 2026 17:13:50 -0700 Subject: [PATCH 2/3] feat(eth-indexer): batch contract reads via Multicall3 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bundle balanceOf + totalStakedFor + getTotalDelegatorStake for every holder in a refresh batch into a single Multicall3 `aggregate3` call instead of issuing them as separate `eth_call`s. Before, with the default stale-refresh tick (50 holders × 3 selectors): 150 `eth_call` round-trips per tick → ~3,900 Alchemy CUs per tick After: 1 `eth_call` per tick → ~26 Alchemy CUs per tick A ~150× reduction in CUs and round-trips at the default config, and removes any cost concern with running the sweep at a tighter cadence. Multicall3 is deployed at the same address on every EVM chain (`0xcA11bde05977b3631167028862bE2a173976CA11`); held as a package constant since it's universal and we'd never need to change it. Implementation: - New file eth/indexer/multicall.go with the Multicall3 ABI encoding/decoding via go-ethereum's accounts/abi package, plus a totalAudioBalances(holders) entry point. Chunked at 200 holders per outer eth_call (= 600 sub-calls) to keep individual requests modest. - refreshAddresses simplified to one Multicall3 round-trip plus the same upsert it always did — drops the per-holder errgroup, worker pool, jobs/results channels, and the balanceFetchWorkers constant. - Same conservative posture on partial failures: holders whose three sub-calls didn't all succeed are skipped (omitted from the result map), so we never persist a partial sum. AllowFailure: true on each Call3 so one bad sub-call doesn't fail the whole multicall. Smoke-tested locally: ran with ethStaleRefreshBatchSize=10 against the live Alchemy endpoint, three pre-seeded rows (rayjacobson primary, 0xb46a… DEX router, Audius staking contract self-balance). All three refreshed in one tick via one multicall: stale refresh: tick complete requested:3 updated:3 Cross-checked the staking contract row — 247,024,527,620,589,302,425, 363,078 wei matched an independent eth_call against the AUDIO contract's balanceOf for that address. Pipeline correct end-to-end. --- eth/indexer/eth_indexer.go | 130 ++++++--------------------- eth/indexer/multicall.go | 180 +++++++++++++++++++++++++++++++++++++ 2 files changed, 205 insertions(+), 105 deletions(-) create mode 100644 eth/indexer/multicall.go diff --git a/eth/indexer/eth_indexer.go b/eth/indexer/eth_indexer.go index 555baac9..4397bb3e 100644 --- a/eth/indexer/eth_indexer.go +++ b/eth/indexer/eth_indexer.go @@ -20,7 +20,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "go.uber.org/zap" - "golang.org/x/sync/errgroup" ) // Contract function selectors. Computed at startup from their signatures so @@ -42,10 +41,6 @@ const checkpointName = "audio_transfers" // Backfill chunk size — Alchemy's free tier caps eth_getLogs at 10K blocks. const backfillChunkBlocks = 9000 -// Refresh fan-out: how many balanceOf calls we'll issue in parallel after a -// burst of events. Keeps a burst from saturating the upstream. -const balanceFetchWorkers = 8 - // Reconnect backoff bounds for the WS subscription. const ( reconnectInitialBackoff = 1 * time.Second @@ -325,60 +320,40 @@ func (e *EthIndexer) processLogs(ctx context.Context, logs []types.Log) { } } -// refreshAddresses fans out totalAudioBalance for each address (up to -// balanceFetchWorkers in flight at once) and upserts the results in one -// batch. blockByAddr is optional per-address block context: for live -// Transfer events it's the block the event was mined in; for stale-refresh -// sweeps it's nil so the existing blocknumber column is preserved. Returns -// the number of addresses that were actually upserted (omitting failures). +// refreshAddresses reads balanceOf + totalStakedFor + +// getTotalDelegatorStake for each address via a single Multicall3 +// `aggregate3` (chunked at multicallChunkSize holders) and upserts the +// summed results. blockByAddr is optional per-address block context: for +// live Transfer events it's the block the event was mined in; for +// stale-refresh sweeps it's nil so the existing blocknumber column is +// preserved. Returns the number of addresses upserted. func (e *EthIndexer) refreshAddresses(ctx context.Context, addrs []common.Address, blockByAddr map[common.Address]uint64) int { if len(addrs) == 0 { return 0 } - jobs := make(chan common.Address, len(addrs)) - results := make(chan balanceUpdate, len(addrs)) - workers := balanceFetchWorkers - if workers > len(addrs) { - workers = len(addrs) - } - for w := 0; w < workers; w++ { - go func() { - for addr := range jobs { - bal, err := e.totalAudioBalance(ctx, addr) - if err != nil { - e.logger.Warn("totalAudioBalance failed", - zap.String("addr", addr.Hex()), - zap.Error(err), - ) - results <- balanceUpdate{} // sentinel so receiver count matches - continue - } - block := uint64(0) - if blockByAddr != nil { - block = blockByAddr[addr] - } - results <- balanceUpdate{addr: addr, bal: bal, block: block} - } - }() - } - for _, addr := range addrs { - jobs <- addr + balances, err := e.totalAudioBalances(ctx, addrs) + if err != nil { + e.logger.Warn("totalAudioBalances failed", + zap.Int("holders", len(addrs)), + zap.Error(err), + ) + return 0 } - close(jobs) - updates := make([]balanceUpdate, 0, len(addrs)) - for i := 0; i < len(addrs); i++ { - select { - case <-ctx.Done(): - return len(updates) - case r := <-results: - if r.bal == nil { - continue - } - updates = append(updates, r) + updates := make([]balanceUpdate, 0, len(balances)) + for _, addr := range addrs { + bal, ok := balances[addr] + if !ok { + continue + } + block := uint64(0) + if blockByAddr != nil { + block = blockByAddr[addr] } + updates = append(updates, balanceUpdate{addr: addr, bal: bal, block: block}) } + if err := e.upsertBalanceUpdates(ctx, updates); err != nil { e.logger.Error("failed to upsert balances", zap.Error(err)) return 0 @@ -424,61 +399,6 @@ func (e *EthIndexer) filterTracked(ctx context.Context, candidates map[common.Ad return tracked, rows.Err() } -// totalAudioBalance returns balanceOf + totalStakedFor + getTotalDelegatorStake, -// matching the Python discovery-provider's `associated_wallets_balance` -// computation. All three calls run in parallel; any failure fails the whole -// read (we'd rather skip the wallet this round than persist a partial total). -func (e *EthIndexer) totalAudioBalance(ctx context.Context, holder common.Address) (*big.Int, error) { - var balance, staked, delegated *big.Int - - g, gctx := errgroup.WithContext(ctx) - g.Go(func() error { - v, err := e.uintCall(gctx, e.audioContract, balanceOfSelector, holder) - if err != nil { - return fmt.Errorf("balanceOf: %w", err) - } - balance = v - return nil - }) - g.Go(func() error { - v, err := e.uintCall(gctx, e.stakingContract, totalStakedForSelector, holder) - if err != nil { - return fmt.Errorf("totalStakedFor: %w", err) - } - staked = v - return nil - }) - g.Go(func() error { - v, err := e.uintCall(gctx, e.delegateManager, getTotalDelegatorStakeSelector, holder) - if err != nil { - return fmt.Errorf("getTotalDelegatorStake: %w", err) - } - delegated = v - return nil - }) - if err := g.Wait(); err != nil { - return nil, err - } - - sum := new(big.Int).Add(balance, staked) - sum.Add(sum, delegated) - return sum, nil -} - -// uintCall invokes a `func(address) returns (uint256)` style getter and -// decodes the result as a big.Int. -func (e *EthIndexer) uintCall(ctx context.Context, contract common.Address, selector []byte, holder common.Address) (*big.Int, error) { - data := append(append([]byte{}, selector...), common.LeftPadBytes(holder.Bytes(), 32)...) - msg := ethereum.CallMsg{To: &contract, Data: data} - out, err := e.httpClient.CallContract(ctx, msg, nil) - if err != nil { - return nil, err - } - if len(out) == 0 { - return big.NewInt(0), nil - } - return new(big.Int).SetBytes(out), nil -} type balanceUpdate struct { addr common.Address diff --git a/eth/indexer/multicall.go b/eth/indexer/multicall.go new file mode 100644 index 00000000..7b04d93d --- /dev/null +++ b/eth/indexer/multicall.go @@ -0,0 +1,180 @@ +package indexer + +// Batched contract reads via Multicall3 +// (https://github.com/mds1/multicall, deployed at the same address on +// every EVM chain). +// +// Replaces N separate `eth_call` round-trips with one. The default +// stale-refresh tick reads balanceOf + totalStakedFor + +// getTotalDelegatorStake for each of 50 holders — 150 `eth_call`s +// otherwise, 1 with multicall. + +import ( + "context" + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" +) + +// Multicall3 is deployed at the same address on every EVM chain via +// nick-method CREATE2. Universal across mainnet, testnets, L2s. +var multicall3Address = common.HexToAddress("0xcA11bde05977b3631167028862bE2a173976CA11") + +// multicallChunkSize bounds how many holders we send in a single +// multicall. 200 holders × 3 sub-calls = 600 sub-calls per outer +// `eth_call`. Well under typical RPC payload limits; keeps individual +// requests small enough to time out cleanly on network blips. +const multicallChunkSize = 200 + +// Call3 mirrors Multicall3's input struct. +type call3 struct { + Target common.Address + AllowFailure bool + CallData []byte +} + +// Result3 mirrors Multicall3's output struct. +type result3 struct { + Success bool + ReturnData []byte +} + +var ( + aggregate3Selector = keccakSelector("aggregate3((address,bool,bytes)[])") + multicall3Args abi.Arguments + multicall3Result abi.Arguments +) + +func init() { + call3Type, err := abi.NewType("tuple[]", "", []abi.ArgumentMarshaling{ + {Name: "target", Type: "address"}, + {Name: "allowFailure", Type: "bool"}, + {Name: "callData", Type: "bytes"}, + }) + if err != nil { + panic(fmt.Errorf("multicall: defining Call3[] type: %w", err)) + } + result3Type, err := abi.NewType("tuple[]", "", []abi.ArgumentMarshaling{ + {Name: "success", Type: "bool"}, + {Name: "returnData", Type: "bytes"}, + }) + if err != nil { + panic(fmt.Errorf("multicall: defining Result3[] type: %w", err)) + } + multicall3Args = abi.Arguments{{Type: call3Type}} + multicall3Result = abi.Arguments{{Type: result3Type}} +} + +// totalAudioBalances batches balanceOf + totalStakedFor + +// getTotalDelegatorStake for each holder into a single Multicall3 +// `aggregate3` call (chunked at multicallChunkSize holders per multicall). +// Returns the sum of the three values per holder, matching the Python +// discovery-provider's `associated_wallets_balance` semantics. +// +// Holders whose 3 sub-calls didn't all succeed are omitted from the +// returned map — same conservative posture as the previous per-wallet +// errgroup path, which short-circuited on any sub-call error rather than +// persist a partial sum. +func (e *EthIndexer) totalAudioBalances(ctx context.Context, holders []common.Address) (map[common.Address]*big.Int, error) { + if len(holders) == 0 { + return map[common.Address]*big.Int{}, nil + } + out := make(map[common.Address]*big.Int, len(holders)) + for i := 0; i < len(holders); i += multicallChunkSize { + end := i + multicallChunkSize + if end > len(holders) { + end = len(holders) + } + chunk := holders[i:end] + if err := e.multicallChunk(ctx, chunk, out); err != nil { + return nil, err + } + } + return out, nil +} + +func (e *EthIndexer) multicallChunk(ctx context.Context, holders []common.Address, sink map[common.Address]*big.Int) error { + calls := make([]call3, 0, len(holders)*3) + for _, h := range holders { + padded := common.LeftPadBytes(h.Bytes(), 32) + calls = append(calls, + call3{ + Target: e.audioContract, + AllowFailure: true, + CallData: append(append([]byte{}, balanceOfSelector...), padded...), + }, + call3{ + Target: e.stakingContract, + AllowFailure: true, + CallData: append(append([]byte{}, totalStakedForSelector...), padded...), + }, + call3{ + Target: e.delegateManager, + AllowFailure: true, + CallData: append(append([]byte{}, getTotalDelegatorStakeSelector...), padded...), + }, + ) + } + + encoded, err := multicall3Args.Pack(calls) + if err != nil { + return fmt.Errorf("packing aggregate3 calls: %w", err) + } + data := append(append([]byte{}, aggregate3Selector...), encoded...) + + rawOut, err := e.httpClient.CallContract(ctx, ethereum.CallMsg{ + To: &multicall3Address, + Data: data, + }, nil) + if err != nil { + return fmt.Errorf("multicall eth_call: %w", err) + } + + decoded, err := multicall3Result.Unpack(rawOut) + if err != nil { + return fmt.Errorf("unpacking aggregate3 result: %w", err) + } + if len(decoded) != 1 { + return fmt.Errorf("expected 1 top-level value in result, got %d", len(decoded)) + } + + // go-ethereum's abi package returns the tuple[] as a slice of + // anonymous structs; coerce into our named result3 via reflection. + results := *abi.ConvertType(decoded[0], new([]result3)).(*[]result3) + + if len(results) != len(calls) { + return fmt.Errorf("multicall result count mismatch: %d vs %d", len(results), len(calls)) + } + + for i, h := range holders { + b, ok := decodeUint(results[i*3+0]) + if !ok { + continue + } + s, ok := decodeUint(results[i*3+1]) + if !ok { + continue + } + d, ok := decodeUint(results[i*3+2]) + if !ok { + continue + } + sum := new(big.Int).Add(b, s) + sum.Add(sum, d) + sink[h] = sum + } + return nil +} + +// decodeUint extracts a uint256 from a single Multicall3 Result. Returns +// (0, false) on failure or empty data so the caller can skip the holder +// (we'd rather not persist a partial sum). +func decodeUint(r result3) (*big.Int, bool) { + if !r.Success || len(r.ReturnData) == 0 { + return nil, false + } + return new(big.Int).SetBytes(r.ReturnData), true +} From 4b8b1943e0cf2bbd5bcb01357beb30d52762f30e Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Fri, 22 May 2026 17:22:39 -0700 Subject: [PATCH 3/3] test(eth-indexer): cover the multicall ABI + block-preserve SQL MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the review on this PR — fills the "no test coverage" gap on the trickiest bits of the new code. multicall_test.go (pure, no infra): - TestDecodeUint — pins the four sub-call decode states (32-byte uint256, zero, failure, empty data). - TestMulticallEncodingRoundtrip — packs Call3[], unpacks it back, then does the same for Result3[] including the abi.ConvertType coercion into our named structs. Catches drift between our `call3` / `result3` field names and the ABI tuple component names, which is exactly where the live multicall would silently break. - TestAggregate3Selector — pins the 0x82ad56cb selector. If the keccak helper or signature string ever drift, this fails loudly instead of sending unroutable calls to Multicall3. eth_indexer_test.go (needs the docker-compose db on :21300): - TestUpsertBalanceUpdates_BlockSemantics walks the four orderings that the CASE/NULLIF/GREATEST clause has to get right: 1. event with block N → stored 2. stale-refresh with block 0 → balance updates, block preserved 3. event with lower block → block does NOT regress 4. event with higher block → block advances - TestUpsertBalanceUpdates_InsertWithNullBlock — cold-start case where a wallet is first observed via the stale-refresh path (e.g. multi-wallet backfill placeholders): block=0 must insert as NULL, not 0. migration0203SQL is inlined into the test file because sql/01_schema.sql hasn't been regenerated to include eth_wallet_balances yet (the test-schema regen path was the broken pg_migrate.sh chain). Keeps the test self-contained against the default test_jobs template. All 5 tests pass locally; no production code changed in this commit. --- eth/indexer/eth_indexer_test.go | 133 +++++++++++++++++++++++++++++++ eth/indexer/multicall_test.go | 135 ++++++++++++++++++++++++++++++++ 2 files changed, 268 insertions(+) create mode 100644 eth/indexer/eth_indexer_test.go create mode 100644 eth/indexer/multicall_test.go diff --git a/eth/indexer/eth_indexer_test.go b/eth/indexer/eth_indexer_test.go new file mode 100644 index 00000000..1ff0b264 --- /dev/null +++ b/eth/indexer/eth_indexer_test.go @@ -0,0 +1,133 @@ +package indexer + +import ( + "context" + "math/big" + "testing" + + "api.audius.co/database" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// migration0203SQL inlines the eth_wallet_balances table definition from +// ddl/migrations/0203_eth_wallet_balances.sql. Inlined rather than read +// from disk so the test stays self-contained — sql/01_schema.sql doesn't +// include this table yet (the schema dump regenerator was the broken +// `make test-schema` path), and we want the test runnable against the +// default test_jobs template. +const migration0203SQL = ` +CREATE TABLE IF NOT EXISTS eth_wallet_balances ( + wallet TEXT PRIMARY KEY, + balance NUMERIC NOT NULL DEFAULT 0, + blocknumber BIGINT, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); +` + +// TestUpsertBalanceUpdates_BlockSemantics pins the three orderings that +// the block-preserve SQL has to handle correctly. The semantics matter +// because the stale-refresh sweep upserts with block=0 (translated to +// NULL by NULLIF), and we don't want a stale-refresh to overwrite a real +// blocknumber persisted by an earlier Transfer event upsert. +func TestUpsertBalanceUpdates_BlockSemantics(t *testing.T) { + pool := database.CreateTestDatabase(t, "test_jobs") + defer pool.Close() + + ctx := context.Background() + _, err := pool.Exec(ctx, migration0203SQL) + require.NoError(t, err) + + e := &EthIndexer{pool: pool} + addr := common.HexToAddress("0x1234567890123456789012345678901234567890") + walletKey := lowerHex(addr) + + read := func(t *testing.T) (string, *int64) { + t.Helper() + var balance string + var block *int64 + err := pool.QueryRow(ctx, + `SELECT balance::text, blocknumber FROM eth_wallet_balances WHERE wallet = $1`, + walletKey, + ).Scan(&balance, &block) + require.NoError(t, err) + return balance, block + } + + // (1) Initial event-path insert at block 12345 → both fields recorded. + err = e.upsertBalanceUpdates(ctx, []balanceUpdate{ + {addr: addr, bal: big.NewInt(100), block: 12345}, + }) + require.NoError(t, err) + balance, block := read(t) + assert.Equal(t, "100", balance) + require.NotNil(t, block) + assert.Equal(t, int64(12345), *block) + + // (2) Stale-refresh upsert with block=0 → balance updates, block is + // preserved (NOT overwritten with 0/NULL). + err = e.upsertBalanceUpdates(ctx, []balanceUpdate{ + {addr: addr, bal: big.NewInt(200), block: 0}, + }) + require.NoError(t, err) + balance, block = read(t) + assert.Equal(t, "200", balance, "stale refresh should update balance") + require.NotNil(t, block, "stale refresh must NOT clear the existing blocknumber") + assert.Equal(t, int64(12345), *block, + "stale refresh must NOT overwrite a real blocknumber with 0/NULL") + + // (3) Event upsert with a LOWER block → balance updates, block does + // NOT regress (GREATEST keeps the higher existing value). + err = e.upsertBalanceUpdates(ctx, []balanceUpdate{ + {addr: addr, bal: big.NewInt(300), block: 100}, + }) + require.NoError(t, err) + balance, block = read(t) + assert.Equal(t, "300", balance) + require.NotNil(t, block) + assert.Equal(t, int64(12345), *block, + "event with a lower block must not regress blocknumber from a higher previous value") + + // (4) Event upsert with a HIGHER block → block advances. + err = e.upsertBalanceUpdates(ctx, []balanceUpdate{ + {addr: addr, bal: big.NewInt(400), block: 99999}, + }) + require.NoError(t, err) + balance, block = read(t) + assert.Equal(t, "400", balance) + require.NotNil(t, block) + assert.Equal(t, int64(99999), *block) +} + +// TestUpsertBalanceUpdates_InsertWithNullBlock covers the cold-start case +// where a wallet is first observed via the stale-refresh path (e.g. one +// of the multi-wallet placeholder rows from the backfill SQL) rather +// than via a live event. block=0 must insert as NULL, not 0. +func TestUpsertBalanceUpdates_InsertWithNullBlock(t *testing.T) { + pool := database.CreateTestDatabase(t, "test_jobs") + defer pool.Close() + + ctx := context.Background() + _, err := pool.Exec(ctx, migration0203SQL) + require.NoError(t, err) + + e := &EthIndexer{pool: pool} + addr := common.HexToAddress("0xabcdefabcdefabcdefabcdefabcdefabcdefabcd") + + err = e.upsertBalanceUpdates(ctx, []balanceUpdate{ + {addr: addr, bal: big.NewInt(500), block: 0}, + }) + require.NoError(t, err) + + var balance string + var block *int64 + err = pool.QueryRow(ctx, + `SELECT balance::text, blocknumber FROM eth_wallet_balances WHERE wallet = $1`, + lowerHex(addr), + ).Scan(&balance, &block) + require.NoError(t, err) + assert.Equal(t, "500", balance) + assert.Nil(t, block, "block=0 must insert as NULL, not 0") +} diff --git a/eth/indexer/multicall_test.go b/eth/indexer/multicall_test.go new file mode 100644 index 00000000..01ed84c4 --- /dev/null +++ b/eth/indexer/multicall_test.go @@ -0,0 +1,135 @@ +package indexer + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func uint256Bytes(n int64) []byte { + return common.LeftPadBytes(big.NewInt(n).Bytes(), 32) +} + +func TestDecodeUint(t *testing.T) { + tests := []struct { + name string + r result3 + wantOK bool + wantVal int64 + }{ + { + name: "success with 32-byte uint256", + r: result3{Success: true, ReturnData: uint256Bytes(123)}, + wantOK: true, + wantVal: 123, + }, + { + name: "success with zero", + r: result3{Success: true, ReturnData: uint256Bytes(0)}, + wantOK: true, + wantVal: 0, + }, + { + name: "failed call short-circuits", + r: result3{Success: false, ReturnData: uint256Bytes(999)}, + wantOK: false, + }, + { + name: "success but empty data", + r: result3{Success: true, ReturnData: nil}, + wantOK: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, ok := decodeUint(tt.r) + assert.Equal(t, tt.wantOK, ok) + if tt.wantOK { + require.NotNil(t, got) + assert.Equal(t, big.NewInt(tt.wantVal).String(), got.String()) + } + }) + } +} + +// TestMulticallEncodingRoundtrip exercises the ABI pack/unpack path that +// the live multicall depends on, without touching the network. Catches +// regressions in: +// - Call3[] / Result3[] tuple type definitions (field names must match +// our call3 / result3 structs for the reflection-based encode/decode +// to work) +// - The abi.ConvertType coercion from anonymous-struct slice back into +// our named type — go-ethereum's behavior here is the subtle part +func TestMulticallEncodingRoundtrip(t *testing.T) { + calls := []call3{ + { + Target: common.HexToAddress("0xdead000000000000000000000000000000000001"), + AllowFailure: true, + CallData: []byte{0x70, 0xa0, 0x82, 0x31, 0xaa, 0xbb}, + }, + { + Target: common.HexToAddress("0xbeef000000000000000000000000000000000002"), + AllowFailure: false, + CallData: []byte{0x4b, 0x34, 0x1a, 0xed}, + }, + } + + // Pack Call3[] + encoded, err := multicall3Args.Pack(calls) + require.NoError(t, err, "packing Call3[] failed") + require.NotEmpty(t, encoded) + + // And confirm Call3[] roundtrips back cleanly (this catches any drift + // between our `call3` struct field names and the ABI tuple component + // names). + decodedCalls, err := multicall3Args.Unpack(encoded) + require.NoError(t, err, "unpacking Call3[] failed") + require.Len(t, decodedCalls, 1) + roundtripped := *abi.ConvertType(decodedCalls[0], new([]call3)).(*[]call3) + require.Len(t, roundtripped, len(calls)) + for i, c := range calls { + assert.Equal(t, c.Target, roundtripped[i].Target, "call[%d].Target", i) + assert.Equal(t, c.AllowFailure, roundtripped[i].AllowFailure, "call[%d].AllowFailure", i) + assert.Equal(t, c.CallData, roundtripped[i].CallData, "call[%d].CallData", i) + } + + // Now exercise the Result3[] path the way live multicall responses + // flow through it: pack a synthetic results blob (one success with a + // uint256 return, one failure with empty return data), then unpack + + // ConvertType, then verify decodeUint reads each correctly. + results := []result3{ + {Success: true, ReturnData: uint256Bytes(42)}, + {Success: false, ReturnData: nil}, + } + resultBytes, err := multicall3Result.Pack(results) + require.NoError(t, err) + + decodedResults, err := multicall3Result.Unpack(resultBytes) + require.NoError(t, err) + require.Len(t, decodedResults, 1) + coerced := *abi.ConvertType(decodedResults[0], new([]result3)).(*[]result3) + require.Len(t, coerced, 2) + + v0, ok := decodeUint(coerced[0]) + require.True(t, ok) + assert.Equal(t, "42", v0.String()) + + _, ok = decodeUint(coerced[1]) + assert.False(t, ok, "failure result should not decode") +} + +// TestAggregate3Selector pins the function selector we send on every +// multicall against the canonical signature. If go-ethereum's keccak ever +// drifts or somebody edits the selector helper, this fails loudly instead +// of silently sending calls to a no-op selector on the Multicall3 +// contract (which would just revert). +func TestAggregate3Selector(t *testing.T) { + // aggregate3((address,bool,bytes)[]) -> keccak256 first 4 bytes + expected := []byte{0x82, 0xad, 0x56, 0xcb} + assert.Equal(t, expected, aggregate3Selector, + "aggregate3 selector drifted — Multicall3 will not route this call") +}