Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 7 additions & 18 deletions eth/indexer/eth_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,30 +501,21 @@ type ethHealth struct {
LastBlockSeen uint64 `json:"last_block_seen"`
CheckpointBlock uint64 `json:"checkpoint_block"`
LastEventAt *time.Time `json:"last_event_at"`
TrackedWallets int64 `json:"tracked_wallets"`
CachedWallets int64 `json:"cached_wallets"`
}

// GetHealth returns indexer liveness in O(1) — all values are either in
// memory or come from a single-row PK lookup. Wallet-population counts
// previously lived on this response but were expensive on prod
// (UNION/COUNT across users + associated_wallets ≈ 3M rows, no index, can
// take 30s+) and don't actually answer "is the indexer alive?", which is
// what a health endpoint is for. If you want population stats, query
// eth_wallet_balances directly.
func (e *EthIndexer) GetHealth(ctx context.Context, maxEventLagSecs int64) (*ethHealth, error) {
checkpoint, err := e.loadCheckpoint(ctx)
if err != nil {
return nil, fmt.Errorf("loading checkpoint: %w", err)
}

var tracked, cached int64
err = e.pool.QueryRow(ctx, `
SELECT
(SELECT COUNT(*) FROM (
SELECT LOWER(wallet) FROM users WHERE wallet IS NOT NULL AND wallet <> ''
UNION
SELECT LOWER(wallet) FROM associated_wallets WHERE chain = 'eth' AND is_delete = FALSE
) t) AS tracked,
(SELECT COUNT(*) FROM eth_wallet_balances) AS cached
`).Scan(&tracked, &cached)
if err != nil {
return nil, fmt.Errorf("counting wallets: %w", err)
}

errs := make([]string, 0)
if !e.connected.Load() && e.wsURL != "" {
errs = append(errs, "websocket subscription not connected")
Expand All @@ -545,8 +536,6 @@ func (e *EthIndexer) GetHealth(ctx context.Context, maxEventLagSecs int64) (*eth
LastBlockSeen: e.lastBlockSeen.Load(),
CheckpointBlock: checkpoint,
LastEventAt: e.lastEventAt.Load(),
TrackedWallets: tracked,
CachedWallets: cached,
}, nil
}

Expand Down
11 changes: 10 additions & 1 deletion eth/indexer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@ import (
"encoding/json"
"net"
"net/http"
"time"

"github.com/gofiber/fiber/v2"
"github.com/mcuadros/go-defaults"
"go.uber.org/zap"
)

// healthHandlerTimeout caps how long /eth/health is willing to wait on
// downstream work (DB, etc.) before returning an error. The endpoint is
// supposed to be O(1); a 2s ceiling is generous and stops a future slow
// query from hanging the handler indefinitely.
const healthHandlerTimeout = 2 * time.Second

type Server struct {
*fiber.App
logger *zap.Logger
Expand Down Expand Up @@ -39,7 +46,9 @@ func NewServer(indexer *EthIndexer) *Server {
}
defaults.SetDefaults(&q)

health, err := indexer.GetHealth(c.Context(), q.MaxEventLagSecs)
ctx, cancel := context.WithTimeout(c.Context(), healthHandlerTimeout)
defer cancel()
health, err := indexer.GetHealth(ctx, q.MaxEventLagSecs)
if err != nil {
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
Expand Down
Loading