diff --git a/src/lib/server/services/webhook-events.ts b/src/lib/server/services/webhook-events.ts index f9f4ddf..242b070 100644 --- a/src/lib/server/services/webhook-events.ts +++ b/src/lib/server/services/webhook-events.ts @@ -1,8 +1,10 @@ -import { and, eq, inArray, lt } from "drizzle-orm"; +import { and, asc, eq, inArray, lt } from "drizzle-orm"; import { db } from "../db"; import { webhookEndpoints, webhookEvents } from "../db/schema"; const EXPIRY_DAYS = 7; +export const DEFAULT_POLL_LIMIT = 5; +const MAX_POLL_LIMIT = 25; export async function createEvent( endpointId: string, @@ -26,7 +28,15 @@ export async function createEvent( return row; } -export async function getPendingEvents(tokenId: string, endpointId?: string) { +function normalizePollLimit(limit?: number) { + if (limit === undefined || !Number.isFinite(limit)) { + return DEFAULT_POLL_LIMIT; + } + + return Math.min(Math.max(Math.trunc(limit), 1), MAX_POLL_LIMIT); +} + +export async function getPendingEvents(tokenId: string, endpointId?: string, limit?: number) { const conditions = [ eq(webhookEvents.status, "pending"), eq(webhookEndpoints.tokenId, tokenId), @@ -49,7 +59,9 @@ export async function getPendingEvents(tokenId: string, endpointId?: string) { }) .from(webhookEvents) .innerJoin(webhookEndpoints, eq(webhookEvents.endpointId, webhookEndpoints.id)) - .where(and(...conditions)); + .where(and(...conditions)) + .orderBy(asc(webhookEvents.receivedAt)) + .limit(normalizePollLimit(limit)); } export async function acknowledgeEvents(tokenId: string, eventIds: string[]) { diff --git a/src/routes/webhooks/poll/+server.ts b/src/routes/webhooks/poll/+server.ts index c9d1b41..6aa239a 100644 --- a/src/routes/webhooks/poll/+server.ts +++ b/src/routes/webhooks/poll/+server.ts @@ -6,6 +6,8 @@ import { getPendingEvents } from "$lib/server/services/webhook-events"; export const GET: RequestHandler = async ({ request, url }) => { const token = await requireBearer(request); const endpointId = url.searchParams.get("endpointId") ?? undefined; - const events = await getPendingEvents(token.id, endpointId); + const limitParam = url.searchParams.get("limit"); + const limit = limitParam === null ? undefined : Number(limitParam); + const events = await getPendingEvents(token.id, endpointId, limit); return json({ events }); }; diff --git a/tests/integration/webhook-events.test.ts b/tests/integration/webhook-events.test.ts index f146a1e..d5a6ce4 100644 --- a/tests/integration/webhook-events.test.ts +++ b/tests/integration/webhook-events.test.ts @@ -56,6 +56,42 @@ describe("webhook-events service", () => { expect(events[0].body).toEqual({ source: "linear" }); }); + it("returns the five oldest pending events by default", async () => { + const { token } = await createTestToken(); + const endpoint = await createEndpoint(token.id, { name: "Linear" }); + + for (let i = 0; i < 7; i++) { + const event = await createEvent(endpoint.id, {}, { n: i }); + await db + .update(webhookEvents) + .set({ receivedAt: new Date(`2026-01-01T00:00:0${i}.000Z`) }) + .where(eq(webhookEvents.id, event.id)); + } + + const events = await getPendingEvents(token.id); + + expect(events).toHaveLength(5); + expect(events.map((event) => event.body)).toEqual([ + { n: 0 }, + { n: 1 }, + { n: 2 }, + { n: 3 }, + { n: 4 }, + ]); + }); + + it("allows callers to request a smaller poll batch", async () => { + const { token } = await createTestToken(); + const endpoint = await createEndpoint(token.id, { name: "Linear" }); + await createEvent(endpoint.id, {}, { n: 1 }); + await createEvent(endpoint.id, {}, { n: 2 }); + await createEvent(endpoint.id, {}, { n: 3 }); + + const events = await getPendingEvents(token.id, undefined, 2); + + expect(events).toHaveLength(2); + }); + it("does not return events belonging to another token", async () => { const { token: token1 } = await createTestToken("Agent A"); const { token: token2 } = await createTestToken("Agent B");