Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 146 additions & 103 deletions app/services/jobs/crawl.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { defineJob } from '@coji/durably'
import { z } from 'zod'
import { getErrorMessageForLog } from '~/app/libs/error-message'
import { clearOrgCache } from '~/app/services/cache.server'
import {
assertOrgGithubAuthResolvable,
Expand All @@ -9,7 +10,10 @@ import { processConcurrencyKey } from '~/app/services/jobs/concurrency-keys.serv
import { shouldTriggerFullOrgProcessJob } from '~/app/services/jobs/crawl-process-handoff.server'
import type { OrganizationId } from '~/app/types/organization'
import { getOrganization } from '~/batch/db/queries'
import { createFetcher } from '~/batch/github/fetcher'
import {
createFetcher,
GraphQLResourceMissingError,
} from '~/batch/github/fetcher'
import { createStore } from '~/batch/github/store'
import { computeAdvancedScanWatermark } from './scan-watermark'

Expand All @@ -24,6 +28,9 @@ export const crawlJob = defineJob({
output: z.object({
fetchedRepos: z.number(),
pullCount: z.number(),
failedRepos: z
.array(z.object({ repoLabel: z.string(), error: z.string() }))
.default([]),
}),
run: async (step, input) => {
const orgId = input.organizationId as OrganizationId
Expand Down Expand Up @@ -56,6 +63,7 @@ export const crawlJob = defineJob({
})

const updatedPrNumbers = new Map<string, Set<number>>()
const failedRepos: Array<{ repoLabel: string; error: string }> = []

const FETCH_ALL_SENTINEL = '2000-01-01T00:00:00Z'

Expand All @@ -71,117 +79,152 @@ export const crawlJob = defineJob({
const repo = targetRepos[i]
const repoLabel = `${repo.owner}/${repo.repo}`

const store = createStore({
organizationId: orgId,
repositoryId: repo.id,
})
const fetcher = createFetcher({
owner: repo.owner,
repo: repo.repo,
octokit,
})

if (repo.releaseDetectionMethod === 'tags') {
await step.run(`fetch-tags:${repoLabel}`, async () => {
step.progress(i + 1, repoCount, `Fetching tags: ${repoLabel}...`)
const allTags = await fetcher.tags()
await store.saveTags(allTags)
return { tagCount: allTags.length }
try {
const store = createStore({
organizationId: orgId,
repositoryId: repo.id,
})
}
const fetcher = createFetcher({
owner: repo.owner,
repo: repo.repo,
octokit,
})

// GraphQLResourceMissingError は step.run の外で再 throw する。
// step.run 内で raw に throw すると durably がリトライしてしまうため、
// sentinel として返してから外で投げ直し、外側 try/catch で記録する。
if (repo.releaseDetectionMethod === 'tags') {
const result = await step.run(`fetch-tags:${repoLabel}`, async () => {
step.progress(i + 1, repoCount, `Fetching tags: ${repoLabel}...`)
try {
const allTags = await fetcher.tags()
await store.saveTags(allTags)
return { tagCount: allTags.length, missingMessage: null }
} catch (e) {
if (e instanceof GraphQLResourceMissingError) {
return { tagCount: 0, missingMessage: e.message }
}
throw e
}
})
if (result.missingMessage !== null) {
throw new GraphQLResourceMissingError(result.missingMessage)
}
}

// Watermark bounds full-sweep progress; targeted fetches must not
// advance it (see computeAdvancedScanWatermark / #278).
const scanWatermark = input.refresh
? FETCH_ALL_SENTINEL
: (repo.scanWatermark ?? FETCH_ALL_SENTINEL)

// Watermark bounds full-sweep progress; targeted fetches must not
// advance it (see computeAdvancedScanWatermark / #278). Already
// preloaded via getOrganization, so no extra round-trip here.
const scanWatermark = input.refresh
? FETCH_ALL_SENTINEL
: (repo.scanWatermark ?? FETCH_ALL_SENTINEL)

const prNumberSet = input.prNumbers ? new Set(input.prNumbers) : null
const prsToFetch: Array<{ number: number; updatedAt?: string }> =
prNumberSet
? (input.prNumbers?.map((n) => ({ number: n })) ?? [])
: await step.run(`fetch-prs:${repoLabel}`, async () => {
const prNumberSet = input.prNumbers ? new Set(input.prNumbers) : null
let prsToFetch: Array<{ number: number; updatedAt?: string }>
if (prNumberSet) {
prsToFetch = input.prNumbers?.map((n) => ({ number: n })) ?? []
} else {
const result = await step.run(`fetch-prs:${repoLabel}`, async () => {
step.progress(i + 1, repoCount, `Fetching PR list: ${repoLabel}...`)
const stopBefore =
input.refresh || scanWatermark === FETCH_ALL_SENTINEL
? undefined
: scanWatermark
try {
return {
prs: await fetcher.pullrequestList(stopBefore),
missingMessage: null as string | null,
}
} catch (e) {
if (e instanceof GraphQLResourceMissingError) {
return {
prs: [] as Array<{ number: number; updatedAt: string }>,
missingMessage: e.message,
}
}
throw e
}
})
if (result.missingMessage !== null) {
throw new GraphQLResourceMissingError(result.missingMessage)
}
prsToFetch = result.prs
}

const repoUpdated = new Set<number>()
for (let j = 0; j < prsToFetch.length; j++) {
const pr = prsToFetch[j]
const saved = await step.run(
`fetch-pr:${repoLabel}:#${pr.number}`,
async () => {
step.progress(
i + 1,
repoCount,
`Fetching PR list: ${repoLabel}...`,
j + 1,
prsToFetch.length,
`Fetching ${repoLabel}#${pr.number} (${j + 1}/${prsToFetch.length})...`,
)
const stopBefore =
input.refresh || scanWatermark === FETCH_ALL_SENTINEL
? undefined
: scanWatermark
return await fetcher.pullrequestList(stopBefore)
})

const repoUpdated = new Set<number>()
for (let j = 0; j < prsToFetch.length; j++) {
const pr = prsToFetch[j]
const saved = await step.run(
`fetch-pr:${repoLabel}:#${pr.number}`,
async () => {
step.progress(
j + 1,
prsToFetch.length,
`Fetching ${repoLabel}#${pr.number} (${j + 1}/${prsToFetch.length})...`,
)
const fetchedAt = new Date().toISOString()
try {
const [
prMetadata,
commits,
discussions,
reviews,
timelineItems,
files,
] = await Promise.all([
fetcher.pullrequest(pr.number),
fetcher.commits(pr.number),
fetcher.comments(pr.number),
fetcher.reviews(pr.number),
fetcher.timelineItems(pr.number),
fetcher.files(pr.number),
])
const prForSave = { ...prMetadata, files }
await store.savePrData(
prForSave,
{
const fetchedAt = new Date().toISOString()
try {
const [
prMetadata,
commits,
reviews,
discussions,
reviews,
timelineItems,
},
fetchedAt,
)
return { saved: true as const, number: pr.number }
} catch (e) {
step.log.warn(
`Failed to fetch ${repoLabel}#${pr.number}: ${e instanceof Error ? e.message : e}`,
)
return { saved: false as const, number: pr.number }
}
},
)
if (saved.saved) {
repoUpdated.add(saved.number)
files,
] = await Promise.all([
fetcher.pullrequest(pr.number),
fetcher.commits(pr.number),
fetcher.comments(pr.number),
fetcher.reviews(pr.number),
fetcher.timelineItems(pr.number),
fetcher.files(pr.number),
])
const prForSave = { ...prMetadata, files }
await store.savePrData(
prForSave,
{
commits,
reviews,
discussions,
timelineItems,
},
fetchedAt,
)
return { saved: true as const, number: pr.number }
} catch (e) {
step.log.warn(
`Failed to fetch ${repoLabel}#${pr.number}: ${getErrorMessageForLog(e)}`,
)
return { saved: false as const, number: pr.number }
}
},
)
if (saved.saved) {
repoUpdated.add(saved.number)
}
}
}

if (repoUpdated.size > 0) {
updatedPrNumbers.set(repo.id, repoUpdated)
}
if (repoUpdated.size > 0) {
updatedPrNumbers.set(repo.id, repoUpdated)
}

// Advance the scan watermark only after a fully successful full-sweep.
// See computeAdvancedScanWatermark for the invariants.
const nextWatermark = computeAdvancedScanWatermark({
isTargetedFetch: prNumberSet !== null,
prsToFetch,
savedPrNumbers: repoUpdated,
})
if (nextWatermark !== null) {
await step.run(`advance-scan-watermark:${repoLabel}`, async () => {
await store.setScanWatermark(nextWatermark)
// Advance the scan watermark only after a fully successful full-sweep.
// See computeAdvancedScanWatermark for the invariants.
const nextWatermark = computeAdvancedScanWatermark({
isTargetedFetch: prNumberSet !== null,
prsToFetch,
savedPrNumbers: repoUpdated,
})
if (nextWatermark !== null) {
await step.run(`advance-scan-watermark:${repoLabel}`, async () => {
await store.setScanWatermark(nextWatermark)
})
}
} catch (e) {
const message = getErrorMessageForLog(e)
step.log.error(
`Failed to crawl ${repoLabel}, skipping remaining steps for this repo: ${message}`,
)
failedRepos.push({ repoLabel, error: message })
}
}

Expand All @@ -190,7 +233,7 @@ export const crawlJob = defineJob({
await step.run('finalize', () => {
clearOrgCache(orgId)
})
return { fetchedRepos: repoCount, pullCount: 0 }
return { fetchedRepos: repoCount, pullCount: 0, failedRepos }
}

if (input.prNumbers && updatedPrNumbers.size === 0) {
Expand Down Expand Up @@ -236,6 +279,6 @@ export const crawlJob = defineJob({
}
})

return { fetchedRepos: repoCount, pullCount }
return { fetchedRepos: repoCount, pullCount, failedRepos }
},
})
7 changes: 7 additions & 0 deletions batch/commands/crawl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ export async function crawlCommand({
consola.success(
`Crawl completed. ${output.fetchedRepos} repos, ${output.pullCount} PRs fetched.`,
)
if (output.failedRepos.length > 0) {
consola.warn(
`Failed to crawl ${output.failedRepos.length} repo(s):\n${output.failedRepos
.map((f) => ` - ${f.repoLabel}: ${f.error}`)
.join('\n')}`,
)
}
} finally {
await durably.stop()
await shutdown()
Expand Down
41 changes: 39 additions & 2 deletions batch/github/fetcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@ import { describe, expect, test, vi } from 'vitest'
import type { ShapedTimelineItem } from './model'

// 純粋関数なので直接 import してテスト
const { buildRequestedAtMap, createFetcher, paginateGraphQL, shapeTagNode } =
await import('./fetcher')
const {
buildRequestedAtMap,
createFetcher,
GraphQLResourceMissingError,
paginateGraphQL,
shapeTagNode,
} = await import('./fetcher')

describe('buildRequestedAtMap', () => {
test('returns empty map for empty items', () => {
Expand Down Expand Up @@ -325,6 +330,38 @@ describe('paginateGraphQL shouldStop', () => {
])
})

type RepoResult = {
repository: {
nodes: unknown[]
pageInfo: { hasNextPage: boolean; endCursor: string | null }
} | null
}
const extractRepoConnection = (r: RepoResult) => r.repository
const isRepoMissing = (r: RepoResult) => r?.repository == null

test('throws GraphQLResourceMissingError when isResourceMissing returns true', async () => {
const graphqlFn = vi.fn(() => Promise.resolve({ repository: null }))
await expect(
paginateGraphQL(graphqlFn, extractRepoConnection, (n: unknown) => n, {
minPageSize: 10,
label: 'test(owner/repo)',
isResourceMissing: isRepoMissing,
}),
).rejects.toBeInstanceOf(GraphQLResourceMissingError)
// ページサイズを縮小しながらリトライせず、1回呼んだだけで throw
expect(graphqlFn).toHaveBeenCalledTimes(1)
})

test('throws GraphQLResourceMissingError including label/cursor', async () => {
const graphqlFn = () => Promise.resolve({ repository: null })
await expect(
paginateGraphQL(graphqlFn, extractRepoConnection, (n: unknown) => n, {
label: 'pullrequestList(acme/ghost)',
isResourceMissing: isRepoMissing,
}),
).rejects.toThrow(/pullrequestList\(acme\/ghost\)/)
})

test('returns all nodes when shouldStop is not provided', async () => {
const pages: Node[][] = [
[
Expand Down
Loading
Loading