Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 166 additions & 113 deletions eth/indexer/eth_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"
"math/big"
"os"
"strconv"
"sync/atomic"
"time"

Expand All @@ -18,7 +20,6 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// Contract function selectors. Computed at startup from their signatures so
Expand All @@ -40,16 +41,26 @@ const checkpointName = "audio_transfers"
// Backfill chunk size — Alchemy's free tier caps eth_getLogs at 10K blocks.
const backfillChunkBlocks = 9000

// Refresh fan-out: how many balanceOf calls we'll issue in parallel after a
// burst of events. Keeps a burst from saturating the upstream.
const balanceFetchWorkers = 8

// Reconnect backoff bounds for the WS subscription.
const (
reconnectInitialBackoff = 1 * time.Second
reconnectMaxBackoff = 60 * time.Second
)

// Stale-refresh defaults. The sweep complements the live WS subscription:
// it picks the K oldest rows in eth_wallet_balances by updated_at, re-reads
// their balances, and upserts. This recovers from drift, missed events
// during disconnects, and multi-wallet user backfills where we couldn't
// decompose user_balances.associated_wallets_balance per-wallet.
//
// Default cadence: 50 wallets / 30s ≈ 1.7 wallets/sec ≈ 144K/day. With
// ~3.15M tracked wallets a full sweep takes ~22 days. Tune via the env
// vars below.
const (
ethStaleRefreshDefaultInterval = 30 * time.Second
ethStaleRefreshDefaultBatchSize = 50
)

type EthIndexer struct {
config config.Config
pool database.DbPool
Expand All @@ -63,12 +74,26 @@ type EthIndexer struct {

httpClient *ethclient.Client

staleRefreshInterval time.Duration
staleRefreshBatchSize int

// State surfaced via /eth/health
connected atomic.Bool
lastBlockSeen atomic.Uint64
lastEventAt atomic.Pointer[time.Time]
}

// envIntDefault reads an env var as int, falling back to def on missing,
// empty, or unparseable values.
func envIntDefault(name string, def int) int {
if s := os.Getenv(name); s != "" {
if n, err := strconv.Atoi(s); err == nil && n > 0 {
return n
}
}
return def
}

func New(cfg config.Config) *EthIndexer {
logger := logging.NewZapLogger(cfg).Named("EthIndexer")

Expand All @@ -82,15 +107,17 @@ func New(cfg config.Config) *EthIndexer {
}

return &EthIndexer{
config: cfg,
pool: pool,
logger: logger,
httpURL: cfg.EthRpcUrl,
wsURL: cfg.EthWsUrl,
audioContract: common.HexToAddress(cfg.EthAudioContractAddress),
stakingContract: common.HexToAddress(cfg.EthStakingContractAddress),
delegateManager: common.HexToAddress(cfg.EthDelegateManagerContractAddress),
transferTopic: crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)")),
config: cfg,
pool: pool,
logger: logger,
httpURL: cfg.EthRpcUrl,
wsURL: cfg.EthWsUrl,
audioContract: common.HexToAddress(cfg.EthAudioContractAddress),
stakingContract: common.HexToAddress(cfg.EthStakingContractAddress),
delegateManager: common.HexToAddress(cfg.EthDelegateManagerContractAddress),
transferTopic: crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)")),
staleRefreshInterval: time.Duration(envIntDefault("ethStaleRefreshIntervalSecs", int(ethStaleRefreshDefaultInterval.Seconds()))) * time.Second,
staleRefreshBatchSize: envIntDefault("ethStaleRefreshBatchSize", ethStaleRefreshDefaultBatchSize),
}
}

Expand All @@ -111,6 +138,8 @@ func (e *EthIndexer) Start(ctx context.Context) error {
e.httpClient = httpClient
defer httpClient.Close()

go e.ScheduleStaleRefresh(ctx)

e.runSubscriptionLoop(ctx)
return nil
}
Expand Down Expand Up @@ -285,53 +314,51 @@ func (e *EthIndexer) processLogs(ctx context.Context, logs []types.Log) {
return
}

