Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 35 additions & 20 deletions app/services/jobs/crawl.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { OrganizationId } from '~/app/types/organization'
import { getOrganization } from '~/batch/db/queries'
import { createFetcher } from '~/batch/github/fetcher'
import { createStore } from '~/batch/github/store'
import { computeAdvancedScanWatermark } from './scan-watermark'

export const crawlJob = defineJob({
name: 'crawl',
Expand Down Expand Up @@ -89,28 +90,29 @@ export const crawlJob = defineJob({
})
}

const lastFetchedAt = await step.run(
`last-fetched-at:${repoLabel}`,
async () => {
if (input.refresh) return FETCH_ALL_SENTINEL
return (
(await store.getLatestUpdatedAt().catch(() => null)) ??
FETCH_ALL_SENTINEL
)
},
)
// Watermark bounds full-sweep progress; targeted fetches must not
// advance it (see computeAdvancedScanWatermark / #278). Already
// preloaded via getOrganization, so no extra round-trip here.
const scanWatermark = input.refresh
? FETCH_ALL_SENTINEL
: (repo.scanWatermark ?? FETCH_ALL_SENTINEL)

const prNumberSet = input.prNumbers ? new Set(input.prNumbers) : null
const prsToFetch: Array<{ number: number }> = prNumberSet
? (input.prNumbers?.map((n) => ({ number: n })) ?? [])
: await step.run(`fetch-prs:${repoLabel}`, async () => {
step.progress(i + 1, repoCount, `Fetching PR list: ${repoLabel}...`)
const stopBefore =
input.refresh || lastFetchedAt === FETCH_ALL_SENTINEL
? undefined
: lastFetchedAt
return await fetcher.pullrequestList(stopBefore)
})
const prsToFetch: Array<{ number: number; updatedAt?: string }> =
prNumberSet
? (input.prNumbers?.map((n) => ({ number: n })) ?? [])
: await step.run(`fetch-prs:${repoLabel}`, async () => {
step.progress(
i + 1,
repoCount,
`Fetching PR list: ${repoLabel}...`,
)
const stopBefore =
input.refresh || scanWatermark === FETCH_ALL_SENTINEL
? undefined
: scanWatermark
return await fetcher.pullrequestList(stopBefore)
})

const repoUpdated = new Set<number>()
for (let j = 0; j < prsToFetch.length; j++) {
Expand Down Expand Up @@ -168,6 +170,19 @@ export const crawlJob = defineJob({
if (repoUpdated.size > 0) {
updatedPrNumbers.set(repo.id, repoUpdated)
}

// Advance the scan watermark only after a fully successful full-sweep.
// See computeAdvancedScanWatermark for the invariants.
const nextWatermark = computeAdvancedScanWatermark({
isTargetedFetch: prNumberSet !== null,
prsToFetch,
savedPrNumbers: repoUpdated,
})
if (nextWatermark !== null) {
await step.run(`advance-scan-watermark:${repoLabel}`, async () => {
await store.setScanWatermark(nextWatermark)
})
}
}

if (!input.refresh && !input.prNumbers && updatedPrNumbers.size === 0) {
Expand Down
96 changes: 96 additions & 0 deletions app/services/jobs/scan-watermark.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { describe, expect, test } from 'vitest'
import { computeAdvancedScanWatermark } from './scan-watermark'

describe('computeAdvancedScanWatermark', () => {
  // Shared fixture timestamps, matching the #278 incident timeline.
  const T1330 = '2026-03-16T13:30:00Z'
  const T1345 = '2026-03-16T13:45:00Z'
  const T1355 = '2026-03-16T13:55:00Z'
  void T1330 // timeline anchor only; the prior watermark itself is implicit

  // Small helper: run one sweep with the given PR list and saved numbers.
  const sweep = (
    prsToFetch: Array<{ number: number; updatedAt?: string }>,
    saved: number[],
    isTargetedFetch = false,
  ) =>
    computeAdvancedScanWatermark({
      isTargetedFetch,
      prsToFetch,
      savedPrNumbers: new Set(saved),
    })

  test('targeted fetch never advances the watermark', () => {
    expect(sweep([{ number: 101, updatedAt: T1355 }], [101], true)).toBeNull()
  })

  test('empty sweep leaves watermark unchanged', () => {
    expect(sweep([], [])).toBeNull()
  })

  test('fully successful full sweep advances to max updatedAt', () => {
    const result = sweep(
      [
        { number: 100, updatedAt: T1345 },
        { number: 101, updatedAt: T1355 },
        { number: 99, updatedAt: T1330 },
      ],
      [100, 101, 99],
    )
    expect(result).toBe(T1355)
  })

  test('advancement requires set membership, not just matching counts', () => {
    // savedPrNumbers has the right size but the wrong contents (caller bug
    // or concurrent mutation). The function must still refuse to advance.
    const result = sweep(
      [
        { number: 100, updatedAt: T1345 },
        { number: 101, updatedAt: T1355 },
      ],
      [101, 999],
    )
    expect(result).toBeNull()
  })

  test('partial failure does NOT advance the watermark', () => {
    // PR#100 (13:45) failed while PR#101 (13:55) saved. Advancing to 13:55
    // would make the next full crawl stopBefore=13:55 and skip PR#100
    // forever — the exact failure mode of #278.
    const result = sweep(
      [
        { number: 100, updatedAt: T1345 },
        { number: 101, updatedAt: T1355 },
      ],
      [101],
    )
    expect(result).toBeNull()
  })

  test('gap-recovery scenario: full crawl after targeted webhook fetch picks up the older PR', () => {
    // Pure-logic replay of #278:
    // 1. 13:30 a prior full sweep left the watermark at 13:30
    // 2. 13:45 PR#100 merged while webhooks were not yet subscribed
    // 3. 13:55 a comment on PR#101 triggers a targeted fetch of PR#101 only;
    //    the function returns null, so the watermark stays at 13:30
    // 4. 14:30 the scheduled full crawl (stopBefore=13:30) lists both
    //    PR#101 (13:55) and PR#100 (13:45); both save, watermark → 13:55.
    const targeted = sweep([{ number: 101, updatedAt: T1355 }], [101], true)
    expect(targeted).toBeNull() // watermark still 13:30

    const fullSweep = sweep(
      [
        { number: 101, updatedAt: T1355 },
        { number: 100, updatedAt: T1345 },
      ],
      [100, 101],
    )
    expect(fullSweep).toBe(T1355)
  })
})
39 changes: 39 additions & 0 deletions app/services/jobs/scan-watermark.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
 * Decide whether a crawl sweep may advance the repository scan watermark,
 * and to what value.
 *
 * Background (#278): the scan watermark is the PR `updatedAt` upper bound up
 * to which a full-sweep crawl has guaranteed every PR has been fetched. The
 * next full crawl passes it as `stopBefore` when paginating the PR list, so
 * an incorrectly advanced watermark silently drops PRs forever. Invariants:
 *
 * 1. Targeted (webhook) fetches MUST NOT advance the watermark — they only
 *    see an explicit PR-number list, so moving the watermark past PRs they
 *    never looked at would make the next full sweep stop short of them.
 * 2. Partially-failed sweeps MUST NOT advance the watermark — any PR that
 *    failed to save needs to be seen again, which requires the watermark to
 *    stay at (or below) that PR's updatedAt.
 * 3. A sweep that listed zero PRs leaves the watermark as-is.
 *
 * Returns the new watermark timestamp, or null to signal "do not update".
 */
export const computeAdvancedScanWatermark = (params: {
  isTargetedFetch: boolean
  prsToFetch: ReadonlyArray<{ number: number; updatedAt?: string }>
  savedPrNumbers: ReadonlySet<number>
}): string | null => {
  const { isTargetedFetch, prsToFetch, savedPrNumbers } = params

  // Invariant 1: targeted fetches never move the watermark.
  if (isTargetedFetch) return null
  // Invariant 3: nothing listed, nothing proven — leave it untouched.
  if (prsToFetch.length === 0) return null

  // Invariant 2: every listed PR must have been saved. Membership (not a
  // size comparison) so stray numbers in savedPrNumbers can never
  // accidentally satisfy the condition.
  const everyListedPrSaved = prsToFetch.every((pr) =>
    savedPrNumbers.has(pr.number),
  )
  if (!everyListedPrSaved) return null

  // Timestamps here are ISO-8601 strings in a uniform format (see tests), so
  // lexicographic comparison matches chronological order. PRs without an
  // updatedAt are skipped; if none carry one, null means "do not update".
  return prsToFetch.reduce<string | null>(
    (newest, pr) =>
      pr.updatedAt !== undefined && (newest === null || pr.updatedAt > newest)
        ? pr.updatedAt
        : newest,
    null,
  )
}
1 change: 1 addition & 0 deletions app/services/tenant-type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ export interface Repositories {
releaseDetectionKey: Generated<string>;
releaseDetectionMethod: Generated<"branch" | "tags">;
repo: string;
scanWatermark: string | null;
teamId: string | null;
updatedAt: string;
}
Expand Down
1 change: 1 addition & 0 deletions batch/db/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ async function getTenantData(organizationId: OrganizationId) {
'teamId',
'updatedAt',
'createdAt',
'scanWatermark',
])
.execute(),
tenantDb
Expand Down
40 changes: 10 additions & 30 deletions batch/github/store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,47 +326,27 @@ describe('store', () => {
expect(await store.loader.commits(2)).toEqual(makeCommits(2))
})

test('getLatestUpdatedAt returns MAX(updatedAt) without JSON parsing', async () => {
test('scan watermark round-trip: null by default, updated via setScanWatermark', async () => {
const store = createStore({ organizationId: orgId, repositoryId })

const pr1 = { ...makePr(1), updatedAt: '2024-01-02T00:00:00Z' }
const pr2 = { ...makePr(2), updatedAt: '2024-01-05T00:00:00Z' }
const pr3 = { ...makePr(3), updatedAt: '2024-01-03T00:00:00Z' }
expect(await store.getScanWatermark()).toBeNull()

await store.setScanWatermark('2024-01-05T00:00:00Z')
expect(await store.getScanWatermark()).toBe('2024-01-05T00:00:00Z')

// Watermark lives on repositories.scanWatermark and is independent of
// githubRawData rows; targeted fetches that write raw data must not
// implicitly advance it (see #278).
await store.savePrData(
pr1,
{ ...makePr(1), updatedAt: '2024-02-10T00:00:00Z' },
{
commits: makeCommits(1),
reviews: makeReviews(1),
discussions: makeDiscussions(1),
},
isoAt(1_700_000_000_000),
)
await store.savePrData(
pr2,
{
commits: makeCommits(2),
reviews: makeReviews(2),
discussions: makeDiscussions(2),
},
isoAt(1_700_000_000_001),
)
await store.savePrData(
pr3,
{
commits: makeCommits(3),
reviews: makeReviews(3),
discussions: makeDiscussions(3),
},
isoAt(1_700_000_000_002),
)

expect(await store.getLatestUpdatedAt()).toBe('2024-01-05T00:00:00Z')
})

test('getLatestUpdatedAt returns null when no data', async () => {
const store = createStore({ organizationId: orgId, repositoryId })
expect(await store.getLatestUpdatedAt()).toBeNull()
expect(await store.getScanWatermark()).toBe('2024-01-05T00:00:00Z')
})

test('preloadAll uses lazy parsing (data accessible after preload)', async () => {
Expand Down
28 changes: 20 additions & 8 deletions batch/github/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,24 +270,36 @@ export const createStore = ({
}

/**
* Get the latest updatedAt timestamp for this repository.
* Uses SQL MAX() — no JSON parsing needed.
* Scan watermark: the upper bound (PR updatedAt) up to which a full-sweep
* crawl has guaranteed to have fetched every PR. Only full-sweep crawls
* should advance this; targeted fetches (webhook-driven) must NOT touch it,
* otherwise older PRs that were updated during a gap get skipped by the
* next full crawl's `stopBefore` check. See issue #278.
*/
const getLatestUpdatedAt = async (): Promise<string | null> => {
const getScanWatermark = async (): Promise<string | null> => {
const row = await db
.selectFrom('githubRawData')
.select((eb) => eb.fn.max('updatedAt').as('maxUpdatedAt'))
.where('repositoryId', '=', repositoryId)
.selectFrom('repositories')
.select('scanWatermark')
.where('id', '=', repositoryId)
.executeTakeFirst()
return (row?.maxUpdatedAt as string | null) ?? null
return row?.scanWatermark ?? null
}

/**
 * Persist the scan watermark for this repository.
 *
 * NOTE(review): this is a bare write — the invariant that only fully
 * successful full sweeps may advance the watermark is enforced by the
 * caller (computeAdvancedScanWatermark in the crawl job, see #278), not
 * here. Confirm no other call sites bypass that check.
 */
const setScanWatermark = async (watermark: string): Promise<void> => {
await db
.updateTable('repositories')
.set({ scanWatermark: watermark })
.where('id', '=', repositoryId)
.execute()
}

return {
savePrData,
updatePrMetadata,
saveTags,
preloadAll,
getLatestUpdatedAt,
getScanWatermark,
setScanWatermark,
loader: {
commits,
discussions,
Expand Down
10 changes: 10 additions & 0 deletions db/migrations/tenant/20260407004753_add_scan_watermark.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Add column "scan_watermark" to table: "repositories"
ALTER TABLE `repositories` ADD COLUMN `scan_watermark` text NULL;
-- Backfill scan_watermark from existing raw data so next crawl does not re-fetch everything.
-- Historical data was produced by full-sweep crawls, so max(updated_at) is a valid watermark.
-- Repositories with no github_raw_data rows get MAX(...) = NULL, i.e. no
-- watermark; the crawl code falls back to its fetch-all sentinel for NULL.
UPDATE `repositories`
SET `scan_watermark` = (
SELECT MAX(`updated_at`)
FROM `github_raw_data`
WHERE `github_raw_data`.`repository_id` = `repositories`.`id`
);
3 changes: 2 additions & 1 deletion db/migrations/tenant/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:7qLuG3yxMP6dnU1l/Cux8QOBPDTV4aLP+s8tVFmg3XA=
h1:faIWXl0zqPfxS0ANZy6xQifugYWN4RCW8D31sx77twI=
20260226112249_initial_tenant.sql h1:dIhBg2gzyh+ZjLzPXdHYafd5e62yIEjk1eFlllEyYX0=
20260226233619_add_teams.sql h1:n8MRMUA4BgeXYEnL9HJPc8mnXh8lqIfrCcdYtFFoWqw=
20260227163239.sql h1:ENMZUW7zHK8UjG2TdYlBOZSVPPUCXftIw5U5k2C54oo=
Expand All @@ -21,3 +21,4 @@ h1:7qLuG3yxMP6dnU1l/Cux8QOBPDTV4aLP+s8tVFmg3XA=
20260318120000_migrate_excluded_users_to_bot_type.sql h1:QhLNJld3iqMSd/Zi8/jHadFB+0u4ocStzpprBqCaG+M=
20260319120000_add_last_activity_at.sql h1:2evW+gCdRyH1OvTH2NDh5cV2xaxOHIldHYrkwS2ThE4=
20260324000000_move_integrations_to_shared.sql h1:KsJftEmsruoNtxLi2katRAhETjXx2uTefqV3iX2pVsE=
20260407004753_add_scan_watermark.sql h1:ULts2U5BxtBoi5Qjb2+0o4SrJ3RBg7zcC47xKsMpk6o=
1 change: 1 addition & 0 deletions db/tenant.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ CREATE TABLE `repositories` (
`updated_at` datetime NOT NULL,
`created_at` datetime NOT NULL DEFAULT (CURRENT_TIMESTAMP),
`team_id` text NULL,
`scan_watermark` text NULL,
PRIMARY KEY (`id`),
CONSTRAINT `repositories_team_id_fkey` FOREIGN KEY (`team_id`) REFERENCES `teams` (`id`) ON UPDATE CASCADE ON DELETE SET NULL
);
Expand Down
Loading