13 changes: 13 additions & 0 deletions src/index.ts
@@ -21,9 +21,11 @@ import { insuranceRoutes } from "./routes/insurance.js";
import { openInterestRoutes } from "./routes/open-interest.js";
import { statsRoutes } from "./routes/stats.js";
import { chartRoutes } from "./routes/chart.js";
import { candleRoutes } from "./routes/candles.js";
import { docsRoutes } from "./routes/docs.js";
import { adlRoutes } from "./routes/adl.js";
import { setupWebSocket, cleanupPriceUpdateTimers } from "./routes/ws.js";
import { OraclePriceBroadcaster } from "./services/OraclePriceBroadcaster.js";
import { readRateLimit, writeRateLimit } from "./middleware/rate-limit.js";
import { ipBlocklist } from "./middleware/ip-blocklist.js";
import { cacheMiddleware } from "./middleware/cache.js";
@@ -200,6 +202,7 @@ app.route("/", insuranceRoutes());
app.route("/", openInterestRoutes());
app.route("/", statsRoutes());
app.route("/", chartRoutes());
app.route("/", candleRoutes());
app.route("/", adlRoutes());
app.route("/", docsRoutes());

@@ -323,6 +326,16 @@ const server = serve({ fetch: app.fetch, port }, (info) => {

const wss = setupWebSocket(server as unknown as import("node:http").Server);

// Bridge oracle_prices INSERTs → local eventBus → WS clients. Without this
// the cross-process price.updated events from the indexer never reach WS
// subscribers, and the frontend only sees new prices on page refresh.
const oraclePriceBroadcaster = new OraclePriceBroadcaster();
oraclePriceBroadcaster.start().catch((err) => {
logger.error("OraclePriceBroadcaster start failed", {
error: err instanceof Error ? err.message : String(err),
});
});

const SHUTDOWN_TIMEOUT_MS = 10_000;

async function shutdown(signal: string): Promise<void> {
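One follow-up this hunk implies but does not show: the broadcaster should also be torn down in the existing shutdown() path. A minimal sketch, assuming shutdown() simply awaits the stop() method defined on the class (the rest of shutdown()'s body is elided from this diff):

// Hypothetical addition to the existing shutdown() function; the elided
// teardown steps (server close, WS cleanup) are not shown in this diff.
async function shutdown(signal: string): Promise<void> {
  // ... existing teardown (HTTP server, WS timers) elided in this diff ...
  await oraclePriceBroadcaster.stop(); // unsubscribe the Realtime bridge before exit
}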
134 changes: 134 additions & 0 deletions src/routes/candles.ts
@@ -0,0 +1,134 @@
/**
 * Candles API Route — internal Percolator OHLCV from the `trades` table.
 *
 * GET /candles/:slab?resolution=1&from=<unix_s>&to=<unix_s>
 *
 * Response shape follows TradingView UDF (same as the Pyth Benchmarks proxy, so
 * the frontend can swap data sources without changing parsing):
 *
 *   { s: "ok"|"no_data", t: number[], o: number[], h: number[], l: number[], c: number[], v: number[] }
 *
 * Timestamps are Unix seconds (UDF convention). Resolution maps to minutes,
 * except "1D", which buckets into days.
 *
 * Implementation: fetches raw trades in the requested range and buckets them
 * in-process. Works on plain Postgres; when Variant B of migration
 * 20260420_candle_support.sql is applied (TimescaleDB continuous aggregates),
 * this can be upgraded to query the candles_* materialized views directly.
 */
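// Example (illustrative values, not from this PR): a 1-minute request such as
//   GET /candles/<slab>?resolution=1&from=1715000000&to=1715000180
// buckets any trades in that window into 60-second bars and returns, e.g.,
//   { s: "ok", t: [1715000040, 1715000100], o: [...], h: [...], l: [...], c: [...], v: [...] }
// or { s: "no_data", t: [], ... } when the range contains no trades.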
import { Hono } from "hono";
import { getSupabase, getNetwork, createLogger, truncateErrorMessage } from "@percolator/shared";
import { validateSlab } from "../middleware/validateSlab.js";

const logger = createLogger("api:candles");

const RES_TO_SECONDS: Record<string, number> = {
  "1": 60,
  "5": 5 * 60,
  "15": 15 * 60,
  "60": 60 * 60,
  "240": 4 * 60 * 60,
  "1D": 24 * 60 * 60,
};

const MAX_BARS = 5000;

interface UdfResponse {
  s: "ok" | "no_data" | "error";
  t: number[];
  o: number[];
  h: number[];
  l: number[];
  c: number[];
  v: number[];
}

function emptyResponse(status: "no_data" | "error"): UdfResponse {
  return { s: status, t: [], o: [], h: [], l: [], c: [], v: [] };
}

interface TradeRow {
  price: number;
  size: number | string;
  created_at: string;
}

/** Bucket raw trades into OHLCV candles. Input must be in ascending `created_at` order. */
export function bucketCandles(trades: TradeRow[], bucketSeconds: number): UdfResponse {
  if (trades.length === 0) return emptyResponse("no_data");
  const bars = new Map<number, { o: number; h: number; l: number; c: number; v: number }>();

  for (const t of trades) {
    const ts = Math.floor(new Date(t.created_at).getTime() / 1000);
    const bucket = Math.floor(ts / bucketSeconds) * bucketSeconds;
    const price = Number(t.price);
    const size = Math.abs(Number(t.size));
    if (!Number.isFinite(price) || !Number.isFinite(size)) continue;

    const existing = bars.get(bucket);
    if (!existing) {
      bars.set(bucket, { o: price, h: price, l: price, c: price, v: size });
    } else {
      if (price > existing.h) existing.h = price;
      if (price < existing.l) existing.l = price;
      existing.c = price;
      existing.v += size;
    }
  }

  const sortedKeys = [...bars.keys()].sort((a, b) => a - b);
  const out: UdfResponse = { s: "ok", t: [], o: [], h: [], l: [], c: [], v: [] };
Comment on lines +79 to +80

⚠️ Potential issue | 🟡 Minor

Return no_data after filtering all invalid trades.

If every row is skipped for non-finite price/size, this currently returns { s: "ok", t: [] }, which is inconsistent UDF output.

🐛 Proposed fix
   const sortedKeys = [...bars.keys()].sort((a, b) => a - b);
+  if (sortedKeys.length === 0) return emptyResponse("no_data");
   const out: UdfResponse = { s: "ok", t: [], o: [], h: [], l: [], c: [], v: [] };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routes/candles.ts` around lines 79-80: after filtering out rows with
non-finite price/size, return a UDF "no_data" response instead of an empty
"ok" payload. In the logic that follows sortedKeys = [...bars.keys()].sort((a,
b) => a - b), detect when no valid rows were bucketed and return
emptyResponse("no_data") (keeping t and the other arrays empty) so the client
receives { s: "no_data" } when every row was skipped.

  for (const k of sortedKeys) {
    const b = bars.get(k)!;
    out.t.push(k);
    out.o.push(b.o);
    out.h.push(b.h);
    out.l.push(b.l);
    out.c.push(b.c);
    out.v.push(b.v);
  }
  return out;
}

export function candleRoutes(): Hono {
  const app = new Hono();

  app.get("/candles/:slab", validateSlab, async (c) => {
    const slab = c.req.param("slab");
    const resolution = c.req.query("resolution") ?? "1";
    const fromSec = parseInt(c.req.query("from") ?? "0", 10);
    const toSec = parseInt(c.req.query("to") ?? String(Math.floor(Date.now() / 1000)), 10);

    const bucketSeconds = RES_TO_SECONDS[resolution];
    if (!bucketSeconds) {
      return c.json({ s: "error", errmsg: `Unsupported resolution '${resolution}'` }, 400);
    }
    if (!Number.isFinite(fromSec) || !Number.isFinite(toSec) || toSec <= fromSec) {
      return c.json({ s: "error", errmsg: "Invalid from/to" }, 400);
    }
Comment on lines +99 to +108

⚠️ Potential issue | 🟡 Minor

Reject malformed timestamps before building Dates.

parseInt accepts partial input like 123abc, and very large finite values can pass this check then throw in toISOString(), turning a bad request into a 500. Parse strictly and validate the Date range up front.

🛡️ Proposed validation tightening
-    const fromSec = parseInt(c.req.query("from") ?? "0", 10);
-    const toSec = parseInt(c.req.query("to") ?? String(Math.floor(Date.now() / 1000)), 10);
+    const fromRaw = c.req.query("from") ?? "0";
+    const toRaw = c.req.query("to") ?? String(Math.floor(Date.now() / 1000));
+    const fromSec = Number(fromRaw);
+    const toSec = Number(toRaw);
 
     const bucketSeconds = RES_TO_SECONDS[resolution];
     if (!bucketSeconds) {
       return c.json({ s: "error", errmsg: `Unsupported resolution '${resolution}'` }, 400);
     }
-    if (!Number.isFinite(fromSec) || !Number.isFinite(toSec) || toSec <= fromSec) {
+    if (
+      !/^\d+$/.test(fromRaw) ||
+      !/^\d+$/.test(toRaw) ||
+      !Number.isSafeInteger(fromSec) ||
+      !Number.isSafeInteger(toSec) ||
+      toSec <= fromSec ||
+      Number.isNaN(new Date(fromSec * 1000).getTime()) ||
+      Number.isNaN(new Date(toSec * 1000).getTime())
+    ) {
       return c.json({ s: "error", errmsg: "Invalid from/to" }, 400);
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routes/candles.ts` around lines 99-108: the current parseInt usage for
fromSec/toSec allows partial matches and very large numbers that later throw
when converted to Date. Tighten the request parsing: validate that the raw
"from" and "to" query strings are purely digits before parsing; parse with
Number or parseInt only after that check so partial input like "123abc" is
rejected; ensure the parsed values are safe integers (Number.isInteger) within
a sensible Unix-timestamp range (e.g., between 0 and a max like 32503680000)
and that toSec > fromSec; and return c.json({ s: "error", errmsg: "Invalid
from/to" }, 400) for any malformed or out-of-range input so Date construction
cannot throw.
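For reference, the checks above can be packaged as one strict parser; a minimal sketch (the helper name parseUnixSeconds and the year-3000 upper bound are illustrative choices, not from the PR):

// Illustrative helper; not part of the PR.
function parseUnixSeconds(raw: string): number | null {
  if (!/^\d+$/.test(raw)) return null; // digits only; rejects partial input like "123abc"
  const n = Number(raw);
  // 32503680000 is roughly 3000-01-01, so new Date(n * 1000) stays in range.
  if (!Number.isSafeInteger(n) || n > 32_503_680_000) return null;
  return n;
}

// Sketch of usage in the handler:
//   const fromSec = parseUnixSeconds(c.req.query("from") ?? "0");
//   const toSec = parseUnixSeconds(c.req.query("to") ?? String(Math.floor(Date.now() / 1000)));
//   if (fromSec === null || toSec === null || toSec <= fromSec) {
//     return c.json({ s: "error", errmsg: "Invalid from/to" }, 400);
//   }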


    try {
      const { data, error } = await getSupabase()
        .from("trades")
        .select("price, size, created_at")
        .eq("slab_address", slab)
        .eq("network", getNetwork())
        .gte("created_at", new Date(fromSec * 1000).toISOString())
        .lte("created_at", new Date(toSec * 1000).toISOString())
        .order("created_at", { ascending: true })
        .limit(MAX_BARS * 10);
      if (error) throw error;
      const bars = bucketCandles((data ?? []) as TradeRow[], bucketSeconds);
Comment on lines +111 to +121

⚠️ Potential issue | 🟠 Major

Avoid silently truncating candle data.

.limit(MAX_BARS * 10) caps raw trades, not candles. For active slabs, the query returns the oldest 50k trades and drops the rest, producing incorrect close/high/low/volume with s: "ok". Enforce a candle range limit and detect raw-trade truncation instead of returning partial OHLCV.

🐛 Proposed guardrails
+      const requestedBars = Math.ceil((toSec - fromSec) / bucketSeconds);
+      if (requestedBars > MAX_BARS) {
+        return c.json({ s: "error", errmsg: "Requested range exceeds max bars" }, 400);
+      }
+
+      const maxTrades = MAX_BARS * 10;
       const { data, error } = await getSupabase()
         .from("trades")
         .select("price, size, created_at")
         .eq("slab_address", slab)
         .eq("network", getNetwork())
         .gte("created_at", new Date(fromSec * 1000).toISOString())
         .lte("created_at", new Date(toSec * 1000).toISOString())
         .order("created_at", { ascending: true })
-        .limit(MAX_BARS * 10);
+        .limit(maxTrades + 1);
       if (error) throw error;
-      const bars = bucketCandles((data ?? []) as TradeRow[], bucketSeconds);
+      if ((data?.length ?? 0) > maxTrades) {
+        return c.json({ s: "error", errmsg: "Too many trades for requested range" }, 400);
+      }
+      const bars = bucketCandles((data ?? []) as TradeRow[], bucketSeconds);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/routes/candles.ts` around lines 111-121: the query applies
.limit(MAX_BARS * 10) to raw trades, which can silently truncate data before
bucketCandles and produce wrong OHLCV. Update the logic around
getSupabase().from("trades") so you (1) enforce a requested candle range cap
by validating (toSec - fromSec) <= MAX_BARS * bucketSeconds and return an
error if exceeded, and (2) detect raw-trade truncation by checking whether
(data?.length ?? 0) >= the value passed to .limit and fail with a clear
truncation error instead of proceeding to bucketCandles, so callers never
receive partial or incorrect OHLCV.
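To make the scale concrete, an illustrative back-of-the-envelope in the same spirit (the request window is made up):

// Illustrative arithmetic only; these values are not from the PR.
const bucketSeconds = 60;                  // resolution "1"
const fromSec = 1_712_000_000;
const toSec = fromSec + 30 * 24 * 60 * 60; // a 30-day window
const requestedBars = Math.ceil((toSec - fromSec) / bucketSeconds);
console.log(requestedBars);                // 43200 bars, far above MAX_BARS (5000)
// Meanwhile .limit(MAX_BARS * 10) caps the fetch at 50,000 trades, so an
// active slab with more trades in the window is silently truncated.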

      return c.json(bars);
    } catch (err) {
      logger.error("Candles query failed", {
        slab,
        resolution,
        error: truncateErrorMessage(err instanceof Error ? err.message : String(err), 120),
      });
      return c.json(emptyResponse("error"), 500);
    }
  });

  return app;
}
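A sketch of how a client might consume the endpoint (base URL and slab address are placeholders, not values from the PR):

// Placeholder host and slab; substitute real values.
const base = "http://localhost:3000";
const slab = "So11111111111111111111111111111111111111112";
const to = Math.floor(Date.now() / 1000);
const from = to - 60 * 60; // last hour of 1-minute bars

const res = await fetch(`${base}/candles/${slab}?resolution=1&from=${from}&to=${to}`);
const udf = (await res.json()) as {
  s: string; t: number[]; o: number[]; h: number[]; l: number[]; c: number[]; v: number[];
};
if (udf.s === "ok") {
  // UDF arrays are parallel: index i across t/o/h/l/c/v is one bar.
  console.log(`got ${udf.t.length} bars; last close=${udf.c[udf.c.length - 1]}`);
} else if (udf.s === "no_data") {
  console.log("no trades in range");
}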
106 changes: 106 additions & 0 deletions src/services/OraclePriceBroadcaster.ts
@@ -0,0 +1,106 @@
/**
 * OraclePriceBroadcaster
 *
 * Bridges cross-process state: the INDEXER (separate service) writes rows to
 * Supabase `oracle_prices` on every keeper oracle push. This service subscribes
 * to Supabase Realtime INSERT events on that table and fires a LOCAL
 * `price.updated` event on the api's `eventBus`. The existing WebSocket handler
 * in `routes/ws.ts` picks that up and fans out to clients subscribed to
 * `price:<slab>`.
 *
 * Without this, the api's `eventBus.on("price.updated")` handler waits for an
 * event that no in-process emitter fires — so frontends only see new prices on
 * page refresh, never live.
 *
 * REQUIRES the `oracle_prices` table to be added to Supabase's `supabase_realtime`
 * publication:
 *
 *   ALTER PUBLICATION supabase_realtime ADD TABLE oracle_prices;
 */
import { eventBus, getSupabase, getNetwork, createLogger } from "@percolator/shared";
import type { RealtimeChannel } from "@supabase/supabase-js";

const logger = createLogger("api:price-broadcaster");

interface OraclePriceRow {
  slab_address: string;
  price_e6: string | number;
  timestamp: number;
  tx_signature: string | null;
  network: string;
}

export class OraclePriceBroadcaster {
  private channel: RealtimeChannel | null = null;
  private started = false;

  async start(): Promise<void> {
    if (this.started) return;
    this.started = true;

    const network = getNetwork();
    try {
      const sb = getSupabase();
      this.channel = sb
        .channel("oracle-prices-broadcaster")
        .on(
          "postgres_changes",
          {
            event: "INSERT",
            schema: "public",
            table: "oracle_prices",
            filter: `network=eq.${network}`,
          },
          (payload) => {
            try {
              const row = payload.new as OraclePriceRow | undefined;
              if (!row || !row.slab_address) return;
              const priceE6 = Number(row.price_e6); // handles both string and numeric price_e6
              if (!Number.isFinite(priceE6) || priceE6 <= 0) return;

              eventBus.publish("price.updated", row.slab_address, {
                priceE6,
                // For a Hyperp, mark and index converge on the oracle value
                // pushed here. The frontend uses whichever the ws.ts handler
                // formats into the outbound JSON.
                markPriceE6: priceE6,
                indexPriceE6: priceE6,
                source: "oracle_prices",
                tx_signature: row.tx_signature ?? undefined,
              });
            } catch (err) {
              logger.error("oracle_prices insert handler failed", {
                error: err instanceof Error ? err.message : String(err),
              });
            }
          },
        )
        .subscribe((status) => {
          if (status === "SUBSCRIBED") {
            logger.info("oracle-price broadcaster subscribed", { network });
          } else if (status === "CHANNEL_ERROR" || status === "TIMED_OUT") {
            logger.error("oracle-price broadcaster channel status", { status, network });
          }
Comment on lines +80 to +85

⚠️ Potential issue | 🟠 Major

Recover from Realtime subscription failures.

CHANNEL_ERROR and TIMED_OUT only log, while started stays true; a failed channel can leave the live-price bridge permanently inactive until process restart. Reset/remove the channel and retry with backoff, or make start() surface the failure so the caller can handle it.

🔁 Example recovery shape
         .subscribe((status) => {
           if (status === "SUBSCRIBED") {
             logger.info("oracle-price broadcaster subscribed", { network });
           } else if (status === "CHANNEL_ERROR" || status === "TIMED_OUT") {
             logger.error("oracle-price broadcaster channel status", { status, network });
+            this.started = false;
+            const failedChannel = this.channel;
+            this.channel = null;
+            if (failedChannel) {
+              void getSupabase().removeChannel(failedChannel).catch(() => undefined);
+            }
+            setTimeout(() => {
+              this.start().catch((err) => {
+                logger.error("oracle-price broadcaster retry failed", {
+                  error: err instanceof Error ? err.message : String(err),
+                });
+              });
+            }, 5_000).unref?.();
           }
         });

Please verify this against the Supabase JS version in use (suggested web search: "Supabase JavaScript RealtimeChannel subscribe callback status CHANNEL_ERROR TIMED_OUT retry behavior").
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/services/OraclePriceBroadcaster.ts` around lines 80-85: the subscribe
status handler currently logs CHANNEL_ERROR and TIMED_OUT but leaves `started`
true and the failed channel intact. In that callback, clear the `started`
flag, remove/unsubscribe the failing RealtimeChannel, and either schedule a
retry with exponential backoff that re-creates the channel and re-subscribes,
or surface the failure from start() so callers can handle it.
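The recovery shape above retries on a fixed 5-second timer; the agent prompt asks for exponential backoff instead. A capped, jittered backoff sketch (the helper and the retryAttempt counter are illustrative additions, not part of the PR):

// Illustrative helper; not part of the PR.
function retryDelayMs(attempt: number, baseMs = 1_000, capMs = 60_000): number {
  const exp = Math.min(capMs, baseMs * 2 ** attempt);
  return exp / 2 + Math.random() * (exp / 2); // jitter to avoid synchronized retries
}

// Roughly, inside the CHANNEL_ERROR / TIMED_OUT branch (assumes a private
// `retryAttempt` counter on the class, reset to 0 once SUBSCRIBED fires):
//   this.retryAttempt += 1;
//   setTimeout(() => void this.start(), retryDelayMs(this.retryAttempt)).unref?.();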

        });
    } catch (err) {
      logger.error("failed to start oracle-price broadcaster", {
        error: err instanceof Error ? err.message : String(err),
      });
      this.started = false;
    }
  }

  async stop(): Promise<void> {
    if (this.channel) {
      try {
        await getSupabase().removeChannel(this.channel);
      } catch {
        /* ignore */
      }
      this.channel = null;
    }
    this.started = false;
  }
}
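For completeness, a lifecycle sketch of how a caller wires this up; the eventBus.on callback shape is an assumption (only the publish side appears in this file):

import { eventBus } from "@percolator/shared";
import { OraclePriceBroadcaster } from "./OraclePriceBroadcaster.js";

const broadcaster = new OraclePriceBroadcaster();
await broadcaster.start(); // idempotent; guarded by the `started` flag

// ws.ts already consumes this event; an extra listener is useful for local
// debugging. The callback signature here is assumed, not confirmed.
eventBus.on("price.updated", (slab: string, data: { priceE6: number }) => {
  console.log("live price", slab, data.priceE6 / 1e6);
});

// On shutdown:
await broadcaster.stop();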