11import { defineJob } from '@coji/durably'
22import { z } from 'zod'
3+ import { getErrorMessageForLog } from '~/app/libs/error-message'
34import { clearOrgCache } from '~/app/services/cache.server'
45import {
56 assertOrgGithubAuthResolvable ,
@@ -9,7 +10,10 @@ import { processConcurrencyKey } from '~/app/services/jobs/concurrency-keys.serv
910import { shouldTriggerFullOrgProcessJob } from '~/app/services/jobs/crawl-process-handoff.server'
1011import type { OrganizationId } from '~/app/types/organization'
1112import { getOrganization } from '~/batch/db/queries'
12- import { createFetcher } from '~/batch/github/fetcher'
13+ import {
14+ createFetcher ,
15+ GraphQLResourceMissingError ,
16+ } from '~/batch/github/fetcher'
1317import { createStore } from '~/batch/github/store'
1418import { computeAdvancedScanWatermark } from './scan-watermark'
1519
@@ -24,6 +28,9 @@ export const crawlJob = defineJob({
2428 output : z . object ( {
2529 fetchedRepos : z . number ( ) ,
2630 pullCount : z . number ( ) ,
31+ failedRepos : z
32+ . array ( z . object ( { repoLabel : z . string ( ) , error : z . string ( ) } ) )
33+ . default ( [ ] ) ,
2734 } ) ,
2835 run : async ( step , input ) => {
2936 const orgId = input . organizationId as OrganizationId
@@ -56,6 +63,7 @@ export const crawlJob = defineJob({
5663 } )
5764
5865 const updatedPrNumbers = new Map < string , Set < number > > ( )
66+ const failedRepos : Array < { repoLabel : string ; error : string } > = [ ]
5967
6068 const FETCH_ALL_SENTINEL = '2000-01-01T00:00:00Z'
6169
@@ -71,117 +79,152 @@ export const crawlJob = defineJob({
7179 const repo = targetRepos [ i ]
7280 const repoLabel = `${ repo . owner } /${ repo . repo } `
7381
74- const store = createStore ( {
75- organizationId : orgId ,
76- repositoryId : repo . id ,
77- } )
78- const fetcher = createFetcher ( {
79- owner : repo . owner ,
80- repo : repo . repo ,
81- octokit,
82- } )
83-
84- if ( repo . releaseDetectionMethod === 'tags' ) {
85- await step . run ( `fetch-tags:${ repoLabel } ` , async ( ) => {
86- step . progress ( i + 1 , repoCount , `Fetching tags: ${ repoLabel } ...` )
87- const allTags = await fetcher . tags ( )
88- await store . saveTags ( allTags )
89- return { tagCount : allTags . length }
82+ try {
83+ const store = createStore ( {
84+ organizationId : orgId ,
85+ repositoryId : repo . id ,
9086 } )
91- }
87+ const fetcher = createFetcher ( {
88+ owner : repo . owner ,
89+ repo : repo . repo ,
90+ octokit,
91+ } )
92+
93+ // GraphQLResourceMissingError は step.run の外で再 throw する。
94+ // step.run 内で raw に throw すると durably がリトライしてしまうため、
95+ // sentinel として返してから外で投げ直し、外側 try/catch で記録する。
96+ if ( repo . releaseDetectionMethod === 'tags' ) {
97+ const result = await step . run ( `fetch-tags:${ repoLabel } ` , async ( ) => {
98+ step . progress ( i + 1 , repoCount , `Fetching tags: ${ repoLabel } ...` )
99+ try {
100+ const allTags = await fetcher . tags ( )
101+ await store . saveTags ( allTags )
102+ return { tagCount : allTags . length , missingMessage : null }
103+ } catch ( e ) {
104+ if ( e instanceof GraphQLResourceMissingError ) {
105+ return { tagCount : 0 , missingMessage : e . message }
106+ }
107+ throw e
108+ }
109+ } )
110+ if ( result . missingMessage !== null ) {
111+ throw new GraphQLResourceMissingError ( result . missingMessage )
112+ }
113+ }
114+
115+ // Watermark bounds full-sweep progress; targeted fetches must not
116+ // advance it (see computeAdvancedScanWatermark / #278).
117+ const scanWatermark = input . refresh
118+ ? FETCH_ALL_SENTINEL
119+ : ( repo . scanWatermark ?? FETCH_ALL_SENTINEL )
92120
93- // Watermark bounds full-sweep progress; targeted fetches must not
94- // advance it (see computeAdvancedScanWatermark / #278). Already
95- // preloaded via getOrganization, so no extra round-trip here.
96- const scanWatermark = input . refresh
97- ? FETCH_ALL_SENTINEL
98- : ( repo . scanWatermark ?? FETCH_ALL_SENTINEL )
99-
100- const prNumberSet = input . prNumbers ? new Set ( input . prNumbers ) : null
101- const prsToFetch : Array < { number : number ; updatedAt ?: string } > =
102- prNumberSet
103- ? ( input . prNumbers ?. map ( ( n ) => ( { number : n } ) ) ?? [ ] )
104- : await step . run ( `fetch-prs:${ repoLabel } ` , async ( ) => {
121+ const prNumberSet = input . prNumbers ? new Set ( input . prNumbers ) : null
122+ let prsToFetch : Array < { number : number ; updatedAt ?: string } >
123+ if ( prNumberSet ) {
124+ prsToFetch = input . prNumbers ?. map ( ( n ) => ( { number : n } ) ) ?? [ ]
125+ } else {
126+ const result = await step . run ( `fetch-prs:${ repoLabel } ` , async ( ) => {
127+ step . progress ( i + 1 , repoCount , `Fetching PR list: ${ repoLabel } ...` )
128+ const stopBefore =
129+ input . refresh || scanWatermark === FETCH_ALL_SENTINEL
130+ ? undefined
131+ : scanWatermark
132+ try {
133+ return {
134+ prs : await fetcher . pullrequestList ( stopBefore ) ,
135+ missingMessage : null as string | null ,
136+ }
137+ } catch ( e ) {
138+ if ( e instanceof GraphQLResourceMissingError ) {
139+ return {
140+ prs : [ ] as Array < { number : number ; updatedAt : string } > ,
141+ missingMessage : e . message ,
142+ }
143+ }
144+ throw e
145+ }
146+ } )
147+ if ( result . missingMessage !== null ) {
148+ throw new GraphQLResourceMissingError ( result . missingMessage )
149+ }
150+ prsToFetch = result . prs
151+ }
152+
153+ const repoUpdated = new Set < number > ( )
154+ for ( let j = 0 ; j < prsToFetch . length ; j ++ ) {
155+ const pr = prsToFetch [ j ]
156+ const saved = await step . run (
157+ `fetch-pr:${ repoLabel } :#${ pr . number } ` ,
158+ async ( ) => {
105159 step . progress (
106- i + 1 ,
107- repoCount ,
108- `Fetching PR list: ${ repoLabel } ...` ,
160+ j + 1 ,
161+ prsToFetch . length ,
162+ `Fetching ${ repoLabel } # ${ pr . number } ( ${ j + 1 } / ${ prsToFetch . length } ) ...` ,
109163 )
110- const stopBefore =
111- input . refresh || scanWatermark === FETCH_ALL_SENTINEL
112- ? undefined
113- : scanWatermark
114- return await fetcher . pullrequestList ( stopBefore )
115- } )
116-
117- const repoUpdated = new Set < number > ( )
118- for ( let j = 0 ; j < prsToFetch . length ; j ++ ) {
119- const pr = prsToFetch [ j ]
120- const saved = await step . run (
121- `fetch-pr:${ repoLabel } :#${ pr . number } ` ,
122- async ( ) => {
123- step . progress (
124- j + 1 ,
125- prsToFetch . length ,
126- `Fetching ${ repoLabel } #${ pr . number } (${ j + 1 } /${ prsToFetch . length } )...` ,
127- )
128- const fetchedAt = new Date ( ) . toISOString ( )
129- try {
130- const [
131- prMetadata ,
132- commits ,
133- discussions ,
134- reviews ,
135- timelineItems ,
136- files ,
137- ] = await Promise . all ( [
138- fetcher . pullrequest ( pr . number ) ,
139- fetcher . commits ( pr . number ) ,
140- fetcher . comments ( pr . number ) ,
141- fetcher . reviews ( pr . number ) ,
142- fetcher . timelineItems ( pr . number ) ,
143- fetcher . files ( pr . number ) ,
144- ] )
145- const prForSave = { ...prMetadata , files }
146- await store . savePrData (
147- prForSave ,
148- {
164+ const fetchedAt = new Date ( ) . toISOString ( )
165+ try {
166+ const [
167+ prMetadata ,
149168 commits ,
150- reviews,
151169 discussions ,
170+ reviews ,
152171 timelineItems ,
153- } ,
154- fetchedAt ,
155- )
156- return { saved : true as const , number : pr . number }
157- } catch ( e ) {
158- step . log . warn (
159- `Failed to fetch ${ repoLabel } #${ pr . number } : ${ e instanceof Error ? e . message : e } ` ,
160- )
161- return { saved : false as const , number : pr . number }
162- }
163- } ,
164- )
165- if ( saved . saved ) {
166- repoUpdated . add ( saved . number )
172+ files ,
173+ ] = await Promise . all ( [
174+ fetcher . pullrequest ( pr . number ) ,
175+ fetcher . commits ( pr . number ) ,
176+ fetcher . comments ( pr . number ) ,
177+ fetcher . reviews ( pr . number ) ,
178+ fetcher . timelineItems ( pr . number ) ,
179+ fetcher . files ( pr . number ) ,
180+ ] )
181+ const prForSave = { ...prMetadata , files }
182+ await store . savePrData (
183+ prForSave ,
184+ {
185+ commits,
186+ reviews,
187+ discussions,
188+ timelineItems,
189+ } ,
190+ fetchedAt ,
191+ )
192+ return { saved : true as const , number : pr . number }
193+ } catch ( e ) {
194+ step . log . warn (
195+ `Failed to fetch ${ repoLabel } #${ pr . number } : ${ getErrorMessageForLog ( e ) } ` ,
196+ )
197+ return { saved : false as const , number : pr . number }
198+ }
199+ } ,
200+ )
201+ if ( saved . saved ) {
202+ repoUpdated . add ( saved . number )
203+ }
167204 }
168- }
169205
170- if ( repoUpdated . size > 0 ) {
171- updatedPrNumbers . set ( repo . id , repoUpdated )
172- }
206+ if ( repoUpdated . size > 0 ) {
207+ updatedPrNumbers . set ( repo . id , repoUpdated )
208+ }
173209
174- // Advance the scan watermark only after a fully successful full-sweep.
175- // See computeAdvancedScanWatermark for the invariants.
176- const nextWatermark = computeAdvancedScanWatermark ( {
177- isTargetedFetch : prNumberSet !== null ,
178- prsToFetch,
179- savedPrNumbers : repoUpdated ,
180- } )
181- if ( nextWatermark !== null ) {
182- await step . run ( `advance-scan-watermark:${ repoLabel } ` , async ( ) => {
183- await store . setScanWatermark ( nextWatermark )
210+ // Advance the scan watermark only after a fully successful full-sweep.
211+ // See computeAdvancedScanWatermark for the invariants.
212+ const nextWatermark = computeAdvancedScanWatermark ( {
213+ isTargetedFetch : prNumberSet !== null ,
214+ prsToFetch,
215+ savedPrNumbers : repoUpdated ,
184216 } )
217+ if ( nextWatermark !== null ) {
218+ await step . run ( `advance-scan-watermark:${ repoLabel } ` , async ( ) => {
219+ await store . setScanWatermark ( nextWatermark )
220+ } )
221+ }
222+ } catch ( e ) {
223+ const message = getErrorMessageForLog ( e )
224+ step . log . error (
225+ `Failed to crawl ${ repoLabel } , skipping remaining steps for this repo: ${ message } ` ,
226+ )
227+ failedRepos . push ( { repoLabel, error : message } )
185228 }
186229 }
187230
@@ -190,7 +233,7 @@ export const crawlJob = defineJob({
190233 await step . run ( 'finalize' , ( ) => {
191234 clearOrgCache ( orgId )
192235 } )
193- return { fetchedRepos : repoCount , pullCount : 0 }
236+ return { fetchedRepos : repoCount , pullCount : 0 , failedRepos }
194237 }
195238
196239 if ( input . prNumbers && updatedPrNumbers . size === 0 ) {
@@ -236,6 +279,6 @@ export const crawlJob = defineJob({
236279 }
237280 } )
238281
239- return { fetchedRepos : repoCount , pullCount }
282+ return { fetchedRepos : repoCount , pullCount, failedRepos }
240283 } ,
241284} )
0 commit comments