From 2af2d16d68ac1db195d20ac9ab3b98c29a144213 Mon Sep 17 00:00:00 2001 From: chechangyuan Date: Tue, 12 May 2026 21:53:39 +0800 Subject: [PATCH] Fix /api/markets/movers timeout and make /api/health honest Production movers endpoint timed out 100% of the time for 8 days starting 5/4 16:00 UTC. Root cause: handler did 1,600 sequential kv.get() calls inside detectMovers() (one per tracked market), plus 1,600 KV writes via recordPriceSnapshots(), all in the request path. Once @vercel/kv's Upstash migration added a small amount of per-request latency, total runtime crossed the 25s function timeout and never recovered. Changes: - Extract snapshot/movers logic to api/lib/price-snapshots.ts. detectMoversBatch() uses kv.mget() in chunks of 100, replacing 1,600 individual gets with ~16 batched calls. recordPriceSnapshots() now also reads via mget and caps each market's snapshot array at 300 entries so values stop bloating until 7-day TTL. - Add api/cron/refresh-markets.ts. Runs every 2 minutes, fetches markets, records snapshots, precomputes movers for buckets 0.02/0.05/0.1/0.2, and writes them to movers:precomputed: with a 5-minute TTL. Writes freshness timestamps to meta:last_snapshot_run and meta:last_movers_run for /api/health to consume. - Rewrite api/markets/movers.ts as a read-only endpoint: single kv.get() of the precomputed bucket, in-memory filter for limit/category, plus a 20s in-process response cache. Returns 503 if the cron hasn't run. - Rewrite api/health.ts as a 4-check honest health probe: real market counts (Poly >= 800, Kalshi >= 200), KV read+write probe, snapshot freshness (< 5min), movers freshness (< 5min). Returns 503 on any degraded check so external monitors actually fire. - Update vercel.json with the new cron entry and rewrite. Co-Authored-By: Claude Opus 4.7 (1M context) --- api/cron/refresh-markets.ts | 120 +++++++++++++++ api/health.ts | 193 ++++++++++++++---------- api/lib/price-snapshots.ts | 165 +++++++++++++++++++++ api/markets/movers.ts | 282 ++++++++---------------------------- vercel.json | 8 + 5 files changed, 472 insertions(+), 296 deletions(-) create mode 100644 api/cron/refresh-markets.ts create mode 100644 api/lib/price-snapshots.ts diff --git a/api/cron/refresh-markets.ts b/api/cron/refresh-markets.ts new file mode 100644 index 0000000..8bab987 --- /dev/null +++ b/api/cron/refresh-markets.ts @@ -0,0 +1,120 @@ +import type { VercelRequest, VercelResponse } from '@vercel/node'; +import { getMarkets } from '../lib/market-cache'; +import { kv, setKvWithTtl } from '../lib/vercel-kv'; +import { + detectMoversBatch, + recordPriceSnapshots, + getMoversKey, + META_LAST_SNAPSHOT_RUN, + META_LAST_MOVERS_RUN, +} from '../lib/price-snapshots'; + +// Buckets the /api/markets/movers endpoint serves directly. Caller-provided +// minChange snaps to the nearest bucket ≤ the requested value, so adding a +// bucket here adds finer resolution without touching the read path. +const MOVERS_BUCKETS = [0.02, 0.05, 0.1, 0.2]; +const MOVERS_TTL_SECONDS = 5 * 60; + +export default async function handler( + req: VercelRequest, + res: VercelResponse, +): Promise { + res.setHeader('Access-Control-Allow-Origin', '*'); + res.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS'); + res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization'); + + if (req.method === 'OPTIONS') { + res.status(200).end(); + return; + } + + if (req.method !== 'GET' && req.method !== 'POST') { + res.status(405).json({ success: false, error: 'Method not allowed.' }); + return; + } + + const cronSecret = req.headers.authorization?.replace('Bearer ', ''); + if (cronSecret !== process.env.CRON_SECRET) { + console.error('[Cron refresh-markets] Unauthorized: Invalid CRON_SECRET'); + res.status(401).json({ success: false, error: 'Unauthorized' }); + return; + } + + const startedAt = Date.now(); + const timings: Record = {}; + + try { + const t0 = Date.now(); + const markets = await getMarkets(); + timings.getMarkets_ms = Date.now() - t0; + + if (markets.length === 0) { + console.warn('[Cron refresh-markets] No markets returned, skipping'); + res.status(200).json({ success: true, skipped: 'no markets', timings }); + return; + } + + const t1 = Date.now(); + const writeStats = await recordPriceSnapshots(markets); + timings.recordSnapshots_ms = Date.now() - t1; + + await setKvWithTtl(META_LAST_SNAPSHOT_RUN, 24 * 60 * 60, { + timestamp: new Date().toISOString(), + markets_total: markets.length, + ...writeStats, + }); + + const t2 = Date.now(); + const moversByBucket: Record = {}; + + // Compute the smallest bucket once; the larger buckets are just filters of + // that result. One KV mget pass instead of N. + const minBucket = Math.min(...MOVERS_BUCKETS); + const baseMovers = await detectMoversBatch(markets, minBucket, 1); + + await Promise.all( + MOVERS_BUCKETS.map(async (bucket) => { + const filtered = baseMovers.filter( + (m) => Math.abs(m.priceChange1h) >= bucket, + ); + moversByBucket[bucket.toString()] = filtered.length; + await setKvWithTtl(getMoversKey(bucket.toString()), MOVERS_TTL_SECONDS, { + computedAt: new Date().toISOString(), + minChange: bucket, + markets_analyzed: markets.length, + movers: filtered, + }); + }), + ); + + timings.computeMovers_ms = Date.now() - t2; + + await setKvWithTtl(META_LAST_MOVERS_RUN, 24 * 60 * 60, { + timestamp: new Date().toISOString(), + buckets: moversByBucket, + markets_analyzed: markets.length, + }); + + timings.total_ms = Date.now() - startedAt; + + console.log( + `[Cron refresh-markets] done in ${timings.total_ms}ms — ${markets.length} markets, snapshots ${JSON.stringify(writeStats)}, movers ${JSON.stringify(moversByBucket)}`, + ); + + res.status(200).json({ + success: true, + markets_total: markets.length, + snapshots: writeStats, + movers: moversByBucket, + timings, + }); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error'; + console.error('[Cron refresh-markets] Error:', errorMessage); + res.status(500).json({ + success: false, + error: errorMessage, + timings: { ...timings, total_ms: Date.now() - startedAt }, + }); + } +} diff --git a/api/health.ts b/api/health.ts index d581941..30ee0c9 100644 --- a/api/health.ts +++ b/api/health.ts @@ -1,104 +1,151 @@ import type { VercelRequest, VercelResponse } from '@vercel/node'; -import { fetchPolymarkets } from '../src/api/polymarket-client'; -import { fetchKalshiMarkets } from '../src/api/kalshi-client'; +import { getMarkets } from './lib/market-cache'; +import { kv } from './lib/vercel-kv'; +import { + getMoversKey, + META_LAST_SNAPSHOT_RUN, +} from './lib/price-snapshots'; + +type CheckStatus = 'healthy' | 'degraded' | 'down'; +interface CheckResult { + status: CheckStatus; + detail?: Record; + error?: string; +} + +const POLY_MIN_HEALTHY = parseInt(process.env.HEALTH_POLY_MIN || '800', 10); +const KALSHI_MIN_HEALTHY = parseInt(process.env.HEALTH_KALSHI_MIN || '200', 10); +const FRESHNESS_MAX_AGE_MS = 5 * 60 * 1000; +const KV_PROBE_KEY = 'health:probe'; +const KV_PROBE_TIMEOUT_MS = 1500; + +function withTimeout(p: Promise, ms: number, label: string): Promise { + let handle: ReturnType; + const timer = new Promise((_, reject) => { + handle = setTimeout(() => reject(new Error(`${label} timed out after ${ms}ms`)), ms); + }); + return Promise.race([p, timer]).finally(() => clearTimeout(handle)); +} + +async function checkMarketCounts(): Promise { + try { + const markets = await getMarkets(); + const poly = markets.filter((m) => m.platform === 'polymarket').length; + const kalshi = markets.filter((m) => m.platform === 'kalshi').length; + + const polyOk = poly >= POLY_MIN_HEALTHY; + const kalshiOk = kalshi >= KALSHI_MIN_HEALTHY; + + return { + status: polyOk && kalshiOk ? 'healthy' : 'degraded', + detail: { + polymarket: { markets: poly, threshold: POLY_MIN_HEALTHY, ok: polyOk }, + kalshi: { markets: kalshi, threshold: KALSHI_MIN_HEALTHY, ok: kalshiOk }, + total: markets.length, + }, + }; + } catch (err) { + return { status: 'down', error: err instanceof Error ? err.message : String(err) }; + } +} + +async function checkKvReachable(): Promise { + const t0 = Date.now(); + try { + await withTimeout(kv.set(KV_PROBE_KEY, t0, { ex: 60 }), KV_PROBE_TIMEOUT_MS, 'KV write'); + await withTimeout(kv.get(KV_PROBE_KEY), KV_PROBE_TIMEOUT_MS, 'KV read'); + const latency = Date.now() - t0; + return { + status: latency > 1000 ? 'degraded' : 'healthy', + detail: { latency_ms: latency }, + }; + } catch (err) { + return { status: 'down', error: err instanceof Error ? err.message : String(err) }; + } +} + +async function checkFreshness( + key: string, + label: string, + extract: (v: any) => string | undefined, +): Promise { + try { + const value = await kv.get(key); + if (!value) { + return { status: 'degraded', error: `${label}: no run recorded yet` }; + } + const ts = extract(value); + if (!ts) { + return { status: 'degraded', error: `${label}: missing timestamp` }; + } + const ageMs = Date.now() - new Date(ts).getTime(); + return { + status: ageMs > FRESHNESS_MAX_AGE_MS ? 'degraded' : 'healthy', + detail: { last_run: ts, age_seconds: Math.floor(ageMs / 1000) }, + }; + } catch (err) { + return { status: 'down', error: err instanceof Error ? err.message : String(err) }; + } +} + +function rollup(checks: Record): CheckStatus { + const statuses = Object.values(checks).map((c) => c.status); + if (statuses.every((s) => s === 'healthy')) return 'healthy'; + if (statuses.some((s) => s === 'down')) return 'down'; + return 'degraded'; +} export default async function handler( req: VercelRequest, - res: VercelResponse + res: VercelResponse, ): Promise { - // CORS headers res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('Access-Control-Allow-Methods', 'GET, OPTIONS'); res.setHeader('Access-Control-Allow-Headers', 'Content-Type'); - // Handle preflight if (req.method === 'OPTIONS') { res.status(200).end(); return; } - // Only accept GET if (req.method !== 'GET') { res.setHeader('Allow', 'GET, OPTIONS'); - res.status(405).json({ - success: false, - error: 'Method not allowed. Use GET.', - }); + res.status(405).json({ success: false, error: 'Method not allowed. Use GET.' }); return; } const startTime = Date.now(); try { - // Test API connections - const [polyResult, kalshiResult] = await Promise.allSettled([ - fetchPolymarkets(10, 1), // Just fetch 10 markets as a health check - fetchKalshiMarkets(10, 1), + const [marketCounts, kvReach, snapshotFresh, moversFresh] = await Promise.all([ + checkMarketCounts(), + checkKvReachable(), + checkFreshness(META_LAST_SNAPSHOT_RUN, 'snapshots', (v) => v?.timestamp), + checkFreshness(getMoversKey('0.05'), 'movers', (v) => v?.computedAt), ]); - const polymarketStatus = polyResult.status === 'fulfilled' - ? { status: 'healthy', markets: polyResult.value.length } - : { status: 'degraded', error: String(polyResult.reason) }; - - const kalshiStatus = kalshiResult.status === 'fulfilled' - ? { status: 'healthy', markets: kalshiResult.value.length } - : { status: 'degraded', error: String(kalshiResult.reason) }; - - // Determine overall status - const overallStatus = - polymarketStatus.status === 'healthy' && kalshiStatus.status === 'healthy' - ? 'healthy' - : polymarketStatus.status === 'degraded' && kalshiStatus.status === 'degraded' - ? 'down' - : 'degraded'; - - const healthData = { - status: overallStatus, - timestamp: new Date().toISOString(), - uptime_ms: process.uptime() * 1000, - response_time_ms: Date.now() - startTime, - version: '2.0.0', - services: { - polymarket: polymarketStatus, - kalshi: kalshiStatus, - }, - endpoints: { - '/api/analyze-text': { - method: 'POST', - description: 'Analyze text and return matching markets with trading signals', - status: 'healthy', - }, - '/api/markets/arbitrage': { - method: 'GET', - description: 'Get cross-platform arbitrage opportunities', - status: 'healthy', - }, - '/api/markets/movers': { - method: 'GET', - description: 'Get markets with significant price changes', - status: 'healthy', - }, - '/api/health': { - method: 'GET', - description: 'API health check', - status: 'healthy', - }, - }, - limits: { - max_markets_per_request: 5, - cache_ttl_seconds: 300, - rate_limit: 'none (currently)', - }, + const checks = { + market_counts: marketCounts, + kv_reachable: kvReach, + snapshot_freshness: snapshotFresh, + movers_freshness: moversFresh, }; - const response = { - success: true, - data: healthData, - }; + const overall = rollup(checks); - const statusCode = overallStatus === 'healthy' ? 200 : overallStatus === 'degraded' ? 503 : 503; - res.status(statusCode).json(response); + const body = { + success: overall === 'healthy', + data: { + status: overall, + timestamp: new Date().toISOString(), + response_time_ms: Date.now() - startTime, + version: '2.1.0', + checks, + }, + }; + const statusCode = overall === 'healthy' ? 200 : 503; + res.status(statusCode).json(body); } catch (error) { console.error('[Health API] Error:', error); res.status(500).json({ diff --git a/api/lib/price-snapshots.ts b/api/lib/price-snapshots.ts new file mode 100644 index 0000000..fd67abf --- /dev/null +++ b/api/lib/price-snapshots.ts @@ -0,0 +1,165 @@ +import { Market } from '../../src/types/market'; +import { kv, setKvWithTtl } from './vercel-kv'; +import { batchGetFromKV } from './cache-helper'; + +export interface PriceSnapshot { + marketId: string; + yesPrice: number; + timestamp: number; +} + +export interface MarketMover { + market: Market; + priceChange1h: number; + previousPrice: number; + currentPrice: number; + direction: 'up' | 'down'; + timestamp: number; +} + +export const HISTORY_TTL_SECONDS = 7 * 24 * 60 * 60; +export const SNAPSHOT_KEY_PREFIX = 'price_history:'; +export const MOVERS_PRECOMPUTED_PREFIX = 'movers:precomputed:'; +export const META_LAST_SNAPSHOT_RUN = 'meta:last_snapshot_run'; +export const META_LAST_MOVERS_RUN = 'meta:last_movers_run'; + +// Snapshot array hard cap. At 5-min cadence, 300 entries covers 25h — enough for +// the 1h and 24h lookbacks the movers endpoint exposes. Without this cap, arrays +// grew until 7-day TTL expired, bloating KV values and slowing mget linearly. +const MAX_SNAPSHOTS_PER_MARKET = 300; +const SNAPSHOT_DEDUP_WINDOW_MS = 60_000; +const KV_BATCH_SIZE = 100; + +export function getSnapshotKey(marketId: string): string { + return `${SNAPSHOT_KEY_PREFIX}${marketId}`; +} + +export function getMoversKey(bucket: string): string { + return `${MOVERS_PRECOMPUTED_PREFIX}${bucket}`; +} + +function chunk(arr: T[], size: number): T[][] { + const out: T[][] = []; + for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size)); + return out; +} + +export async function recordPriceSnapshots(markets: Market[]): Promise<{ + written: number; + skipped: number; + errors: number; +}> { + const now = Date.now(); + const cutoff = now - HISTORY_TTL_SECONDS * 1000; + + const keys = markets.map((m) => getSnapshotKey(m.id)); + let written = 0; + let skipped = 0; + let errors = 0; + + for (const [chunkIdx, batchKeys] of chunk(keys, KV_BATCH_SIZE).entries()) { + const batchMarkets = markets.slice( + chunkIdx * KV_BATCH_SIZE, + chunkIdx * KV_BATCH_SIZE + batchKeys.length, + ); + + const existing = await batchGetFromKV(kv, batchKeys); + + await Promise.allSettled( + batchMarkets.map(async (market, i) => { + try { + const prior = existing[i] ?? []; + const latest = prior.length > 0 ? prior[prior.length - 1] : null; + + if (latest && now - latest.timestamp < SNAPSHOT_DEDUP_WINDOW_MS) { + skipped++; + return; + } + + const appended: PriceSnapshot[] = [ + ...prior.filter((s) => s.timestamp >= cutoff), + { marketId: market.id, yesPrice: market.yesPrice, timestamp: now }, + ]; + + const trimmed = appended.length > MAX_SNAPSHOTS_PER_MARKET + ? appended.slice(appended.length - MAX_SNAPSHOTS_PER_MARKET) + : appended; + + await setKvWithTtl(getSnapshotKey(market.id), HISTORY_TTL_SECONDS, trimmed); + written++; + } catch (err) { + errors++; + console.error(`[Snapshots] write failed for ${market.id}:`, err); + } + }), + ); + } + + return { written, skipped, errors }; +} + +export function computePriceChange( + snapshots: PriceSnapshot[] | null, + hoursAgo: number, +): { change: number; previousPrice: number } | null { + if (!snapshots || snapshots.length < 2) return null; + + const current = snapshots[snapshots.length - 1]; + const targetTime = Date.now() - hoursAgo * 60 * 60 * 1000; + + let closest = snapshots[0]; + let closestDiff = Math.abs(closest.timestamp - targetTime); + for (const s of snapshots) { + const d = Math.abs(s.timestamp - targetTime); + if (d < closestDiff) { + closest = s; + closestDiff = d; + } + } + + // Tolerance ±0.5×hoursAgo (e.g. ±30 min for a 1h lookback). Anything looser + // overstates change magnitude — see prior FIX 7 in the original code. + if (closestDiff > hoursAgo * 60 * 60 * 1000 * 0.5) return null; + + return { + change: current.yesPrice - closest.yesPrice, + previousPrice: closest.yesPrice, + }; +} + +export async function detectMoversBatch( + markets: Market[], + minChange: number, + hoursAgo = 1, +): Promise { + const movers: MarketMover[] = []; + const keys = markets.map((m) => getSnapshotKey(m.id)); + + for (const [chunkIdx, batchKeys] of chunk(keys, KV_BATCH_SIZE).entries()) { + const batchMarkets = markets.slice( + chunkIdx * KV_BATCH_SIZE, + chunkIdx * KV_BATCH_SIZE + batchKeys.length, + ); + + const snapshotArrays = await batchGetFromKV(kv, batchKeys); + + for (let i = 0; i < batchMarkets.length; i++) { + const market = batchMarkets[i]; + const priceData = computePriceChange(snapshotArrays[i], hoursAgo); + if (priceData === null) continue; + if (Math.abs(priceData.change) < minChange) continue; + + movers.push({ + market, + priceChange1h: priceData.change, + previousPrice: priceData.previousPrice, + currentPrice: market.yesPrice, + direction: priceData.change > 0 ? 'up' : 'down', + timestamp: Date.now(), + }); + } + } + + movers.sort((a, b) => Math.abs(b.priceChange1h) - Math.abs(a.priceChange1h)); + return movers; +} diff --git a/api/markets/movers.ts b/api/markets/movers.ts index fe6c85e..a4bca00 100644 --- a/api/markets/movers.ts +++ b/api/markets/movers.ts @@ -1,222 +1,57 @@ import type { VercelRequest, VercelResponse } from '@vercel/node'; -import { Market } from '../../src/types/market'; -import { getMarkets, getMarketMetadata } from '../lib/market-cache'; -import { kv, listKvKeys, setKvWithTtl } from '../lib/vercel-kv'; - -/** - * Vercel KV-based price tracking for persistent movers detection - * - * NOTE: @vercel/kv is deprecated. For new projects, use Upstash Redis - * integration from Vercel Marketplace. Existing KV stores have been - * migrated to Upstash Redis automatically. - * - * Migration path: https://vercel.com/marketplace?category=storage&search=redis - */ - -interface PriceSnapshot { - marketId: string; - yesPrice: number; - timestamp: number; -} - -interface MarketMover { - market: Market; - priceChange1h: number; - previousPrice: number; - currentPrice: number; - direction: 'up' | 'down'; - timestamp: number; -} - -const HISTORY_TTL_SECONDS = 7 * 24 * 60 * 60; // 7 days -const SNAPSHOT_KEY_PREFIX = 'price_history:'; - -/** - * Get KV key for market price history - */ -function getSnapshotKey(marketId: string): string { - return `${SNAPSHOT_KEY_PREFIX}${marketId}`; -} - -/** - * Record price snapshots for markets in Vercel KV - */ -async function recordPriceSnapshots(markets: Market[]): Promise { - const now = Date.now(); - const cutoff = now - (HISTORY_TTL_SECONDS * 1000); - - // Process markets in batches to avoid rate limits - const batchSize = 50; - for (let i = 0; i < markets.length; i += batchSize) { - const batch = markets.slice(i, i + batchSize); - - await Promise.allSettled( - batch.map(async (market) => { - const key = getSnapshotKey(market.id); - - // Get existing snapshots - const snapshots = await kv.get(key) || []; - - // Skip if already recorded recently (within 60 seconds) - // Prevents unbounded KV growth from high-frequency polling - const latestTimestamp = snapshots.length > 0 ? snapshots[snapshots.length - 1].timestamp : 0; - if (now - latestTimestamp < 60000) { - return; // Skip — already recorded in the last minute - } - - // Add new snapshot - const newSnapshot: PriceSnapshot = { - marketId: market.id, - yesPrice: market.yesPrice, - timestamp: now, - }; - - snapshots.push(newSnapshot); - - // Keep only recent snapshots (within TTL) - const filtered = snapshots.filter((s: PriceSnapshot) => s.timestamp >= cutoff); - - // Store back to KV with TTL - await setKvWithTtl(key, HISTORY_TTL_SECONDS, filtered); - }) - ); - } +import { getMarketMetadata } from '../lib/market-cache'; +import { kv } from '../lib/vercel-kv'; +import { + MarketMover, + getMoversKey, +} from '../lib/price-snapshots'; + +// Bucket precomputed every cron tick. Caller-supplied minChange snaps down to +// the nearest available bucket; results are filtered in-memory for any +// minChange above that bucket. Keep in sync with MOVERS_BUCKETS in +// api/cron/refresh-markets.ts. +const BUCKETS = [0.02, 0.05, 0.1, 0.2]; + +interface MoversCacheEntry { + computedAt: string; + minChange: number; + markets_analyzed: number; + movers: MarketMover[]; } -/** - * Get price change for a market from KV - * Returns both the price change and previous price to avoid duplicate KV reads - */ -async function getPriceChange(marketId: string, hoursAgo: number): Promise<{ change: number; previousPrice: number } | null> { - const key = getSnapshotKey(marketId); - const snapshots = await kv.get(key); - - if (!snapshots || snapshots.length < 2) { - return null; - } - - const current = snapshots[snapshots.length - 1]; - const targetTime = Date.now() - (hoursAgo * 60 * 60 * 1000); +const RESPONSE_CACHE_TTL_MS = 20_000; +const responseCache = new Map(); - // Find closest snapshot to target time - let closestSnapshot = snapshots[0]; - let closestDiff = Math.abs(closestSnapshot.timestamp - targetTime); - - for (const snapshot of snapshots) { - const diff = Math.abs(snapshot.timestamp - targetTime); - if (diff < closestDiff) { - closestDiff = diff; - closestSnapshot = snapshot; - } - } - - // FIX 7: original tolerance was 2× hoursAgo — for a 1-hour lookback this accepted - // snapshots up to 3 hours old as a valid "1 hour ago" reference, overstating price - // changes by 2-3×. Tightened to 0.5× so the reference snapshot must be within - // ±30 min of the target time (e.g. between 30 min and 90 min ago for hoursAgo=1). - if (closestDiff > (hoursAgo * 60 * 60 * 1000 * 0.5)) { - return null; - } - - return { - change: current.yesPrice - closestSnapshot.yesPrice, - previousPrice: closestSnapshot.yesPrice, - }; -} - -/** - * Detect market movers using KV-stored price history - */ -async function detectMovers(markets: Market[], minChange: number): Promise { - const movers: MarketMover[] = []; - - // Process markets in batches for performance - const batchSize = 50; - for (let i = 0; i < markets.length; i += batchSize) { - const batch = markets.slice(i, i + batchSize); - - const results = await Promise.allSettled( - batch.map(async (market) => { - const priceData = await getPriceChange(market.id, 1); - - if (priceData === null) return null; - - const absChange = Math.abs(priceData.change); - if (absChange >= minChange) { - return { - market, - priceChange1h: priceData.change, - previousPrice: priceData.previousPrice, - currentPrice: market.yesPrice, - direction: priceData.change > 0 ? 'up' : 'down' as 'up' | 'down', - timestamp: Date.now(), - }; - } - - return null; - }) - ); - - // Collect successful results - for (const result of results) { - if (result.status === 'fulfilled' && result.value !== null) { - movers.push(result.value); - } - } - } - - // Sort by absolute change - movers.sort((a, b) => Math.abs(b.priceChange1h) - Math.abs(a.priceChange1h)); - - return movers; -} - -/** - * Get approximate snapshot count (number of markets tracked) - * - * Note: Returns key count instead of total snapshots to avoid N+1 query. - * Each market has ~2000 snapshots (7 days × 288 snapshots/day). - */ -async function getTrackedMarketCount(): Promise { - try { - // Just count keys, don't fetch all snapshot arrays - const keys = await listKvKeys(`${SNAPSHOT_KEY_PREFIX}*`); - return keys.length; - } catch (error) { - console.error('[Movers API] Failed to get market count:', error); - return 0; - } +function pickBucket(minChange: number): number { + // Largest bucket ≤ minChange so we don't under-serve. If minChange is below + // the smallest bucket, use the smallest. + const eligible = BUCKETS.filter((b) => b <= minChange); + if (eligible.length === 0) return BUCKETS[0]; + return Math.max(...eligible); } export default async function handler( req: VercelRequest, - res: VercelResponse + res: VercelResponse, ): Promise { - // CORS headers res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('Access-Control-Allow-Methods', 'GET, OPTIONS'); res.setHeader('Access-Control-Allow-Headers', 'Content-Type'); - // Handle preflight if (req.method === 'OPTIONS') { res.status(200).end(); return; } - // Only accept GET if (req.method !== 'GET') { res.setHeader('Allow', 'GET, OPTIONS'); - res.status(405).json({ - success: false, - error: 'Method not allowed. Use GET.', - }); + res.status(405).json({ success: false, error: 'Method not allowed. Use GET.' }); return; } const startTime = Date.now(); try { - // Parse query parameters const { minChange = '0.05', limit = '20', @@ -226,7 +61,6 @@ export default async function handler( const minChangeNum = parseFloat(minChange as string); const limitNum = parseInt(limit as string, 10); - // Validate parameters if (isNaN(minChangeNum) || minChangeNum < 0 || minChangeNum > 1) { res.status(400).json({ success: false, @@ -243,39 +77,42 @@ export default async function handler( return; } - // Get markets - const markets = await getMarkets(); + const categoryStr = typeof category === 'string' ? category : undefined; + const bucket = pickBucket(minChangeNum); + const cacheKey = `${bucket}|${minChangeNum}|${limitNum}|${categoryStr ?? ''}`; + const cached = responseCache.get(cacheKey); + if (cached && Date.now() - cached.at < RESPONSE_CACHE_TTL_MS) { + res.status(200).json(cached.body); + return; + } + + const entry = await kv.get(getMoversKey(bucket.toString())); - if (markets.length === 0) { + if (!entry) { + // No precomputed result yet — the cron either hasn't run since deploy + // or KV is unreachable. Return 503 so monitors fire instead of silently + // serving empty data. res.status(503).json({ success: false, - error: 'No markets available. Service temporarily unavailable.', + error: 'Movers data not yet computed. Cron job may not have completed first run.', + bucket, + retry_after_seconds: 120, }); return; } - // Record price snapshots to KV (must await to avoid race condition) - await recordPriceSnapshots(markets); - - // Detect movers (reads from KV, so must happen after snapshots are written) - let movers = await detectMovers(markets, minChangeNum); - - // Filter by category if specified - if (category) { - movers = movers.filter(m => m.market.category === category); + let movers = entry.movers; + if (minChangeNum > bucket) { + movers = movers.filter((m) => Math.abs(m.priceChange1h) >= minChangeNum); + } + if (categoryStr) { + movers = movers.filter((m) => m.market.category === categoryStr); } - - // Limit results movers = movers.slice(0, limitNum); - // Get tracked market count for metadata (lightweight, no N+1 query) - const trackedMarkets = await getTrackedMarketCount(); - - // Stage 0: Get freshness metadata const freshnessMetadata = getMarketMetadata(); - // Build response - const response = { + const body = { success: true, data: { movers, @@ -284,15 +121,15 @@ export default async function handler( filters: { minChange: minChangeNum, limit: limitNum, - category: category || null, + category: categoryStr ?? null, }, metadata: { processing_time_ms: Date.now() - startTime, - markets_analyzed: markets.length, - markets_tracked: trackedMarkets, - storage: 'Vercel KV (Redis)', + markets_analyzed: entry.markets_analyzed, + precomputed_at: entry.computedAt, + precomputed_bucket: bucket, + storage: 'Vercel KV (Redis) — precomputed via cron', history_retention: '7 days', - // Stage 0: Freshness metadata data_age_seconds: freshnessMetadata.data_age_seconds, fetched_at: freshnessMetadata.fetched_at, sources: freshnessMetadata.sources, @@ -300,11 +137,10 @@ export default async function handler( }, }; - res.status(200).json(response); + responseCache.set(cacheKey, { at: Date.now(), body }); + res.status(200).json(body); } catch (error) { console.error('[Movers API] Error:', error); - - // Check if it's a KV error const errorMessage = error instanceof Error ? error.message : 'Internal server error'; const isKVError = errorMessage.includes('KV') || errorMessage.includes('Redis'); diff --git a/vercel.json b/vercel.json index 5d0587d..e0e0116 100644 --- a/vercel.json +++ b/vercel.json @@ -32,12 +32,20 @@ { "source": "/api/feed/accounts", "destination": "/api/feed/accounts.ts" + }, + { + "source": "/api/cron/refresh-markets", + "destination": "/api/cron/refresh-markets.ts" } ], "crons": [ { "path": "/api/cron/collect-tweets", "schedule": "*/2 * * * *" + }, + { + "path": "/api/cron/refresh-markets", + "schedule": "*/2 * * * *" } ], "headers": [