diff --git a/app/services/jobs/crawl.server.ts b/app/services/jobs/crawl.server.ts index 6fa156b2..24c5de95 100644 --- a/app/services/jobs/crawl.server.ts +++ b/app/services/jobs/crawl.server.ts @@ -11,6 +11,7 @@ import type { OrganizationId } from '~/app/types/organization' import { getOrganization } from '~/batch/db/queries' import { createFetcher } from '~/batch/github/fetcher' import { createStore } from '~/batch/github/store' +import { computeAdvancedScanWatermark } from './scan-watermark' export const crawlJob = defineJob({ name: 'crawl', @@ -89,28 +90,29 @@ export const crawlJob = defineJob({ }) } - const lastFetchedAt = await step.run( - `last-fetched-at:${repoLabel}`, - async () => { - if (input.refresh) return FETCH_ALL_SENTINEL - return ( - (await store.getLatestUpdatedAt().catch(() => null)) ?? - FETCH_ALL_SENTINEL - ) - }, - ) + // Watermark bounds full-sweep progress; targeted fetches must not + // advance it (see computeAdvancedScanWatermark / #278). Already + // preloaded via getOrganization, so no extra round-trip here. + const scanWatermark = input.refresh + ? FETCH_ALL_SENTINEL + : (repo.scanWatermark ?? FETCH_ALL_SENTINEL) const prNumberSet = input.prNumbers ? new Set(input.prNumbers) : null - const prsToFetch: Array<{ number: number }> = prNumberSet - ? (input.prNumbers?.map((n) => ({ number: n })) ?? []) - : await step.run(`fetch-prs:${repoLabel}`, async () => { - step.progress(i + 1, repoCount, `Fetching PR list: ${repoLabel}...`) - const stopBefore = - input.refresh || lastFetchedAt === FETCH_ALL_SENTINEL - ? undefined - : lastFetchedAt - return await fetcher.pullrequestList(stopBefore) - }) + const prsToFetch: Array<{ number: number; updatedAt?: string }> = + prNumberSet + ? (input.prNumbers?.map((n) => ({ number: n })) ?? []) + : await step.run(`fetch-prs:${repoLabel}`, async () => { + step.progress( + i + 1, + repoCount, + `Fetching PR list: ${repoLabel}...`, + ) + const stopBefore = + input.refresh || scanWatermark === FETCH_ALL_SENTINEL + ? undefined + : scanWatermark + return await fetcher.pullrequestList(stopBefore) + }) const repoUpdated = new Set() for (let j = 0; j < prsToFetch.length; j++) { @@ -168,6 +170,19 @@ export const crawlJob = defineJob({ if (repoUpdated.size > 0) { updatedPrNumbers.set(repo.id, repoUpdated) } + + // Advance the scan watermark only after a fully successful full-sweep. + // See computeAdvancedScanWatermark for the invariants. + const nextWatermark = computeAdvancedScanWatermark({ + isTargetedFetch: prNumberSet !== null, + prsToFetch, + savedPrNumbers: repoUpdated, + }) + if (nextWatermark !== null) { + await step.run(`advance-scan-watermark:${repoLabel}`, async () => { + await store.setScanWatermark(nextWatermark) + }) + } } if (!input.refresh && !input.prNumbers && updatedPrNumbers.size === 0) { diff --git a/app/services/jobs/scan-watermark.test.ts b/app/services/jobs/scan-watermark.test.ts new file mode 100644 index 00000000..5d77e590 --- /dev/null +++ b/app/services/jobs/scan-watermark.test.ts @@ -0,0 +1,96 @@ +import { describe, expect, test } from 'vitest' +import { computeAdvancedScanWatermark } from './scan-watermark' + +describe('computeAdvancedScanWatermark', () => { + test('targeted fetch never advances the watermark', () => { + expect( + computeAdvancedScanWatermark({ + isTargetedFetch: true, + prsToFetch: [{ number: 101, updatedAt: '2026-03-16T13:55:00Z' }], + savedPrNumbers: new Set([101]), + }), + ).toBeNull() + }) + + test('empty sweep leaves watermark unchanged', () => { + expect( + computeAdvancedScanWatermark({ + isTargetedFetch: false, + prsToFetch: [], + savedPrNumbers: new Set(), + }), + ).toBeNull() + }) + + test('fully successful full sweep advances to max updatedAt', () => { + expect( + computeAdvancedScanWatermark({ + isTargetedFetch: false, + prsToFetch: [ + { number: 100, updatedAt: '2026-03-16T13:45:00Z' }, + { number: 101, updatedAt: '2026-03-16T13:55:00Z' }, + { number: 99, updatedAt: '2026-03-16T13:30:00Z' }, + ], + savedPrNumbers: new Set([100, 101, 99]), + }), + ).toBe('2026-03-16T13:55:00Z') + }) + + test('advancement requires set membership, not just matching counts', () => { + // If savedPrNumbers ever contains unrelated numbers (e.g. a caller bug + // or concurrent mutation), the function must still refuse to advance. + expect( + computeAdvancedScanWatermark({ + isTargetedFetch: false, + prsToFetch: [ + { number: 100, updatedAt: '2026-03-16T13:45:00Z' }, + { number: 101, updatedAt: '2026-03-16T13:55:00Z' }, + ], + savedPrNumbers: new Set([101, 999]), + }), + ).toBeNull() + }) + + test('partial failure does NOT advance the watermark', () => { + // PR#100 (13:45) failed, PR#101 (13:55) succeeded. If we advanced to + // 13:55 here, the next full crawl would stopBefore=13:55 and skip + // PR#100 forever. See #278. + expect( + computeAdvancedScanWatermark({ + isTargetedFetch: false, + prsToFetch: [ + { number: 100, updatedAt: '2026-03-16T13:45:00Z' }, + { number: 101, updatedAt: '2026-03-16T13:55:00Z' }, + ], + savedPrNumbers: new Set([101]), + }), + ).toBeNull() + }) + + test('gap-recovery scenario: full crawl after targeted webhook fetch picks up the older PR', () => { + // Reproduces the #278 scenario end-to-end at the pure-logic level: + // 1. 13:30 prior full sweep advanced watermark to 13:30 + // 2. 13:45 PR#100 merged (webhook not yet subscribed, so no fetch) + // 3. 13:55 PR#101 comment → webhook targeted fetch saves PR#101 only. + // computeAdvancedScanWatermark returns null → watermark stays 13:30. + // 4. 14:30 scheduled full crawl runs pullrequestList(stopBefore=13:30) + // and sees both PR#101 (13:55) and PR#100 (13:45). Both save. + // Watermark advances to 13:55. + const targeted = computeAdvancedScanWatermark({ + isTargetedFetch: true, + prsToFetch: [{ number: 101, updatedAt: '2026-03-16T13:55:00Z' }], + savedPrNumbers: new Set([101]), + }) + expect(targeted).toBeNull() // watermark still 13:30 + + const fullSweep = computeAdvancedScanWatermark({ + isTargetedFetch: false, + prsToFetch: [ + { number: 101, updatedAt: '2026-03-16T13:55:00Z' }, + { number: 100, updatedAt: '2026-03-16T13:45:00Z' }, + ], + savedPrNumbers: new Set([100, 101]), + }) + expect(fullSweep).toBe('2026-03-16T13:55:00Z') + }) +}) diff --git a/app/services/jobs/scan-watermark.ts b/app/services/jobs/scan-watermark.ts new file mode 100644 index 00000000..226a8ada --- /dev/null +++ b/app/services/jobs/scan-watermark.ts @@ -0,0 +1,39 @@ +/** + * Compute the next scan watermark for a repository after a crawl sweep. + * + * Background (#278): the scan watermark is the PR `updatedAt` upper bound up + * to which a full-sweep crawl has guaranteed every PR has been fetched. The + * next full crawl uses it as `stopBefore` for paginating the PR list. Getting + * this wrong silently drops PRs forever, so the invariants matter: + * + * 1. Targeted (webhook) fetches MUST NOT advance the watermark. They only see + * a specific PR number list, so advancing the watermark past skipped PRs + * would cause the next full sweep to stop short of them. + * 2. Partially-failed sweeps MUST NOT advance the watermark. If any listed PR + * failed to save, the next sweep needs to see it again, which requires + * keeping the watermark at (or below) the failed PR's updatedAt. + * 3. Sweeps that saw zero new PRs leave the watermark as-is (returning null + * from this function means "do not update"). + * + * Returns the new watermark timestamp, or null to signal no update. + */ +export const computeAdvancedScanWatermark = (params: { + isTargetedFetch: boolean + prsToFetch: ReadonlyArray<{ number: number; updatedAt?: string }> + savedPrNumbers: ReadonlySet +}): string | null => { + const { isTargetedFetch, prsToFetch, savedPrNumbers } = params + + if (isTargetedFetch) return null + if (prsToFetch.length === 0) return null + + let max: string | null = null + for (const pr of prsToFetch) { + // Membership (not size): guarantees every listed PR was saved, even if + // savedPrNumbers ever contains unrelated numbers. + if (!savedPrNumbers.has(pr.number)) return null + if (!pr.updatedAt) continue + if (max === null || pr.updatedAt > max) max = pr.updatedAt + } + return max +} diff --git a/app/services/tenant-type.ts b/app/services/tenant-type.ts index 6c6d5bca..819528cc 100644 --- a/app/services/tenant-type.ts +++ b/app/services/tenant-type.ts @@ -143,6 +143,7 @@ export interface Repositories { releaseDetectionKey: Generated; releaseDetectionMethod: Generated<"branch" | "tags">; repo: string; + scanWatermark: string | null; teamId: string | null; updatedAt: string; } diff --git a/batch/db/queries.ts b/batch/db/queries.ts index e0362dda..87853ef0 100644 --- a/batch/db/queries.ts +++ b/batch/db/queries.ts @@ -107,6 +107,7 @@ async function getTenantData(organizationId: OrganizationId) { 'teamId', 'updatedAt', 'createdAt', + 'scanWatermark', ]) .execute(), tenantDb diff --git a/batch/github/store.test.ts b/batch/github/store.test.ts index ea8f1515..e016022f 100644 --- a/batch/github/store.test.ts +++ b/batch/github/store.test.ts @@ -326,15 +326,19 @@ describe('store', () => { expect(await store.loader.commits(2)).toEqual(makeCommits(2)) }) - test('getLatestUpdatedAt returns MAX(updatedAt) without JSON parsing', async () => { + test('scan watermark round-trip: null by default, updated via setScanWatermark', async () => { const store = createStore({ organizationId: orgId, repositoryId }) - const pr1 = { ...makePr(1), updatedAt: '2024-01-02T00:00:00Z' } - const pr2 = { ...makePr(2), updatedAt: '2024-01-05T00:00:00Z' } - const pr3 = { ...makePr(3), updatedAt: '2024-01-03T00:00:00Z' } + expect(await store.getScanWatermark()).toBeNull() + await store.setScanWatermark('2024-01-05T00:00:00Z') + expect(await store.getScanWatermark()).toBe('2024-01-05T00:00:00Z') + + // Watermark lives on repositories.scanWatermark and is independent of + // githubRawData rows; targeted fetches that write raw data must not + // implicitly advance it (see #278). await store.savePrData( - pr1, + { ...makePr(1), updatedAt: '2024-02-10T00:00:00Z' }, { commits: makeCommits(1), reviews: makeReviews(1), @@ -342,31 +346,7 @@ describe('store', () => { }, isoAt(1_700_000_000_000), ) - await store.savePrData( - pr2, - { - commits: makeCommits(2), - reviews: makeReviews(2), - discussions: makeDiscussions(2), - }, - isoAt(1_700_000_000_001), - ) - await store.savePrData( - pr3, - { - commits: makeCommits(3), - reviews: makeReviews(3), - discussions: makeDiscussions(3), - }, - isoAt(1_700_000_000_002), - ) - - expect(await store.getLatestUpdatedAt()).toBe('2024-01-05T00:00:00Z') - }) - - test('getLatestUpdatedAt returns null when no data', async () => { - const store = createStore({ organizationId: orgId, repositoryId }) - expect(await store.getLatestUpdatedAt()).toBeNull() + expect(await store.getScanWatermark()).toBe('2024-01-05T00:00:00Z') }) test('preloadAll uses lazy parsing (data accessible after preload)', async () => { diff --git a/batch/github/store.ts b/batch/github/store.ts index 52882dbd..e98535b2 100644 --- a/batch/github/store.ts +++ b/batch/github/store.ts @@ -270,16 +270,27 @@ export const createStore = ({ } /** - * Get the latest updatedAt timestamp for this repository. - * Uses SQL MAX() — no JSON parsing needed. + * Scan watermark: the upper bound (PR updatedAt) up to which a full-sweep + * crawl has guaranteed to have fetched every PR. Only full-sweep crawls + * should advance this; targeted fetches (webhook-driven) must NOT touch it, + * otherwise older PRs that were updated during a gap get skipped by the + * next full crawl's `stopBefore` check. See issue #278. */ - const getLatestUpdatedAt = async (): Promise => { + const getScanWatermark = async (): Promise => { const row = await db - .selectFrom('githubRawData') - .select((eb) => eb.fn.max('updatedAt').as('maxUpdatedAt')) - .where('repositoryId', '=', repositoryId) + .selectFrom('repositories') + .select('scanWatermark') + .where('id', '=', repositoryId) .executeTakeFirst() - return (row?.maxUpdatedAt as string | null) ?? null + return row?.scanWatermark ?? null + } + + const setScanWatermark = async (watermark: string): Promise => { + await db + .updateTable('repositories') + .set({ scanWatermark: watermark }) + .where('id', '=', repositoryId) + .execute() } return { @@ -287,7 +298,8 @@ export const createStore = ({ updatePrMetadata, saveTags, preloadAll, - getLatestUpdatedAt, + getScanWatermark, + setScanWatermark, loader: { commits, discussions, diff --git a/db/migrations/tenant/20260407004753_add_scan_watermark.sql b/db/migrations/tenant/20260407004753_add_scan_watermark.sql new file mode 100644 index 00000000..8bea9790 --- /dev/null +++ b/db/migrations/tenant/20260407004753_add_scan_watermark.sql @@ -0,0 +1,10 @@ +-- Add column "scan_watermark" to table: "repositories" +ALTER TABLE `repositories` ADD COLUMN `scan_watermark` text NULL; +-- Backfill scan_watermark from existing raw data so next crawl does not re-fetch everything. +-- Historical data was produced by full-sweep crawls, so max(updated_at) is a valid watermark. +UPDATE `repositories` +SET `scan_watermark` = ( + SELECT MAX(`updated_at`) + FROM `github_raw_data` + WHERE `github_raw_data`.`repository_id` = `repositories`.`id` +); diff --git a/db/migrations/tenant/atlas.sum b/db/migrations/tenant/atlas.sum index 70a99476..5bf8d149 100644 --- a/db/migrations/tenant/atlas.sum +++ b/db/migrations/tenant/atlas.sum @@ -1,4 +1,4 @@ -h1:7qLuG3yxMP6dnU1l/Cux8QOBPDTV4aLP+s8tVFmg3XA= +h1:faIWXl0zqPfxS0ANZy6xQifugYWN4RCW8D31sx77twI= 20260226112249_initial_tenant.sql h1:dIhBg2gzyh+ZjLzPXdHYafd5e62yIEjk1eFlllEyYX0= 20260226233619_add_teams.sql h1:n8MRMUA4BgeXYEnL9HJPc8mnXh8lqIfrCcdYtFFoWqw= 20260227163239.sql h1:ENMZUW7zHK8UjG2TdYlBOZSVPPUCXftIw5U5k2C54oo= @@ -21,3 +21,4 @@ h1:7qLuG3yxMP6dnU1l/Cux8QOBPDTV4aLP+s8tVFmg3XA= 20260318120000_migrate_excluded_users_to_bot_type.sql h1:QhLNJld3iqMSd/Zi8/jHadFB+0u4ocStzpprBqCaG+M= 20260319120000_add_last_activity_at.sql h1:2evW+gCdRyH1OvTH2NDh5cV2xaxOHIldHYrkwS2ThE4= 20260324000000_move_integrations_to_shared.sql h1:KsJftEmsruoNtxLi2katRAhETjXx2uTefqV3iX2pVsE= +20260407004753_add_scan_watermark.sql h1:ULts2U5BxtBoi5Qjb2+0o4SrJ3RBg7zcC47xKsMpk6o= diff --git a/db/tenant.sql b/db/tenant.sql index 66555b13..4002d16f 100644 --- a/db/tenant.sql +++ b/db/tenant.sql @@ -32,6 +32,7 @@ CREATE TABLE `repositories` ( `updated_at` datetime NOT NULL, `created_at` datetime NOT NULL DEFAULT (CURRENT_TIMESTAMP), `team_id` text NULL, + `scan_watermark` text NULL, PRIMARY KEY (`id`), CONSTRAINT `repositories_team_id_fkey` FOREIGN KEY (`team_id`) REFERENCES `teams` (`id`) ON UPDATE CASCADE ON DELETE SET NULL );