feat(eth-indexer): periodic stale-refresh sweep#854
Merged
The live WS subscription only learns about wallets that move AUDIO. To recover from drift, missed events during disconnects, and multi-wallet backfill placeholders (where user_balances.associated_wallets_balance couldn't be decomposed per-wallet), add a background sweep that re-reads the K oldest rows in eth_wallet_balances by updated_at every N seconds, calls totalAudioBalance, and upserts.

Reuses the existing fan-out: extracts processLogs's "fan-out totalAudioBalance + upsert" tail into refreshAddresses(ctx, addrs, blockByAddr) and calls it from both the event path and the new sweep.

blocknumber handling:
- Event path (block > 0): GREATEST(existing, new). Already worked.
- Stale-refresh path (no block): preserve existing. Pass NULL via NULLIF(0) so we don't write 0 over a real block.

Smoke-tested: initial insert with block 12345, then NULL update keeps 12345, then a lower block (100) also keeps 12345.

Config (defaults give a ~22-day full sweep over 3.15M wallets):
- ethStaleRefreshIntervalSecs: default 30
- ethStaleRefreshBatchSize: default 50

Sustained at defaults: ~1.7 wallets/sec, ~5 RPC/sec total (each wallet runs balanceOf + totalStakedFor + getTotalDelegatorStake in parallel via the existing errgroup path). Well under any Alchemy tier ceiling.

Bounded by design: the ticker drops a tick if the previous run is still in flight, so a slow upstream can't pile up work. Panic-safe via deferred recover so an unexpected error in the sweep won't crash the pod and take the WS subscription down with it.

Smoke-tested locally: pre-seeded two rows with updated_at='1970-01-01', ran with ethStaleRefreshIntervalSecs=5 against the live Alchemy endpoint. Both rows refreshed on the first tick (balance read and upserted, updated_at advanced), and the sweep continued ticking every 5s without errors.
Bundle balanceOf + totalStakedFor + getTotalDelegatorStake for every holder in a refresh batch into a single Multicall3 `aggregate3` call instead of issuing them as separate `eth_call`s.

Before, with the default stale-refresh tick (50 holders × 3 selectors):
  150 `eth_call` round-trips per tick → ~3,900 Alchemy CUs per tick
After:
  1 `eth_call` per tick → ~26 Alchemy CUs per tick

That is a ~150× reduction in CUs and round-trips at the default config, and it removes any cost concern with running the sweep at a tighter cadence.

Multicall3 is deployed at the same address on every EVM chain (`0xcA11bde05977b3631167028862bE2a173976CA11`); held as a package constant since it's universal and we'd never need to change it.

Implementation:
- New file eth/indexer/multicall.go with the Multicall3 ABI encoding/decoding via go-ethereum's accounts/abi package, plus a totalAudioBalances(holders) entry point. Chunked at 200 holders per outer eth_call (= 600 sub-calls) to keep individual requests modest.
- refreshAddresses simplified to one Multicall3 round-trip plus the same upsert it always did; drops the per-holder errgroup, worker pool, jobs/results channels, and the balanceFetchWorkers constant.
- Same conservative posture on partial failures: holders whose three sub-calls didn't all succeed are skipped (omitted from the result map), so we never persist a partial sum. AllowFailure: true on each Call3 so one bad sub-call doesn't fail the whole multicall.

Smoke-tested locally: ran with ethStaleRefreshBatchSize=10 against the live Alchemy endpoint, three pre-seeded rows (rayjacobson primary, 0xb46a… DEX router, Audius staking contract self-balance). All three refreshed in one tick via one multicall:

  stale refresh: tick complete requested:3 updated:3

Cross-checked the staking contract row: 247,024,527,620,589,302,425,363,078 wei matched an independent eth_call against the AUDIO contract's balanceOf for that address. Pipeline correct end-to-end.
Follow-up to the review on this PR — fills the "no test coverage" gap on
the trickiest bits of the new code.
multicall_test.go (pure, no infra):
- TestDecodeUint: pins the four sub-call decode states (32-byte
  uint256, zero, failure, empty data).
- TestMulticallEncodingRoundtrip: packs Call3[], unpacks it back,
  then does the same for Result3[] including the abi.ConvertType
  coercion into our named structs. Catches drift between our
  `call3` / `result3` field names and the ABI tuple component names,
  which is exactly where the live multicall would silently break.
- TestAggregate3Selector: pins the 0x82ad56cb selector. If the keccak
  helper or signature string ever drift, this fails loudly instead of
  sending unroutable calls to Multicall3.
eth_indexer_test.go (needs the docker-compose db on :21300):
- TestUpsertBalanceUpdates_BlockSemantics walks the four orderings
  that the CASE/NULLIF/GREATEST clause has to get right:
  1. event with block N → stored
  2. stale-refresh with block 0 → balance updates, block preserved
  3. event with lower block → block does NOT regress
  4. event with higher block → block advances
- TestUpsertBalanceUpdates_InsertWithNullBlock: cold-start case
  where a wallet is first observed via the stale-refresh path
  (e.g. multi-wallet backfill placeholders): block=0 must insert as
  NULL, not 0.
migration0203SQL is inlined into the test file because sql/01_schema.sql
hasn't been regenerated to include eth_wallet_balances yet (the
test-schema regen path was the broken pg_migrate.sh chain). Keeps the
test self-contained against the default test_jobs template.
All 5 tests pass locally; no production code changed in this commit.
Summary
Adds a background sweep inside the existing eth-indexer pod that re-reads the K oldest rows in `eth_wallet_balances` by `updated_at` every N seconds and upserts the fresh values. Complements the live WS subscription, which only learns about wallets that move AUDIO: the sweep handles drift correction, missed events during WS disconnects, and any wallet that's been silent on-chain (e.g. multi-wallet backfill placeholders).
How it works
- `ScheduleStaleRefresh` is launched from `Start` alongside `runSubscriptionLoop`.
- Each tick runs `SELECT wallet FROM eth_wallet_balances ORDER BY updated_at ASC LIMIT batchSize` (uses `eth_wallet_balances_updated_at_idx` from migration 0203), then fans out `totalAudioBalance` via the existing 8-worker pool, then issues one `INSERT … ON CONFLICT … DO UPDATE`.
- `processLogs`'s tail is extracted into `refreshAddresses(ctx, addrs, blockByAddr)`. `blockByAddr` is the event-block map for live events, `nil` for stale-refresh.
blocknumber semantics fix
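The original `ON CONFLICT … GREATEST(eth_wallet_balances.blocknumber, EXCLUDED.blocknumber)` would write `0` over an existing block when the stale-refresh path didn't have one. Fixed with a CASE/NULLIF/GREATEST clause: the event path (block > 0) still takes GREATEST(existing, new), while the stale-refresh path passes NULL via NULLIF and preserves the existing block.
Smoke-checked against the local DB: real block (12345) preserved across both a NULL-block stale refresh and a lower-block event upsert.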
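The blocknumber rule can be sketched as follows. The SQL string is an assumption assembled from the pieces named in this PR (CASE, NULLIF, GREATEST, `eth_wallet_balances`, `blocknumber`, `updated_at`, `wallet`); the `balance` column name, the conflict target, and the parameter order are guesses, and the PR's real statement may differ. `mergeBlock` is a hypothetical pure-Go mirror of the same rule, included so the orderings can be checked without a database.

```go
package main

import "fmt"

// upsertSQL is an illustrative sketch, not the PR's statement. The balance
// column, the (wallet) conflict target, and the parameter order are assumed.
const upsertSQL = `
INSERT INTO eth_wallet_balances (wallet, balance, blocknumber, updated_at)
VALUES ($1, $2, NULLIF($3::bigint, 0), NOW())
ON CONFLICT (wallet) DO UPDATE SET
    balance     = EXCLUDED.balance,
    blocknumber = CASE
        WHEN EXCLUDED.blocknumber IS NULL THEN eth_wallet_balances.blocknumber
        ELSE GREATEST(eth_wallet_balances.blocknumber, EXCLUDED.blocknumber)
    END,
    updated_at  = NOW()`

// mergeBlock (hypothetical helper) mirrors the CASE expression above.
// nil stands for SQL NULL; incoming == 0 means "no block known".
func mergeBlock(existing *int64, incoming int64) *int64 {
	if incoming == 0 {
		return existing // NULLIF turns 0 into NULL, so the stored block is kept
	}
	if existing != nil && *existing >= incoming {
		return existing // GREATEST: the block never regresses
	}
	return &incoming
}

// show renders a nullable block for printing.
func show(b *int64) string {
	if b == nil {
		return "NULL"
	}
	return fmt.Sprint(*b)
}

func main() {
	var block *int64 // cold start: no row yet
	for _, incoming := range []int64{0, 12345, 0, 100, 20000} {
		block = mergeBlock(block, incoming)
		fmt.Printf("incoming=%d stored=%s\n", incoming, show(block))
	}
}
```

The loop in `main` walks a cold-start stale refresh (stored stays NULL), an event at 12345, a stale refresh, a lower-block event, and a higher-block event, matching the orderings listed in the test description.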
Config
- `ethStaleRefreshIntervalSecs`: default 30
- `ethStaleRefreshBatchSize`: default 50
At defaults: ~1.7 wallets/sec, ~5 RPC calls/sec (each wallet = `balanceOf` + `totalStakedFor` + `getTotalDelegatorStake` in parallel). For ~3.15M tracked wallets, a full sweep takes ~22 days. Tune via env vars if you want faster freshness.
The ticker drops a tick if the previous run is still in flight, so a slow upstream can't pile up work. A deferred `recover()` guards against an unexpected panic taking down the WS subscription with it.
Smoke test (local)
- Pre-seeded two rows in `eth_wallet_balances` with `updated_at='1970-01-01'` to force them to the top of the sweep
- Ran `ethStaleRefreshIntervalSecs=5 ethStaleRefreshBatchSize=10 go run main.go eth-indexer` against the live Alchemy URL
- First tick logged `stale refresh: tick complete requested:2 updated:2`
- `updated_at` advanced; balance values matched independent `eth_call` checks (one was on-chain 0, one had moved AUDIO out since an earlier test; the indexer correctly reflected both)
Test plan
- `go build ./...` clean
- `go vet ./eth/...` clean
- `kubectl -n api logs deploy/eth-indexer | grep "stale refresh: tick complete"` shows ticks at the configured interval
- `SELECT MIN(updated_at) FROM eth_wallet_balances` is no more than a few days old (proves the sweep is making progress)
Coordinating with the one-shot backfill SQL
Before this lands you'll want to run the backfill SQL from my prior message so `eth_wallet_balances` has rows. Otherwise the sweep just runs on an empty table and finds nothing to refresh until the live WS path discovers wallets organically. Specifically, the third query (multi-wallet placeholders with `updated_at='1970-01-01'`) is what the sweep will chew through first.
🤖 Generated with Claude Code