Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f7c3918
feat(core): Add Supabase Queues support
onurtemizkan Mar 28, 2025
359783d
Skip tests on bundles
onurtemizkan Apr 22, 2025
483e965
Update test usage
onurtemizkan Apr 22, 2025
1680b7d
Lint
onurtemizkan May 13, 2025
fc78519
Update implementation
onurtemizkan May 27, 2025
5533ae1
Update playwright tests
onurtemizkan May 27, 2025
84e22ee
Tidy up test endpoints
onurtemizkan May 28, 2025
11b06a3
Refactor / reimplement
onurtemizkan May 31, 2025
6fce369
Add missing import
onurtemizkan Jun 20, 2025
1705876
Rename `SupabaseClientConstructor` to `SupabaseClientConstructorType`
onurtemizkan Jun 27, 2025
9b458f6
Extract `instrumentRpcMethod`
onurtemizkan Jun 27, 2025
f3a22a0
WIP
onurtemizkan Aug 19, 2025
852a236
Match with OTEL semantics improve tests
onurtemizkan Nov 4, 2025
157a39c
Add OTEL operation attributes to queue spans
onurtemizkan Nov 5, 2025
29ee5a0
Fix browser integration tests
onurtemizkan Nov 5, 2025
068661b
Improve data integrity and distributed tracing
onurtemizkan Nov 21, 2025
1992f4d
Use span links for consumer distributed tracing, fix prototype instru…
onurtemizkan Nov 26, 2025
f795cb3
Fix mutation and safety issues, ignore, empty queue consumers
onurtemizkan Dec 3, 2025
dc33d2c
Simplify queue spans and fix missing span.end() calls
onurtemizkan Dec 5, 2025
96e41a5
Re-address review comments
onurtemizkan Dec 6, 2025
7486518
Address review comments
onurtemizkan Dec 7, 2025
36ecce5
Fix linter
onurtemizkan Dec 8, 2025
1511cf2
Add array check for `messages` from params
onurtemizkan Dec 8, 2025
e957e7f
Remove rethrowing errors
onurtemizkan Dec 8, 2025
8298d8d
Use OTEL-compliant `messaging.batch.message_count` attribute.
onurtemizkan Dec 9, 2025
608b31d
Add missing telemetry for empty Supabase queue consumer responses
onurtemizkan Dec 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import * as Sentry from '@sentry/browser';
import { createClient } from '@supabase/supabase-js';

window.Sentry = Sentry;

const supabaseClient = createClient('https://test.supabase.co', 'test-key', {
db: {
schema: 'pgmq_public',
},
});

Sentry.init({
dsn: 'https://public@dsn.ingest.sentry.io/1337',
integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration({ supabaseClient })],
tracesSampleRate: 1.0,
});

// Simulate queue operations
async function performQueueOperations() {
try {
await supabaseClient.rpc('send', {
queue_name: 'todos',
message: { title: 'Test Todo' },
});

await supabaseClient.rpc('pop', {
queue_name: 'todos',
});
} catch (error) {
Sentry.captureException(error);
}
}

performQueueOperations();
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import type { Page } from '@playwright/test';
import { expect } from '@playwright/test';
import type { Event } from '@sentry/core';
import { sentryTest } from '../../../../utils/fixtures';
import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers';

async function mockSupabaseRoute(page: Page) {
await page.route('**/rpc/**/send', route => {
return route.fulfill({
status: 200,
body: JSON.stringify([0]),
headers: {
'Content-Type': 'application/json',
},
});
});

await page.route('**/rpc/**/pop', route => {
return route.fulfill({
status: 200,
body: JSON.stringify([
{
msg_id: 0,
},
]),
headers: {
'Content-Type': 'application/json',
},
});
});
}

const bundle = process.env.PW_BUNDLE || '';
// We only want to run this in non-CDN bundle mode
if (bundle.startsWith('bundle')) {
sentryTest.skip();
}

