Skip to content

Commit 42eeef8

Browse files
committed
Simplify queue spans and fix missing span.end() calls
1 parent 00d5754 commit 42eeef8

File tree

3 files changed

+249
-234
lines changed

3 files changed

+249
-234
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { NextApiRequest, NextApiResponse } from 'next';
2+
import { createClient } from '@supabase/supabase-js';
3+
4+
// These are the default development keys for a local Supabase instance
5+
const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
6+
const SUPABASE_SERVICE_ROLE_KEY =
7+
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';
8+
9+
const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY);
10+
11+
// NOTE: Not instrumenting with Sentry intentionally - this is just a cleanup helper
12+
13+
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
14+
// Purge all messages from the todos queue by consuming them in a loop
15+
let purgedCount = 0;
16+
const maxIterations = 100; // Safety limit
17+
18+
for (let i = 0; i < maxIterations; i++) {
19+
const { data, error } = await supabaseClient.schema('pgmq_public').rpc('pop', {
20+
queue_name: 'todos',
21+
});
22+
23+
if (error) {
24+
return res.status(500).json({ error: error.message, purgedCount });
25+
}
26+
27+
// No more messages to pop
28+
if (!data || (Array.isArray(data) && data.length === 0)) {
29+
break;
30+
}
31+
32+
purgedCount++;
33+
}
34+
35+
return res.status(200).json({ purgedCount });
36+
}

dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,9 @@ test('Sends queue publish spans with `rpc(...)`', async ({ page, baseURL }) => {
368368
});
369369

370370
test('Sends queue process spans with `schema(...).rpc(...)`', async ({ page, baseURL }) => {
371+
// Purge any stale messages from previous tests to ensure we get the message we just produced
372+
await fetch(`${baseURL}/api/queue/purge`);
373+
371374
const producerTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => {
372375
return Boolean(
373376
transactionEvent?.contexts?.trace?.op === 'http.server' &&
@@ -426,7 +429,7 @@ test('Sends queue process spans with `schema(...).rpc(...)`', async ({ page, bas
426429
data: expect.objectContaining({
427430
'messaging.destination.name': 'todos',
428431
'messaging.system': 'supabase',
429-
'messaging.message.id': '1',
432+
'messaging.message.id': expect.any(String),
430433
'messaging.operation.type': 'process',
431434
'messaging.operation.name': 'pop',
432435
'messaging.message.body.size': expect.any(Number),
@@ -471,13 +474,16 @@ test('Sends queue process spans with `schema(...).rpc(...)`', async ({ page, bas
471474
message: 'queue.process(todos)',
472475
data: {
473476
'messaging.destination.name': 'todos',
474-
'messaging.message.id': '1',
477+
'messaging.message.id': expect.any(String),
475478
'messaging.message.body.size': expect.any(Number),
476479
},
477480
});
478481
});
479482

480483
test('Sends queue process spans with `rpc(...)`', async ({ page, baseURL }) => {
484+
// Purge any stale messages from previous tests to ensure we get the message we just produced
485+
await fetch(`${baseURL}/api/queue/purge`);
486+
481487
const producerTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => {
482488
return !!(
483489
transactionEvent?.contexts?.trace?.op === 'http.server' &&
@@ -528,15 +534,15 @@ test('Sends queue process spans with `rpc(...)`', async ({ page, baseURL }) => {
528534
expect(queueMessage.message._sentry).toBeUndefined();
529535

530536
const consumerSpan = transactionEvent.spans?.find(
531-
span => span.op === 'queue.process' && span.data?.['messaging.message.id'] === '2',
537+
span => span.op === 'queue.process' && span.description === 'process todos',
532538
);
533539
expect(consumerSpan).toBeDefined();
534540

535541
expect(consumerSpan).toMatchObject({
536542
data: expect.objectContaining({
537543
'messaging.destination.name': 'todos',
538544
'messaging.system': 'supabase',
539-
'messaging.message.id': '2',
545+
'messaging.message.id': expect.any(String),
540546
'messaging.operation.type': 'process',
541547
'messaging.operation.name': 'pop',
542548
'messaging.message.body.size': expect.any(Number),
@@ -581,7 +587,7 @@ test('Sends queue process spans with `rpc(...)`', async ({ page, baseURL }) => {
581587
message: 'queue.process(todos)',
582588
data: {
583589
'messaging.destination.name': 'todos',
584-
'messaging.message.id': '2',
590+
'messaging.message.id': expect.any(String),
585591
'messaging.message.body.size': expect.any(Number),
586592
},
587593
});

0 commit comments

Comments
 (0)