diff --git a/app/services/jobs/crawl.server.ts b/app/services/jobs/crawl.server.ts index 24c5de95..f186cf43 100644 --- a/app/services/jobs/crawl.server.ts +++ b/app/services/jobs/crawl.server.ts @@ -1,5 +1,6 @@ import { defineJob } from '@coji/durably' import { z } from 'zod' +import { getErrorMessageForLog } from '~/app/libs/error-message' import { clearOrgCache } from '~/app/services/cache.server' import { assertOrgGithubAuthResolvable, @@ -9,7 +10,10 @@ import { processConcurrencyKey } from '~/app/services/jobs/concurrency-keys.serv import { shouldTriggerFullOrgProcessJob } from '~/app/services/jobs/crawl-process-handoff.server' import type { OrganizationId } from '~/app/types/organization' import { getOrganization } from '~/batch/db/queries' -import { createFetcher } from '~/batch/github/fetcher' +import { + createFetcher, + GraphQLResourceMissingError, +} from '~/batch/github/fetcher' import { createStore } from '~/batch/github/store' import { computeAdvancedScanWatermark } from './scan-watermark' @@ -24,6 +28,9 @@ export const crawlJob = defineJob({ output: z.object({ fetchedRepos: z.number(), pullCount: z.number(), + failedRepos: z + .array(z.object({ repoLabel: z.string(), error: z.string() })) + .default([]), }), run: async (step, input) => { const orgId = input.organizationId as OrganizationId @@ -56,6 +63,7 @@ export const crawlJob = defineJob({ }) const updatedPrNumbers = new Map>() + const failedRepos: Array<{ repoLabel: string; error: string }> = [] const FETCH_ALL_SENTINEL = '2000-01-01T00:00:00Z' @@ -71,117 +79,152 @@ export const crawlJob = defineJob({ const repo = targetRepos[i] const repoLabel = `${repo.owner}/${repo.repo}` - const store = createStore({ - organizationId: orgId, - repositoryId: repo.id, - }) - const fetcher = createFetcher({ - owner: repo.owner, - repo: repo.repo, - octokit, - }) - - if (repo.releaseDetectionMethod === 'tags') { - await step.run(`fetch-tags:${repoLabel}`, async () => { - step.progress(i + 1, repoCount, `Fetching tags: ${repoLabel}...`) - const allTags = await fetcher.tags() - await store.saveTags(allTags) - return { tagCount: allTags.length } + try { + const store = createStore({ + organizationId: orgId, + repositoryId: repo.id, }) - } + const fetcher = createFetcher({ + owner: repo.owner, + repo: repo.repo, + octokit, + }) + + // GraphQLResourceMissingError は step.run の外で再 throw する。 + // step.run 内で raw に throw すると durably がリトライしてしまうため、 + // sentinel として返してから外で投げ直し、外側 try/catch で記録する。 + if (repo.releaseDetectionMethod === 'tags') { + const result = await step.run(`fetch-tags:${repoLabel}`, async () => { + step.progress(i + 1, repoCount, `Fetching tags: ${repoLabel}...`) + try { + const allTags = await fetcher.tags() + await store.saveTags(allTags) + return { tagCount: allTags.length, missingMessage: null } + } catch (e) { + if (e instanceof GraphQLResourceMissingError) { + return { tagCount: 0, missingMessage: e.message } + } + throw e + } + }) + if (result.missingMessage !== null) { + throw new GraphQLResourceMissingError(result.missingMessage) + } + } + + // Watermark bounds full-sweep progress; targeted fetches must not + // advance it (see computeAdvancedScanWatermark / #278). + const scanWatermark = input.refresh + ? FETCH_ALL_SENTINEL + : (repo.scanWatermark ?? FETCH_ALL_SENTINEL) - // Watermark bounds full-sweep progress; targeted fetches must not - // advance it (see computeAdvancedScanWatermark / #278). Already - // preloaded via getOrganization, so no extra round-trip here. - const scanWatermark = input.refresh - ? FETCH_ALL_SENTINEL - : (repo.scanWatermark ?? FETCH_ALL_SENTINEL) - - const prNumberSet = input.prNumbers ? new Set(input.prNumbers) : null - const prsToFetch: Array<{ number: number; updatedAt?: string }> = - prNumberSet - ? (input.prNumbers?.map((n) => ({ number: n })) ?? []) - : await step.run(`fetch-prs:${repoLabel}`, async () => { + const prNumberSet = input.prNumbers ? new Set(input.prNumbers) : null + let prsToFetch: Array<{ number: number; updatedAt?: string }> + if (prNumberSet) { + prsToFetch = input.prNumbers?.map((n) => ({ number: n })) ?? [] + } else { + const result = await step.run(`fetch-prs:${repoLabel}`, async () => { + step.progress(i + 1, repoCount, `Fetching PR list: ${repoLabel}...`) + const stopBefore = + input.refresh || scanWatermark === FETCH_ALL_SENTINEL + ? undefined + : scanWatermark + try { + return { + prs: await fetcher.pullrequestList(stopBefore), + missingMessage: null as string | null, + } + } catch (e) { + if (e instanceof GraphQLResourceMissingError) { + return { + prs: [] as Array<{ number: number; updatedAt: string }>, + missingMessage: e.message, + } + } + throw e + } + }) + if (result.missingMessage !== null) { + throw new GraphQLResourceMissingError(result.missingMessage) + } + prsToFetch = result.prs + } + + const repoUpdated = new Set() + for (let j = 0; j < prsToFetch.length; j++) { + const pr = prsToFetch[j] + const saved = await step.run( + `fetch-pr:${repoLabel}:#${pr.number}`, + async () => { step.progress( - i + 1, - repoCount, - `Fetching PR list: ${repoLabel}...`, + j + 1, + prsToFetch.length, + `Fetching ${repoLabel}#${pr.number} (${j + 1}/${prsToFetch.length})...`, ) - const stopBefore = - input.refresh || scanWatermark === FETCH_ALL_SENTINEL - ? undefined - : scanWatermark - return await fetcher.pullrequestList(stopBefore) - }) - - const repoUpdated = new Set() - for (let j = 0; j < prsToFetch.length; j++) { - const pr = prsToFetch[j] - const saved = await step.run( - `fetch-pr:${repoLabel}:#${pr.number}`, - async () => { - step.progress( - j + 1, - prsToFetch.length, - `Fetching ${repoLabel}#${pr.number} (${j + 1}/${prsToFetch.length})...`, - ) - const fetchedAt = new Date().toISOString() - try { - const [ - prMetadata, - commits, - discussions, - reviews, - timelineItems, - files, - ] = await Promise.all([ - fetcher.pullrequest(pr.number), - fetcher.commits(pr.number), - fetcher.comments(pr.number), - fetcher.reviews(pr.number), - fetcher.timelineItems(pr.number), - fetcher.files(pr.number), - ]) - const prForSave = { ...prMetadata, files } - await store.savePrData( - prForSave, - { + const fetchedAt = new Date().toISOString() + try { + const [ + prMetadata, commits, - reviews, discussions, + reviews, timelineItems, - }, - fetchedAt, - ) - return { saved: true as const, number: pr.number } - } catch (e) { - step.log.warn( - `Failed to fetch ${repoLabel}#${pr.number}: ${e instanceof Error ? e.message : e}`, - ) - return { saved: false as const, number: pr.number } - } - }, - ) - if (saved.saved) { - repoUpdated.add(saved.number) + files, + ] = await Promise.all([ + fetcher.pullrequest(pr.number), + fetcher.commits(pr.number), + fetcher.comments(pr.number), + fetcher.reviews(pr.number), + fetcher.timelineItems(pr.number), + fetcher.files(pr.number), + ]) + const prForSave = { ...prMetadata, files } + await store.savePrData( + prForSave, + { + commits, + reviews, + discussions, + timelineItems, + }, + fetchedAt, + ) + return { saved: true as const, number: pr.number } + } catch (e) { + step.log.warn( + `Failed to fetch ${repoLabel}#${pr.number}: ${getErrorMessageForLog(e)}`, + ) + return { saved: false as const, number: pr.number } + } + }, + ) + if (saved.saved) { + repoUpdated.add(saved.number) + } } - } - if (repoUpdated.size > 0) { - updatedPrNumbers.set(repo.id, repoUpdated) - } + if (repoUpdated.size > 0) { + updatedPrNumbers.set(repo.id, repoUpdated) + } - // Advance the scan watermark only after a fully successful full-sweep. - // See computeAdvancedScanWatermark for the invariants. - const nextWatermark = computeAdvancedScanWatermark({ - isTargetedFetch: prNumberSet !== null, - prsToFetch, - savedPrNumbers: repoUpdated, - }) - if (nextWatermark !== null) { - await step.run(`advance-scan-watermark:${repoLabel}`, async () => { - await store.setScanWatermark(nextWatermark) + // Advance the scan watermark only after a fully successful full-sweep. + // See computeAdvancedScanWatermark for the invariants. + const nextWatermark = computeAdvancedScanWatermark({ + isTargetedFetch: prNumberSet !== null, + prsToFetch, + savedPrNumbers: repoUpdated, }) + if (nextWatermark !== null) { + await step.run(`advance-scan-watermark:${repoLabel}`, async () => { + await store.setScanWatermark(nextWatermark) + }) + } + } catch (e) { + const message = getErrorMessageForLog(e) + step.log.error( + `Failed to crawl ${repoLabel}, skipping remaining steps for this repo: ${message}`, + ) + failedRepos.push({ repoLabel, error: message }) } } @@ -190,7 +233,7 @@ export const crawlJob = defineJob({ await step.run('finalize', () => { clearOrgCache(orgId) }) - return { fetchedRepos: repoCount, pullCount: 0 } + return { fetchedRepos: repoCount, pullCount: 0, failedRepos } } if (input.prNumbers && updatedPrNumbers.size === 0) { @@ -236,6 +279,6 @@ export const crawlJob = defineJob({ } }) - return { fetchedRepos: repoCount, pullCount } + return { fetchedRepos: repoCount, pullCount, failedRepos } }, }) diff --git a/batch/commands/crawl.ts b/batch/commands/crawl.ts index d47ded64..15af50d5 100644 --- a/batch/commands/crawl.ts +++ b/batch/commands/crawl.ts @@ -84,6 +84,13 @@ export async function crawlCommand({ consola.success( `Crawl completed. ${output.fetchedRepos} repos, ${output.pullCount} PRs fetched.`, ) + if (output.failedRepos.length > 0) { + consola.warn( + `Failed to crawl ${output.failedRepos.length} repo(s):\n${output.failedRepos + .map((f) => ` - ${f.repoLabel}: ${f.error}`) + .join('\n')}`, + ) + } } finally { await durably.stop() await shutdown() diff --git a/batch/github/fetcher.test.ts b/batch/github/fetcher.test.ts index d092e7bb..e791a0b9 100644 --- a/batch/github/fetcher.test.ts +++ b/batch/github/fetcher.test.ts @@ -3,8 +3,13 @@ import { describe, expect, test, vi } from 'vitest' import type { ShapedTimelineItem } from './model' // 純粋関数なので直接 import してテスト -const { buildRequestedAtMap, createFetcher, paginateGraphQL, shapeTagNode } = - await import('./fetcher') +const { + buildRequestedAtMap, + createFetcher, + GraphQLResourceMissingError, + paginateGraphQL, + shapeTagNode, +} = await import('./fetcher') describe('buildRequestedAtMap', () => { test('returns empty map for empty items', () => { @@ -325,6 +330,38 @@ describe('paginateGraphQL shouldStop', () => { ]) }) + type RepoResult = { + repository: { + nodes: unknown[] + pageInfo: { hasNextPage: boolean; endCursor: string | null } + } | null + } + const extractRepoConnection = (r: RepoResult) => r.repository + const isRepoMissing = (r: RepoResult) => r?.repository == null + + test('throws GraphQLResourceMissingError when isResourceMissing returns true', async () => { + const graphqlFn = vi.fn(() => Promise.resolve({ repository: null })) + await expect( + paginateGraphQL(graphqlFn, extractRepoConnection, (n: unknown) => n, { + minPageSize: 10, + label: 'test(owner/repo)', + isResourceMissing: isRepoMissing, + }), + ).rejects.toBeInstanceOf(GraphQLResourceMissingError) + // ページサイズを縮小しながらリトライせず、1回呼んだだけで throw + expect(graphqlFn).toHaveBeenCalledTimes(1) + }) + + test('throws GraphQLResourceMissingError including label/cursor', async () => { + const graphqlFn = () => Promise.resolve({ repository: null }) + await expect( + paginateGraphQL(graphqlFn, extractRepoConnection, (n: unknown) => n, { + label: 'pullrequestList(acme/ghost)', + isResourceMissing: isRepoMissing, + }), + ).rejects.toThrow(/pullrequestList\(acme\/ghost\)/) + }) + test('returns all nodes when shouldStop is not provided', async () => { const pages: Node[][] = [ [ diff --git a/batch/github/fetcher.ts b/batch/github/fetcher.ts index 86c5f8d3..f0cefe35 100644 --- a/batch/github/fetcher.ts +++ b/batch/github/fetcher.ts @@ -745,7 +745,25 @@ interface PageInfo { endCursor?: string | null } -interface PaginateOptions { +/** + * GraphQL のトップレベルリソース(例: `repository`)が null の場合に投げられるエラー。 + * ページサイズを下げてもリソース自体が存在しない問題は解決しないため、 + * paginateGraphQL は即座にこの例外を投げてリトライループを打ち切る。 + */ +export class GraphQLResourceMissingError extends Error { + constructor(message: string) { + super(message) + this.name = 'GraphQLResourceMissingError' + } +} + +const repositoryMissing = (r: unknown): boolean => + r != null && + typeof r === 'object' && + 'repository' in r && + (r as { repository: unknown }).repository == null + +interface PaginateOptions { /** ページネーション用の初期ページサイズ(デフォルト: 100) */ initialPageSize?: number /** handleGraphQLError の最小ページサイズ。指定するとエラー時にページサイズ削減+リトライする */ @@ -754,6 +772,12 @@ interface PaginateOptions { label?: string /** ノードに対する早期打ち切り判定。true を返すとそのノードでページング停止 */ shouldStop?: (node: TNode) => boolean + /** + * リソース(例: repository)が存在しないことを判定する。 + * true を返すと GraphQLResourceMissingError を throw してループを即座に打ち切る。 + * nodes が null でもリソース自体が null なのか空ページなのかを区別できる。 + */ + isResourceMissing?: (result: TResult) => boolean } /** @@ -767,13 +791,14 @@ export async function paginateGraphQL( result: TResult, ) => { nodes: (TNode | null)[] | null; pageInfo: PageInfo } | null, processNode: (node: TNode) => TItem | null, - options: PaginateOptions = {}, + options: PaginateOptions = {}, ): Promise { const { initialPageSize = 100, minPageSize, label = 'paginateGraphQL', shouldStop, + isResourceMissing, } = options const items: TItem[] = [] let cursor: string | null = null @@ -803,6 +828,12 @@ export async function paginateGraphQL( result = await graphqlFn(variables) } + if (isResourceMissing?.(result)) { + throw new GraphQLResourceMissingError( + `${label}: resource not found (cursor: ${cursor})`, + ) + } + const connection = extractConnection(result) const nodes = connection?.nodes if (!nodes) { @@ -1136,7 +1167,11 @@ export const createFetcher = ({ owner, repo, octokit }: createFetcherProps) => { (vars) => graphqlWithTimeout(queryStr, { owner, repo, ...vars }), (r) => r?.repository?.pullRequests ?? null, (node) => shapePullRequestNode(node, owner, repo), - { minPageSize: 10, label: 'pullrequests()' }, + { + minPageSize: 10, + label: `pullrequests(${owner}/${repo})`, + isResourceMissing: repositoryMissing, + }, ) } @@ -1154,7 +1189,8 @@ export const createFetcher = ({ owner, repo, octokit }: createFetcherProps) => { (node) => ({ number: node.number, updatedAt: node.updatedAt }), { minPageSize: 10, - label: 'pullrequestList()', + label: `pullrequestList(${owner}/${repo})`, + isResourceMissing: repositoryMissing, // ISO 8601 UTC 文字列同士なので lexicographic 比較 = 時系列比較 shouldStop: stopBefore ? (node) => node.updatedAt <= stopBefore @@ -1175,6 +1211,11 @@ export const createFetcher = ({ owner, repo, octokit }: createFetcherProps) => { number: pullNumber, }) + if (repositoryMissing(result)) { + throw new GraphQLResourceMissingError( + `pullrequest(${owner}/${repo}): repository not found`, + ) + } const node = result?.repository?.pullRequest ?? null const shaped = shapePullRequestNode(node, owner, repo) if (!shaped) { @@ -1282,6 +1323,10 @@ export const createFetcher = ({ owner, repo, octokit }: createFetcherProps) => { (vars) => graphqlWithTimeout(queryStr, { owner, repo, ...vars }), (r) => r.repository?.refs ?? null, shapeTagNode, + { + label: `tags(${owner}/${repo})`, + isResourceMissing: repositoryMissing, + }, ) } @@ -1387,7 +1432,8 @@ export const createFetcher = ({ owner, repo, octokit }: createFetcherProps) => { { initialPageSize: 25, minPageSize: 5, - label: 'pullrequestsWithDetails()', + label: `pullrequestsWithDetails(${owner}/${repo})`, + isResourceMissing: repositoryMissing, }, ) }