// Fan out balanceOf calls.
jobs := make(chan common.Address, len(tracked))
results := make(chan balanceUpdate, len(tracked))
workers := balanceFetchWorkers
if workers > len(tracked) {
workers = len(tracked)
}
for w := 0; w < workers; w++ {
go func() {
for addr := range jobs {
bal, err := e.totalAudioBalance(ctx, addr)
if err != nil {
e.logger.Warn("totalAudioBalance failed",
zap.String("addr", addr.Hex()),
zap.Error(err),
)
results <- balanceUpdate{} // sentinel so receiver count matches
continue
}
results <- balanceUpdate{addr: addr, bal: bal, block: candidates[addr]}
}
}()
}
for _, addr := range tracked {
jobs <- addr
}
close(jobs)

updates := make([]balanceUpdate, 0, len(tracked))
for i := 0; i < len(tracked); i++ {
select {
case <-ctx.Done():
return
case r := <-results:
if r.bal == nil {
continue
}
updates = append(updates, r)
updated := e.refreshAddresses(ctx, tracked, candidates)
if updated > 0 {
e.logger.Info("refreshed balances from events", zap.Int("updated", updated))
}
}

// refreshAddresses reads balanceOf + totalStakedFor +
// getTotalDelegatorStake for each address via a single Multicall3
// `aggregate3` (chunked at multicallChunkSize holders) and upserts the
// summed results. blockByAddr is optional per-address block context: for
// live Transfer events it's the block the event was mined in; for
// stale-refresh sweeps it's nil so the existing blocknumber column is
// preserved. Returns the number of addresses upserted.
func (e *EthIndexer) refreshAddresses(ctx context.Context, addrs []common.Address, blockByAddr map[common.Address]uint64) int {
if len(addrs) == 0 {
return 0
}

balances, err := e.totalAudioBalances(ctx, addrs)
if err != nil {
e.logger.Warn("totalAudioBalances failed",
zap.Int("holders", len(addrs)),
zap.Error(err),
)
return 0
}

updates := make([]balanceUpdate, 0, len(balances))
for _, addr := range addrs {
bal, ok := balances[addr]
if !ok {
continue
}
block := uint64(0)
if blockByAddr != nil {
block = blockByAddr[addr]
}
updates = append(updates, balanceUpdate{addr: addr, bal: bal, block: block})
}

if err := e.upsertBalanceUpdates(ctx, updates); err != nil {
e.logger.Error("failed to upsert balances", zap.Error(err))
} else if len(updates) > 0 {
e.logger.Info("refreshed balances from events",
zap.Int("updated", len(updates)),
)
return 0
}
return len(updates)
}

func (e *EthIndexer) filterTracked(ctx context.Context, candidates map[common.Address]uint64) ([]common.Address, error) {
Expand Down Expand Up @@ -372,61 +399,6 @@ func (e *EthIndexer) filterTracked(ctx context.Context, candidates map[common.Ad
return tracked, rows.Err()
}

// totalAudioBalance returns balanceOf + totalStakedFor + getTotalDelegatorStake,
// matching the Python discovery-provider's `associated_wallets_balance`
// computation. All three calls run in parallel; any failure fails the whole
// read (we'd rather skip the wallet this round than persist a partial total).
func (e *EthIndexer) totalAudioBalance(ctx context.Context, holder common.Address) (*big.Int, error) {
var balance, staked, delegated *big.Int

g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
v, err := e.uintCall(gctx, e.audioContract, balanceOfSelector, holder)
if err != nil {
return fmt.Errorf("balanceOf: %w", err)
}
balance = v
return nil
})
g.Go(func() error {
v, err := e.uintCall(gctx, e.stakingContract, totalStakedForSelector, holder)
if err != nil {
return fmt.Errorf("totalStakedFor: %w", err)
}
staked = v
return nil
})
g.Go(func() error {
v, err := e.uintCall(gctx, e.delegateManager, getTotalDelegatorStakeSelector, holder)
if err != nil {
return fmt.Errorf("getTotalDelegatorStake: %w", err)
}
delegated = v
return nil
})
if err := g.Wait(); err != nil {
return nil, err
}

sum := new(big.Int).Add(balance, staked)
sum.Add(sum, delegated)
return sum, nil
}

// uintCall invokes a `func(address) returns (uint256)` style getter and
// decodes the result as a big.Int.
func (e *EthIndexer) uintCall(ctx context.Context, contract common.Address, selector []byte, holder common.Address) (*big.Int, error) {
data := append(append([]byte{}, selector...), common.LeftPadBytes(holder.Bytes(), 32)...)
msg := ethereum.CallMsg{To: &contract, Data: data}
out, err := e.httpClient.CallContract(ctx, msg, nil)
if err != nil {
return nil, err
}
if len(out) == 0 {
return big.NewInt(0), nil
}
return new(big.Int).SetBytes(out), nil
}

type balanceUpdate struct {
addr common.Address
Expand All @@ -446,16 +418,23 @@ func (e *EthIndexer) upsertBalanceUpdates(ctx context.Context, updates []balance
weis = append(weis, u.bal.String())
blocks = append(blocks, int64(u.block))
}
// blocknumber semantics:
// - new block > 0 (Transfer-event path): take GREATEST with existing
// - new block = 0 (stale-refresh sweep): preserve existing column,
// don't downgrade a real block to 0 just because we re-read latest
_, err := e.pool.Exec(ctx, `
INSERT INTO eth_wallet_balances (wallet, balance, blocknumber, updated_at)
SELECT
unnest(@wallets::text[]),
unnest(@balances::text[])::numeric,
unnest(@blocks::bigint[]),
NULLIF(unnest(@blocks::bigint[]), 0),
NOW()
ON CONFLICT (wallet) DO UPDATE SET
balance = EXCLUDED.balance,
blocknumber = GREATEST(eth_wallet_balances.blocknumber, EXCLUDED.blocknumber),
blocknumber = CASE
WHEN EXCLUDED.blocknumber IS NULL THEN eth_wallet_balances.blocknumber
ELSE GREATEST(COALESCE(eth_wallet_balances.blocknumber, 0), EXCLUDED.blocknumber)
END,
updated_at = NOW()
`, pgx.NamedArgs{
"wallets": wallets,
Expand All @@ -465,6 +444,80 @@ func (e *EthIndexer) upsertBalanceUpdates(ctx context.Context, updates []balance
return err
}

// ScheduleStaleRefresh runs a background sweep that re-reads the oldest
// rows in eth_wallet_balances by updated_at and upserts the fresh values.
// Complements the live WS subscription: it recovers from drift, fills in
// rows that were never touched by a Transfer event (multi-wallet backfill
// placeholders), and re-reads anything that went stale while the WS was
// disconnected. Bounded throughput by design (batchSize per tick).
func (e *EthIndexer) ScheduleStaleRefresh(ctx context.Context) {
defer func() {
if r := recover(); r != nil {
e.logger.Error("stale-refresh goroutine panicked, will not restart",
zap.Any("panic", r),
)
}
}()

e.logger.Info("starting stale-refresh sweep",
zap.Duration("interval", e.staleRefreshInterval),
zap.Int("batch_size", e.staleRefreshBatchSize),
)
ticker := time.NewTicker(e.staleRefreshInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
e.runStaleRefresh(ctx)
}
}
}

func (e *EthIndexer) runStaleRefresh(ctx context.Context) {
addrs, err := e.selectStaleWallets(ctx)
if err != nil {
e.logger.Warn("stale refresh: select failed", zap.Error(err))
return
}
if len(addrs) == 0 {
return
}
updated := e.refreshAddresses(ctx, addrs, nil)
if updated > 0 {
e.logger.Info("stale refresh: tick complete",
zap.Int("requested", len(addrs)),
zap.Int("updated", updated),
)
}
}

// selectStaleWallets returns the K rows from eth_wallet_balances with the
// oldest updated_at. Indexed by eth_wallet_balances_updated_at_idx.
func (e *EthIndexer) selectStaleWallets(ctx context.Context) ([]common.Address, error) {
rows, err := e.pool.Query(ctx, `
SELECT wallet
FROM eth_wallet_balances
ORDER BY updated_at ASC
LIMIT $1
`, e.staleRefreshBatchSize)
if err != nil {
return nil, err
}
defer rows.Close()

out := make([]common.Address, 0, e.staleRefreshBatchSize)
for rows.Next() {
var w string
if err := rows.Scan(&w); err != nil {
return nil, err
}
out = append(out, common.HexToAddress(w))
}
return out, rows.Err()
}

func (e *EthIndexer) loadCheckpoint(ctx context.Context) (uint64, error) {
var last int64
err := e.pool.QueryRow(ctx,
Expand Down
Loading
Loading