diff --git a/src/index.ts b/src/index.ts index 597708e..cb89c46 100644 --- a/src/index.ts +++ b/src/index.ts @@ -21,9 +21,11 @@ import { insuranceRoutes } from "./routes/insurance.js"; import { openInterestRoutes } from "./routes/open-interest.js"; import { statsRoutes } from "./routes/stats.js"; import { chartRoutes } from "./routes/chart.js"; +import { candleRoutes } from "./routes/candles.js"; import { docsRoutes } from "./routes/docs.js"; import { adlRoutes } from "./routes/adl.js"; import { setupWebSocket, cleanupPriceUpdateTimers } from "./routes/ws.js"; +import { OraclePriceBroadcaster } from "./services/OraclePriceBroadcaster.js"; import { readRateLimit, writeRateLimit } from "./middleware/rate-limit.js"; import { ipBlocklist } from "./middleware/ip-blocklist.js"; import { cacheMiddleware } from "./middleware/cache.js"; @@ -200,6 +202,7 @@ app.route("/", insuranceRoutes()); app.route("/", openInterestRoutes()); app.route("/", statsRoutes()); app.route("/", chartRoutes()); +app.route("/", candleRoutes()); app.route("/", adlRoutes()); app.route("/", docsRoutes()); @@ -323,6 +326,16 @@ const server = serve({ fetch: app.fetch, port }, (info) => { const wss = setupWebSocket(server as unknown as import("node:http").Server); +// Bridge oracle_prices INSERTs → local eventBus → WS clients. Without this +// the cross-process price.updated events from the indexer never reach WS +// subscribers, and the frontend only sees new prices on page refresh. +const oraclePriceBroadcaster = new OraclePriceBroadcaster(); +oraclePriceBroadcaster.start().catch((err) => { + logger.error("OraclePriceBroadcaster start failed", { + error: err instanceof Error ? err.message : String(err), + }); +}); + const SHUTDOWN_TIMEOUT_MS = 10_000; async function shutdown(signal: string): Promise { diff --git a/src/routes/candles.ts b/src/routes/candles.ts new file mode 100644 index 0000000..4ca10c6 --- /dev/null +++ b/src/routes/candles.ts @@ -0,0 +1,134 @@ +/** + * Candles API Route — internal Percolator OHLCV from the `trades` table. + * + * GET /candles/:slab?resolution=1&from=&to= + * + * Response shape follows TradingView UDF (same as Pyth Benchmarks proxy so the + * frontend can swap data source without changing parsing): + * + * { s: "ok"|"no_data", t: number[], o: number[], h: number[], l: number[], c: number[], v: number[] } + * + * Timestamps are Unix seconds (UDF convention). Resolution maps to minutes, + * except "1D" which buckets into days. + * + * Implementation: fetches raw trades in the requested range, buckets them + * in-process. Works on plain Postgres; when Variant B of migration + * 20260420_candle_support.sql is applied (TimescaleDB continuous aggregates), + * this can be upgraded to query the candles_* materialized views directly. + */ +import { Hono } from "hono"; +import { getSupabase, getNetwork, createLogger, truncateErrorMessage } from "@percolator/shared"; +import { validateSlab } from "../middleware/validateSlab.js"; + +const logger = createLogger("api:candles"); + +const RES_TO_SECONDS: Record = { + "1": 60, + "5": 5 * 60, + "15": 15 * 60, + "60": 60 * 60, + "240": 4 * 60 * 60, + "1D": 24 * 60 * 60, +}; + +const MAX_BARS = 5000; + +interface UdfResponse { + s: "ok" | "no_data" | "error"; + t: number[]; + o: number[]; + h: number[]; + l: number[]; + c: number[]; + v: number[]; +} + +function emptyResponse(status: "no_data" | "error"): UdfResponse { + return { s: status, t: [], o: [], h: [], l: [], c: [], v: [] }; +} + +interface TradeRow { + price: number; + size: number | string; + created_at: string; +} + +/** Bucket raw trades into OHLCV candles. Input must be in ascending `created_at` order. */ +export function bucketCandles(trades: TradeRow[], bucketSeconds: number): UdfResponse { + if (trades.length === 0) return emptyResponse("no_data"); + const bars = new Map(); + + for (const t of trades) { + const ts = Math.floor(new Date(t.created_at).getTime() / 1000); + const bucket = Math.floor(ts / bucketSeconds) * bucketSeconds; + const price = Number(t.price); + const size = Math.abs(Number(t.size)); + if (!Number.isFinite(price) || !Number.isFinite(size)) continue; + + const existing = bars.get(bucket); + if (!existing) { + bars.set(bucket, { o: price, h: price, l: price, c: price, v: size }); + } else { + if (price > existing.h) existing.h = price; + if (price < existing.l) existing.l = price; + existing.c = price; + existing.v += size; + } + } + + const sortedKeys = [...bars.keys()].sort((a, b) => a - b); + const out: UdfResponse = { s: "ok", t: [], o: [], h: [], l: [], c: [], v: [] }; + for (const k of sortedKeys) { + const b = bars.get(k)!; + out.t.push(k); + out.o.push(b.o); + out.h.push(b.h); + out.l.push(b.l); + out.c.push(b.c); + out.v.push(b.v); + } + return out; +} + +export function candleRoutes(): Hono { + const app = new Hono(); + + app.get("/candles/:slab", validateSlab, async (c) => { + const slab = c.req.param("slab"); + const resolution = c.req.query("resolution") ?? "1"; + const fromSec = parseInt(c.req.query("from") ?? "0", 10); + const toSec = parseInt(c.req.query("to") ?? String(Math.floor(Date.now() / 1000)), 10); + + const bucketSeconds = RES_TO_SECONDS[resolution]; + if (!bucketSeconds) { + return c.json({ s: "error", errmsg: `Unsupported resolution '${resolution}'` }, 400); + } + if (!Number.isFinite(fromSec) || !Number.isFinite(toSec) || toSec <= fromSec) { + return c.json({ s: "error", errmsg: "Invalid from/to" }, 400); + } + + try { + const { data, error } = await getSupabase() + .from("trades") + .select("price, size, created_at") + .eq("slab_address", slab) + .eq("network", getNetwork()) + .gte("created_at", new Date(fromSec * 1000).toISOString()) + .lte("created_at", new Date(toSec * 1000).toISOString()) + .order("created_at", { ascending: true }) + .limit(MAX_BARS * 10); + if (error) throw error; + const bars = bucketCandles((data ?? []) as TradeRow[], bucketSeconds); + return c.json(bars); + } catch (err) { + logger.error("Candles query failed", { + slab, + resolution, + error: truncateErrorMessage(err instanceof Error ? err.message : String(err), 120), + }); + return c.json(emptyResponse("error"), 500); + } + }); + + return app; +} diff --git a/src/services/OraclePriceBroadcaster.ts b/src/services/OraclePriceBroadcaster.ts new file mode 100644 index 0000000..62e552f --- /dev/null +++ b/src/services/OraclePriceBroadcaster.ts @@ -0,0 +1,106 @@ +/** + * OraclePriceBroadcaster + * + * Bridges cross-process state: the INDEXER (separate service) writes rows to + * Supabase `oracle_prices` on every keeper oracle push. This service subscribes + * to Supabase Realtime INSERT events on that table and fires a LOCAL + * `price.updated` event on the api's `eventBus`. The existing WebSocket handler + * in `routes/ws.ts` picks that up and fans out to clients subscribed to + * `price:`. + * + * Without this, the api's `eventBus.on("price.updated")` handler waits for an + * event that no in-process emitter fires — so frontends only see new prices on + * page refresh, never live. + * + * REQUIRES the `oracle_prices` table to be added to Supabase's `supabase_realtime` + * publication: + * + * ALTER PUBLICATION supabase_realtime ADD TABLE oracle_prices; + */ +import { eventBus, getSupabase, getNetwork, createLogger } from "@percolator/shared"; +import type { RealtimeChannel } from "@supabase/supabase-js"; + +const logger = createLogger("api:price-broadcaster"); + +interface OraclePriceRow { + slab_address: string; + price_e6: string | number; + timestamp: number; + tx_signature: string | null; + network: string; +} + +export class OraclePriceBroadcaster { + private channel: RealtimeChannel | null = null; + private started = false; + + async start(): Promise { + if (this.started) return; + this.started = true; + + const network = getNetwork(); + try { + const sb = getSupabase(); + this.channel = sb + .channel("oracle-prices-broadcaster") + .on( + "postgres_changes", + { + event: "INSERT", + schema: "public", + table: "oracle_prices", + filter: `network=eq.${network}`, + }, + (payload) => { + try { + const row = payload.new as OraclePriceRow | undefined; + if (!row || !row.slab_address) return; + const priceE6 = typeof row.price_e6 === "string" + ? Number(row.price_e6) + : Number(row.price_e6); + if (!Number.isFinite(priceE6) || priceE6 <= 0) return; + + eventBus.publish("price.updated", row.slab_address, { + priceE6, + // For a Hyperp, mark and index converge on the oracle value + // pushed here. The frontend uses whichever the ws.ts handler + // formats into the outbound JSON. + markPriceE6: priceE6, + indexPriceE6: priceE6, + source: "oracle_prices", + tx_signature: row.tx_signature ?? undefined, + }); + } catch (err) { + logger.error("oracle_prices insert handler failed", { + error: err instanceof Error ? err.message : String(err), + }); + } + }, + ) + .subscribe((status) => { + if (status === "SUBSCRIBED") { + logger.info("oracle-price broadcaster subscribed", { network }); + } else if (status === "CHANNEL_ERROR" || status === "TIMED_OUT") { + logger.error("oracle-price broadcaster channel status", { status, network }); + } + }); + } catch (err) { + logger.error("failed to start oracle-price broadcaster", { + error: err instanceof Error ? err.message : String(err), + }); + this.started = false; + } + } + + async stop(): Promise { + if (this.channel) { + try { + await getSupabase().removeChannel(this.channel); + } catch { + /* ignore */ + } + this.channel = null; + } + this.started = false; + } +} diff --git a/tests/routes/candles.test.ts b/tests/routes/candles.test.ts new file mode 100644 index 0000000..8fbdce9 --- /dev/null +++ b/tests/routes/candles.test.ts @@ -0,0 +1,134 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { bucketCandles, candleRoutes } from "../../src/routes/candles.js"; + +vi.mock("@percolator/shared", () => ({ + getSupabase: vi.fn(), + getNetwork: vi.fn(() => "devnet"), + createLogger: vi.fn(() => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + })), + truncateErrorMessage: vi.fn((msg: string) => msg), + sanitizeSlabAddress: vi.fn((addr: string) => addr), +})); + +const { getSupabase } = await import("@percolator/shared"); + +describe("bucketCandles", () => { + it("returns no_data on empty input", () => { + expect(bucketCandles([], 60)).toMatchObject({ s: "no_data", t: [] }); + }); + + it("buckets trades into 1-minute OHLCV", () => { + const trades = [ + { price: 100, size: 10, created_at: "2026-04-20T12:00:15Z" }, // bucket 12:00 + { price: 105, size: 5, created_at: "2026-04-20T12:00:45Z" }, // bucket 12:00 + { price: 102, size: 20, created_at: "2026-04-20T12:01:10Z" }, // bucket 12:01 + ]; + const bars = bucketCandles(trades, 60); + expect(bars.s).toBe("ok"); + expect(bars.t).toHaveLength(2); + // first bucket: o=100, h=105, l=100, c=105, v=15 + expect(bars.o[0]).toBe(100); + expect(bars.h[0]).toBe(105); + expect(bars.l[0]).toBe(100); + expect(bars.c[0]).toBe(105); + expect(bars.v[0]).toBe(15); + // second bucket: o=102, h=102, l=102, c=102, v=20 + expect(bars.o[1]).toBe(102); + expect(bars.v[1]).toBe(20); + }); + + it("keeps high/low across many trades in one bucket", () => { + const trades = [ + { price: 100, size: 1, created_at: "2026-04-20T12:00:00Z" }, + { price: 110, size: 1, created_at: "2026-04-20T12:00:10Z" }, + { price: 95, size: 1, created_at: "2026-04-20T12:00:20Z" }, + { price: 105, size: 1, created_at: "2026-04-20T12:00:30Z" }, + ]; + const bars = bucketCandles(trades, 60); + expect(bars.h[0]).toBe(110); + expect(bars.l[0]).toBe(95); + expect(bars.c[0]).toBe(105); + expect(bars.v[0]).toBe(4); + }); + + it("skips trades with non-finite price/size", () => { + const trades = [ + { price: NaN, size: 1, created_at: "2026-04-20T12:00:00Z" }, + { price: 100, size: Infinity, created_at: "2026-04-20T12:00:05Z" }, + { price: 101, size: 5, created_at: "2026-04-20T12:00:10Z" }, + ]; + const bars = bucketCandles(trades, 60); + expect(bars.t).toHaveLength(1); + expect(bars.o[0]).toBe(101); + }); + + it("sorts buckets ascending even when input is shuffled", () => { + const trades = [ + { price: 200, size: 1, created_at: "2026-04-20T12:02:00Z" }, + { price: 100, size: 1, created_at: "2026-04-20T12:00:00Z" }, + { price: 150, size: 1, created_at: "2026-04-20T12:01:00Z" }, + ]; + const bars = bucketCandles(trades, 60); + expect(bars.t[0] < bars.t[1] && bars.t[1] < bars.t[2]).toBe(true); + }); +}); + +describe("GET /candles/:slab", () => { + let mockSupabase: any; + const SLAB = "GGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGG"; + + beforeEach(() => { + vi.clearAllMocks(); + mockSupabase = { + from: vi.fn(() => mockSupabase), + select: vi.fn(() => mockSupabase), + eq: vi.fn(() => mockSupabase), + gte: vi.fn(() => mockSupabase), + lte: vi.fn(() => mockSupabase), + order: vi.fn(() => mockSupabase), + limit: vi.fn().mockResolvedValue({ data: [], error: null }), + }; + vi.mocked(getSupabase).mockReturnValue(mockSupabase); + }); + + it("returns no_data when trades table is empty", async () => { + const app = candleRoutes(); + const res = await app.request(`/candles/${SLAB}?resolution=1&from=0&to=9999999999`); + expect(res.status).toBe(200); + const body = await res.json() as any; + expect(body.s).toBe("no_data"); + }); + + it("returns ok with bars when trades exist", async () => { + mockSupabase.limit.mockResolvedValueOnce({ + data: [ + { price: 100, size: 10, created_at: "2026-04-20T12:00:00Z" }, + { price: 101, size: 5, created_at: "2026-04-20T12:00:30Z" }, + ], + error: null, + }); + const app = candleRoutes(); + const res = await app.request(`/candles/${SLAB}?resolution=1&from=0&to=9999999999`); + const body = await res.json() as any; + expect(body.s).toBe("ok"); + expect(body.t).toHaveLength(1); + expect(body.h[0]).toBe(101); + expect(body.v[0]).toBe(15); + }); + + it("rejects unsupported resolution", async () => { + const app = candleRoutes(); + const res = await app.request(`/candles/${SLAB}?resolution=99&from=0&to=9999999999`); + expect(res.status).toBe(400); + }); + + it("rejects invalid from/to", async () => { + const app = candleRoutes(); + const res = await app.request(`/candles/${SLAB}?resolution=1&from=1000&to=500`); + expect(res.status).toBe(400); + }); +});