fix(api): live-price Supabase Realtime bridge #178
Conversation
Aggregates the trades table into OHLCV bars on the fly (bucketing in JS). Works on plain Postgres; optional TimescaleDB continuous aggregates can be layered on later for faster queries under high volume. Resolutions: 1 / 5 / 15 / 60 / 240 / 1D (minutes, plus 1 day). Response shape matches TradingView UDF so the frontend can swap data sources. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
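The JS-side bucketing described above could be sketched as below. This is an illustrative sketch, not the merged code: the names `bucketCandles` and `TradeRow` appear in the PR's candles route, but the function body and the `Bar` shape here are assumptions.

```typescript
// Illustrative sketch of JS-side OHLCV bucketing. bucketCandles and TradeRow
// are names from the PR's candles route; this body is an assumed shape, not
// the merged implementation.
interface TradeRow {
  price: number;
  size: number;
  created_at: string; // ISO-8601 timestamp from Postgres
}

interface Bar {
  o: number; // open: first trade price in the bucket
  h: number; // high
  l: number; // low
  c: number; // close: last trade price in the bucket
  v: number; // volume: summed trade size
}

// Group trades (assumed ordered by created_at ascending) into fixed-width
// buckets keyed by bucket start time in Unix seconds.
function bucketCandles(trades: TradeRow[], bucketSeconds: number): Map<number, Bar> {
  const bars = new Map<number, Bar>();
  for (const t of trades) {
    // Skip rows with non-finite price/size, per the review comments.
    if (!Number.isFinite(t.price) || !Number.isFinite(t.size)) continue;
    const sec = Math.floor(new Date(t.created_at).getTime() / 1000);
    const bucket = Math.floor(sec / bucketSeconds) * bucketSeconds;
    const bar = bars.get(bucket);
    if (!bar) {
      bars.set(bucket, { o: t.price, h: t.price, l: t.price, c: t.price, v: t.size });
    } else {
      bar.h = Math.max(bar.h, t.price);
      bar.l = Math.min(bar.l, t.price);
      bar.c = t.price;
      bar.v += t.size;
    }
  }
  return bars;
}
```

Sorting the map keys and flattening each bar into parallel arrays then yields the TradingView UDF response shape ({ s, t, o, h, l, c, v }).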
The api's ws.ts handler subscribed to the local eventBus.on('price.updated'),
but nothing in the api process emits that event. The keeper and indexer emit
'price.updated' in their own processes; those are separate Node processes
with separate EventEmitter instances. Net effect: the frontend's useLivePrice
WebSocket subscription received no live updates, and users had to refresh.
New OraclePriceBroadcaster service subscribes to Supabase Realtime INSERT
events on the oracle_prices table and publishes a LOCAL 'price.updated'
event on the api's eventBus with the expected { priceE6, markPriceE6,
indexPriceE6 } shape. The existing ws.ts handler picks it up and fans
out to clients subscribed to 'price:<slab>'.
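The validate-and-republish step could look like the sketch below. The event shape { priceE6, markPriceE6, indexPriceE6 } and the slab_address/price_e6 validation come from this PR; the mark_price_e6/index_price_e6 column names and the fallback to price_e6 when they are absent are assumptions for illustration, not confirmed schema.

```typescript
// Illustrative sketch of the Realtime-to-eventBus mapping. The event shape
// is from the PR; the mark/index column names and price_e6 fallback are
// assumptions.
type OraclePriceRow = {
  slab_address?: string;
  price_e6?: number;
  mark_price_e6?: number;  // assumed column name
  index_price_e6?: number; // assumed column name
};

type PriceUpdatedEvent = {
  slab: string;
  priceE6: number;
  markPriceE6: number;
  indexPriceE6: number;
};

// Validate an INSERT payload row and map it to the local event shape.
// In the real service this would run inside the Supabase Realtime INSERT
// handler before eventBus.emit('price.updated', ...).
function toPriceUpdatedEvent(row: OraclePriceRow): PriceUpdatedEvent | null {
  if (!row.slab_address || !Number.isFinite(row.price_e6)) return null;
  const priceE6 = row.price_e6 as number;
  return {
    slab: row.slab_address,
    priceE6,
    markPriceE6: row.mark_price_e6 ?? priceE6,
    indexPriceE6: row.index_price_e6 ?? priceE6,
  };
}
```

Returning null for malformed rows keeps bad inserts from reaching WS clients.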
Requires the oracle_prices table to be in Supabase's 'supabase_realtime'
publication. If not already enabled, run once in Supabase SQL editor:
ALTER PUBLICATION supabase_realtime ADD TABLE oracle_prices;
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
📝 Walkthrough
This pull request introduces a Candles API endpoint for OHLCV aggregation from trade data, adds an OraclePriceBroadcaster service that manages Supabase Realtime subscriptions for oracle price updates with local event publishing, and integrates both into the server startup sequence with proper error handling and testing.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Server as Server
    participant Supabase as Supabase Realtime
    participant OraclePriceBroadcaster as OraclePriceBroadcaster
    participant EventBus as EventBus
    participant Service as Other Services
    Server->>OraclePriceBroadcaster: new() instantiate
    Server->>OraclePriceBroadcaster: start() async
    OraclePriceBroadcaster->>Supabase: subscribe to oracle_prices channel
    Supabase-->>OraclePriceBroadcaster: SUBSCRIBED event
    OraclePriceBroadcaster->>OraclePriceBroadcaster: log success, set started=true
    loop Oracle Price Updates
        Supabase-->>OraclePriceBroadcaster: MESSAGE (INSERT row)
        OraclePriceBroadcaster->>OraclePriceBroadcaster: validate slab_address & price_e6
        OraclePriceBroadcaster->>EventBus: publish price.updated event
        EventBus-->>Service: deliver event to subscribers
    end
    alt Error Scenarios
        Supabase-->>OraclePriceBroadcaster: CHANNEL_ERROR or TIMED_OUT
        OraclePriceBroadcaster->>OraclePriceBroadcaster: log error
    end
    Server->>OraclePriceBroadcaster: stop() (on shutdown)
    OraclePriceBroadcaster->>Supabase: remove channel subscription
    OraclePriceBroadcaster->>OraclePriceBroadcaster: reset state
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 4
🧹 Nitpick comments (1)
src/index.ts (1)
329-337: Wire the broadcaster into graceful shutdown.
`OraclePriceBroadcaster.stop()` is implemented but never called, so the Supabase Realtime channel can keep receiving events while HTTP/WS shutdown is in progress. Add it to the shutdown cleanup path before closing the servers.
♻️ Proposed cleanup wiring

```diff
  // Clean up pending price update timers before closing connections
+ await oraclePriceBroadcaster.stop();
  cleanupPriceUpdateTimers();
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/index.ts` around lines 329 - 337, The OraclePriceBroadcaster is started but its stop() is never called during shutdown; update the graceful shutdown sequence to call oraclePriceBroadcaster.stop() (or await it if it returns a promise) before closing HTTP/WS servers so the Supabase Realtime channel is unsubscribed prior to server close; locate the broadcaster instance (OraclePriceBroadcaster, oraclePriceBroadcaster) and add the stop invocation in the existing shutdown/cleanup handler(s) that currently close servers so stop() runs first and errors are handled/logged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: fb55208a-a0a6-44f7-9fc8-586683d6e7e9
📒 Files selected for processing (4)
- src/index.ts
- src/routes/candles.ts
- src/services/OraclePriceBroadcaster.ts
- tests/routes/candles.test.ts
```typescript
const sortedKeys = [...bars.keys()].sort((a, b) => a - b);
const out: UdfResponse = { s: "ok", t: [], o: [], h: [], l: [], c: [], v: [] };
```
Return no_data after filtering all invalid trades.
If every row is skipped for non-finite price/size, this currently returns { s: "ok", t: [] }, which is inconsistent UDF output.
🐛 Proposed fix

```diff
  const sortedKeys = [...bars.keys()].sort((a, b) => a - b);
+ if (sortedKeys.length === 0) return emptyResponse("no_data");
  const out: UdfResponse = { s: "ok", t: [], o: [], h: [], l: [], c: [], v: [] };
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/routes/candles.ts` around lines 79 - 80, After filtering out rows with
non-finite price/size you must return a UDF "no_data" response instead of an
empty "ok" payload; update the logic around the construction of out (the
UdfResponse with s, t, o, h, l, c, v) that follows sortedKeys =
[...bars.keys()].sort((a, b) => a - b) to detect when no valid rows were pushed
and set out.s = "no_data" (and ensure t and other arrays remain empty) before
returning; adjust the return path where out is returned so the client receives {
s: "no_data" } when every row was skipped.
```typescript
const fromSec = parseInt(c.req.query("from") ?? "0", 10);
const toSec = parseInt(c.req.query("to") ?? String(Math.floor(Date.now() / 1000)), 10);

const bucketSeconds = RES_TO_SECONDS[resolution];
if (!bucketSeconds) {
  return c.json({ s: "error", errmsg: `Unsupported resolution '${resolution}'` }, 400);
}
if (!Number.isFinite(fromSec) || !Number.isFinite(toSec) || toSec <= fromSec) {
  return c.json({ s: "error", errmsg: "Invalid from/to" }, 400);
}
```
Reject malformed timestamps before building Dates.
`parseInt` accepts partial input like `123abc`, and very large finite values can pass this check and then throw in `toISOString()`, turning a bad request into a 500. Parse strictly and validate the Date range up front.
🛡️ Proposed validation tightening

```diff
- const fromSec = parseInt(c.req.query("from") ?? "0", 10);
- const toSec = parseInt(c.req.query("to") ?? String(Math.floor(Date.now() / 1000)), 10);
+ const fromRaw = c.req.query("from") ?? "0";
+ const toRaw = c.req.query("to") ?? String(Math.floor(Date.now() / 1000));
+ const fromSec = Number(fromRaw);
+ const toSec = Number(toRaw);
  const bucketSeconds = RES_TO_SECONDS[resolution];
  if (!bucketSeconds) {
    return c.json({ s: "error", errmsg: `Unsupported resolution '${resolution}'` }, 400);
  }
- if (!Number.isFinite(fromSec) || !Number.isFinite(toSec) || toSec <= fromSec) {
+ if (
+   !/^\d+$/.test(fromRaw) ||
+   !/^\d+$/.test(toRaw) ||
+   !Number.isSafeInteger(fromSec) ||
+   !Number.isSafeInteger(toSec) ||
+   toSec <= fromSec ||
+   Number.isNaN(new Date(fromSec * 1000).getTime()) ||
+   Number.isNaN(new Date(toSec * 1000).getTime())
+ ) {
    return c.json({ s: "error", errmsg: "Invalid from/to" }, 400);
  }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/routes/candles.ts` around lines 99 - 108, The current parseInt usage for
fromSec/toSec allows partial matches and very large numbers that later throw
when converted to Date; tighten validation in the request parsing for the
candles route by: validate the raw query strings for "from" and "to" are purely
digits (optional leading - if needed) before parsing, parse with Number or
parseInt only after the regex check to prevent partial parses, ensure the parsed
values are safe integers (use Number.isInteger) and within a sensible Unix
timestamp range (e.g., between 0 and a max like 32503680000) and that toSec >
fromSec; update the logic around the fromSec/toSec variables (and the existing
Number.isFinite check) to return c.json({ s: "error", errmsg: "Invalid from/to"
}, 400) for any malformed or out-of-range inputs so Date construction cannot
throw.
```typescript
const { data, error } = await getSupabase()
  .from("trades")
  .select("price, size, created_at")
  .eq("slab_address", slab)
  .eq("network", getNetwork())
  .gte("created_at", new Date(fromSec * 1000).toISOString())
  .lte("created_at", new Date(toSec * 1000).toISOString())
  .order("created_at", { ascending: true })
  .limit(MAX_BARS * 10);
if (error) throw error;
const bars = bucketCandles((data ?? []) as TradeRow[], bucketSeconds);
```
Avoid silently truncating candle data.
.limit(MAX_BARS * 10) caps raw trades, not candles. For active slabs, the query returns the oldest 50k trades and drops the rest, producing incorrect close/high/low/volume with s: "ok". Enforce a candle range limit and detect raw-trade truncation instead of returning partial OHLCV.
🐛 Proposed guardrails

```diff
+ const requestedBars = Math.ceil((toSec - fromSec) / bucketSeconds);
+ if (requestedBars > MAX_BARS) {
+   return c.json({ s: "error", errmsg: "Requested range exceeds max bars" }, 400);
+ }
+
+ const maxTrades = MAX_BARS * 10;
  const { data, error } = await getSupabase()
    .from("trades")
    .select("price, size, created_at")
    .eq("slab_address", slab)
    .eq("network", getNetwork())
    .gte("created_at", new Date(fromSec * 1000).toISOString())
    .lte("created_at", new Date(toSec * 1000).toISOString())
    .order("created_at", { ascending: true })
-   .limit(MAX_BARS * 10);
+   .limit(maxTrades + 1);
  if (error) throw error;
- const bars = bucketCandles((data ?? []) as TradeRow[], bucketSeconds);
+ if ((data?.length ?? 0) > maxTrades) {
+   return c.json({ s: "error", errmsg: "Too many trades for requested range" }, 400);
+ }
+ const bars = bucketCandles((data ?? []) as TradeRow[], bucketSeconds);
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/routes/candles.ts` around lines 111 - 121, The query currently applies
.limit(MAX_BARS * 10) to raw trades which can silently truncate data before
bucketCandles and produce wrong OHLCV; update the logic around
getSupabase().from("trades") so you (1) enforce a requested candle range cap by
validating (toSec - fromSec) <= MAX_BARS * bucketSeconds and return an error if
exceeded, and (2) detect raw-trade truncation by checking if (data?.length ?? 0)
>= the query limit (the value passed to .limit) and fail (or return a clear
truncation/error response) instead of proceeding to bucketCandles; reference
getSupabase, bucketCandles, TradeRow, MAX_BARS, bucketSeconds, slab, and
getNetwork when making these checks so callers never receive partial/incorrect
OHLCV.
```typescript
.subscribe((status) => {
  if (status === "SUBSCRIBED") {
    logger.info("oracle-price broadcaster subscribed", { network });
  } else if (status === "CHANNEL_ERROR" || status === "TIMED_OUT") {
    logger.error("oracle-price broadcaster channel status", { status, network });
  }
```
Recover from Realtime subscription failures.
CHANNEL_ERROR and TIMED_OUT only log, while `started` stays true; a failed channel can leave the live-price bridge permanently inactive until the process restarts. Reset and remove the channel and retry with backoff, or make `start()` surface the failure so the caller can handle it.
🔁 Example recovery shape

```diff
  .subscribe((status) => {
    if (status === "SUBSCRIBED") {
      logger.info("oracle-price broadcaster subscribed", { network });
    } else if (status === "CHANNEL_ERROR" || status === "TIMED_OUT") {
      logger.error("oracle-price broadcaster channel status", { status, network });
+     this.started = false;
+     const failedChannel = this.channel;
+     this.channel = null;
+     if (failedChannel) {
+       void getSupabase().removeChannel(failedChannel).catch(() => undefined);
+     }
+     setTimeout(() => {
+       this.start().catch((err) => {
+         logger.error("oracle-price broadcaster retry failed", {
+           error: err instanceof Error ? err.message : String(err),
+         });
+       });
+     }, 5_000).unref?.();
    }
  });
```

Please verify this against the Supabase JS version in use.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/services/OraclePriceBroadcaster.ts` around lines 80 - 85, The
subscription status handler in OraclePriceBroadcaster currently logs
CHANNEL_ERROR and TIMED_OUT but leaves started true and the failed channel
intact; modify the subscribe callback in OraclePriceBroadcaster to mark the
broadcaster as not started (clear the started flag), remove/unsubscribe the
failing RealtimeChannel instance (channel) and initiate a retry loop with
exponential backoff (or surface the error from start()) so callers can handle
failures; reference the start() method, the subscribe callback handling status,
and the channel variable to locate where to clear started, unsubscribe/remove
the channel, and either throw/return an error from start() or schedule a backoff
retry that re-creates the channel and re-subscribes.
Summary
Frontend price wasn't updating without a page refresh. Root cause: the `eventBus` in `@percolator/shared` is a plain Node EventEmitter — process-local. The keeper / indexer emit `price.updated` in their own processes; those events never cross to the api process, so the WS handler at `ws.ts:461` waited forever for an event no local emitter fires.
Fix
New `OraclePriceBroadcaster` service subscribes to Supabase Realtime INSERT events on the `oracle_prices` table. On each insert it publishes a local `price.updated` event with the `{ priceE6, markPriceE6, indexPriceE6 }` shape the existing ws.ts handler already knows how to format into the outbound `{type:"price", slab, price}` JSON that `useLivePrice` consumes.
Cadence matches our new oracle_prices density (~5s via EventStreamService + 1min via StatsCollector backup). Sub-second WS propagation once subscribed.
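The fan-out step can be sketched like this. The topic naming ('price:<slab>') and the outbound { type: "price", slab, price } frame come from this PR; the Map-based subscription registry and the e6-to-decimal conversion are illustrative assumptions, not the actual ws.ts code.

```typescript
// Illustrative sketch of the ws.ts fan-out: clients subscribe to
// "price:<slab>" topics and each price.updated event is formatted into the
// frame useLivePrice consumes. Registry shape and /1e6 conversion are assumed.
interface WsClient {
  send(data: string): void;
}

const subscriptions = new Map<string, Set<WsClient>>(); // topic -> clients

function subscribe(topic: string, client: WsClient): void {
  let set = subscriptions.get(topic);
  if (!set) {
    set = new Set();
    subscriptions.set(topic, set);
  }
  set.add(client);
}

// Handler for the local price.updated event published by the broadcaster.
function onPriceUpdated(slab: string, priceE6: number): void {
  const frame = JSON.stringify({ type: "price", slab, price: priceE6 / 1e6 });
  for (const client of subscriptions.get(`price:${slab}`) ?? []) {
    client.send(frame);
  }
}
```

Only clients on the matching slab topic receive the frame; other topics are untouched.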
One-time setup — required
In Supabase SQL editor:
```sql
ALTER PUBLICATION supabase_realtime ADD TABLE oracle_prices;
```
If the table isn't in the `supabase_realtime` publication, Realtime INSERT events won't fire and the bridge is a no-op.
Test plan
🤖 Generated with Claude Code