sentryTest('should capture Supabase queue spans from client.rpc', async ({ getLocalTestUrl, page }) => {
if (shouldSkipTracingTest()) {
return;
}

await mockSupabaseRoute(page);

const url = await getLocalTestUrl({ testDir: __dirname });

const event = await getFirstSentryEnvelopeRequest<Event>(page, url);
const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue.'));

expect(queueSpans).toHaveLength(2);

expect(queueSpans![0]).toMatchObject({
description: 'publish todos',
parent_span_id: event.contexts?.trace?.span_id,
span_id: expect.any(String),
start_timestamp: expect.any(Number),
timestamp: expect.any(Number),
trace_id: event.contexts?.trace?.trace_id,
data: expect.objectContaining({
'sentry.op': 'queue.publish',
'sentry.origin': 'auto.db.supabase.queue.producer',
'messaging.destination.name': 'todos',
'messaging.message.id': '0',
}),
});

expect(queueSpans![1]).toMatchObject({
description: 'process todos',
parent_span_id: event.contexts?.trace?.span_id,
span_id: expect.any(String),
start_timestamp: expect.any(Number),
timestamp: expect.any(Number),
trace_id: event.contexts?.trace?.trace_id,
data: expect.objectContaining({
'sentry.op': 'queue.process',
'sentry.origin': 'auto.db.supabase.queue.consumer',
'messaging.destination.name': 'todos',
'messaging.message.id': '0',
}),
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import * as Sentry from '@sentry/browser';
import { createClient } from '@supabase/supabase-js';

window.Sentry = Sentry;

const supabaseClient = createClient('https://test.supabase.co', 'test-key', {
db: {
schema: 'pgmq_public',
},
});

Sentry.init({
dsn: 'https://public@dsn.ingest.sentry.io/1337',
integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration({ supabaseClient })],
tracesSampleRate: 1.0,
});

// Simulate queue operations
async function performQueueOperations() {
try {
await supabaseClient.schema('pgmq_public').rpc('send', {
queue_name: 'todos',
message: { title: 'Test Todo' },
});

await supabaseClient.schema('pgmq_public').rpc('pop', {
queue_name: 'todos',
});
} catch (error) {
Sentry.captureException(error);
}
}

performQueueOperations();
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import type { Page } from '@playwright/test';
import { expect } from '@playwright/test';
import type { Event } from '@sentry/core';
import { sentryTest } from '../../../../utils/fixtures';
import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers';

async function mockSupabaseRoute(page: Page) {
await page.route('**/rpc/**/send', route => {
return route.fulfill({
status: 200,
body: JSON.stringify([0]),
headers: {
'Content-Type': 'application/json',
},
});
});

await page.route('**/rpc/**/pop', route => {
return route.fulfill({
status: 200,
body: JSON.stringify([
{
msg_id: 0,
},
]),
headers: {
'Content-Type': 'application/json',
},
});
});
}

const bundle = process.env.PW_BUNDLE || '';
// We only want to run this in non-CDN bundle mode
if (bundle.startsWith('bundle')) {
sentryTest.skip();
}

sentryTest('should capture Supabase queue spans from client.schema(...).rpc', async ({ getLocalTestUrl, page }) => {
if (shouldSkipTracingTest()) {
return;
}

await mockSupabaseRoute(page);

const url = await getLocalTestUrl({ testDir: __dirname });

const event = await getFirstSentryEnvelopeRequest<Event>(page, url);

const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue.'));

expect(queueSpans).toHaveLength(2);

expect(queueSpans![0]).toMatchObject({
description: 'publish todos',
parent_span_id: event.contexts?.trace?.span_id,
span_id: expect.any(String),
start_timestamp: expect.any(Number),
timestamp: expect.any(Number),
trace_id: event.contexts?.trace?.trace_id,
data: expect.objectContaining({
'sentry.op': 'queue.publish',
'sentry.origin': 'auto.db.supabase.queue.producer',
'messaging.destination.name': 'todos',
'messaging.message.id': '0',
}),
});

expect(queueSpans![1]).toMatchObject({
description: 'process todos',
parent_span_id: event.contexts?.trace?.span_id,
span_id: expect.any(String),
start_timestamp: expect.any(Number),
timestamp: expect.any(Number),
trace_id: event.contexts?.trace?.trace_id,
data: expect.objectContaining({
'sentry.op': 'queue.process',
'sentry.origin': 'auto.db.supabase.queue.consumer',
'messaging.destination.name': 'todos',
'messaging.message.id': '0',
}),
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"build": "next build",
"start": "next start",
"clean": "npx rimraf node_modules pnpm-lock.yaml .next",
"start-local-supabase": "supabase init --force --workdir . && supabase start -o env && supabase db reset",
"start-local-supabase": "supabase start -o env && supabase db reset",
"test:prod": "TEST_ENV=production playwright test",
"test:build": "pnpm install && pnpm start-local-supabase && pnpm build",
"test:assert": "pnpm test:prod"
Expand All @@ -25,7 +25,7 @@
"next": "14.2.25",
"react": "18.2.0",
"react-dom": "18.2.0",
"supabase": "2.19.7",
"supabase": "2.23.4",
"typescript": "4.9.5"
},
"devDependencies": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { NextApiRequest, NextApiResponse } from 'next';
import { createClient } from '@supabase/supabase-js';
import * as Sentry from '@sentry/nextjs';

// These are the default development keys for a local Supabase instance
const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
const SUPABASE_SERVICE_ROLE_KEY =
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';

const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, {
db: {
schema: 'pgmq_public',
},
});

Sentry.instrumentSupabaseClient(supabaseClient);

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
// Step 1: Batch produce multiple messages
const { data: sendData, error: sendError } = await supabaseClient.rpc('send_batch', {
queue_name: 'batch-flow-queue',
messages: [
{
taskType: 'email',
recipient: 'user1@example.com',
subject: 'Welcome!',
},
{
taskType: 'email',
recipient: 'user2@example.com',
subject: 'Verification',
},
{
taskType: 'sms',
recipient: '+1234567890',
message: 'Your code is 123456',
},
],
});

if (sendError) {
return res.status(500).json({ error: `Send batch failed: ${sendError.message}` });
}

// Step 2: Consume multiple messages from the queue
const { data: receiveData, error: receiveError } = await supabaseClient.rpc('receive', {
queue_name: 'batch-flow-queue',
vt: 30,
qty: 3,
});

if (receiveError) {
return res.status(500).json({ error: `Receive failed: ${receiveError.message}` });
}

// Step 3: Process all messages
const processedMessages = receiveData?.map((msg: any) => ({
messageId: msg.msg_id,
taskType: msg.message?.taskType,
processed: true,
}));

// Step 4: Archive all processed messages
const messageIds = receiveData?.map((msg: any) => msg.msg_id).filter(Boolean);
if (messageIds && messageIds.length > 0) {
const { error: archiveError } = await supabaseClient.rpc('archive', {
queue_name: 'batch-flow-queue',
msg_ids: messageIds,
});

if (archiveError) {
return res.status(500).json({ error: `Archive failed: ${archiveError.message}` });
}
}

return res.status(200).json({
success: true,
batchSize: 3,
produced: { messageIds: sendData },
consumed: {
count: receiveData?.length || 0,
messages: processedMessages,
},
});
}
Loading
Loading