From 17c6c3437cbaa399bc937727a8de2f284ae0d546 Mon Sep 17 00:00:00 2001 From: coji Date: Tue, 31 Mar 2026 21:21:08 +0900 Subject: [PATCH 1/5] =?UTF-8?q?feat:=20Webhook=20=E3=81=AB=E3=82=88?= =?UTF-8?q?=E3=82=8B=E3=83=AA=E3=82=A2=E3=83=AB=E3=82=BF=E3=82=A4=E3=83=A0?= =?UTF-8?q?=20PR=20=E6=9B=B4=E6=96=B0=20(#255)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - fetchedAt ガードで古いデータによる raw 上書きを防止 - crawl ジョブを fetch 専任にリファクタリング - 新規 process ジョブで analyze/upsert/export/classify を統合 - recalculate ジョブを廃止し process に統合 - webhook handler を拡張し PR イベントで fetch + process を trigger - coalesce: 'skip' で trigger 圧縮(N 回の webhook → 最大 2 run) - process ジョブは concurrencyKey で org 単位直列化 Co-Authored-By: Claude Opus 4.6 (1M context) --- .prettierignore | 1 + .../+components/job-history.tsx | 2 +- .../settings/data-management/index.tsx | 34 +-- .../repositories/$repository/$pull/index.tsx | 95 ++----- app/routes/api.github.webhook.test.ts | 50 +++- app/routes/api.github.webhook.ts | 3 - app/services/durably.server.ts | 4 +- .../github-webhook-installation.server.ts | 175 +++++++++++++ app/services/github-webhook-pull.server.ts | 78 ++++++ app/services/github-webhook-shared.server.ts | 35 +++ app/services/github-webhook.server.test.ts | 115 ++++++++- app/services/github-webhook.server.ts | 222 ++-------------- .../jobs/crawl-process-handoff.server.test.ts | 39 +++ .../jobs/crawl-process-handoff.server.ts | 11 + app/services/jobs/crawl.server.ts | 110 +++++--- app/services/jobs/process.server.ts | 81 ++++++ app/services/jobs/recalculate.server.ts | 45 ---- app/services/jobs/shared-steps.server.ts | 2 +- batch/cli.ts | 29 ++- batch/commands/backfill.ts | 2 +- batch/commands/crawl.ts | 47 +++- batch/commands/{recalculate.ts => process.ts} | 14 +- batch/db/mutations.ts | 2 +- batch/github/backfill-repo.ts | 11 +- batch/github/store.test.ts | 240 +++++++++++++----- batch/github/store.ts | 30 ++- 26 files changed, 989 insertions(+), 488 deletions(-) create mode 100644 app/services/github-webhook-installation.server.ts create mode 100644 app/services/github-webhook-pull.server.ts create mode 100644 app/services/github-webhook-shared.server.ts create mode 100644 app/services/jobs/crawl-process-handoff.server.test.ts create mode 100644 app/services/jobs/crawl-process-handoff.server.ts create mode 100644 app/services/jobs/process.server.ts delete mode 100644 app/services/jobs/recalculate.server.ts rename batch/commands/{recalculate.ts => process.ts} (71%) diff --git a/.prettierignore b/.prettierignore index c4cafe02..ed12e0fc 100644 --- a/.prettierignore +++ b/.prettierignore @@ -1,3 +1,4 @@ +.claude/ pnpm-lock.yaml app/services/type.ts app/services/tenant-type.ts diff --git a/app/routes/$orgSlug/settings/data-management/+components/job-history.tsx b/app/routes/$orgSlug/settings/data-management/+components/job-history.tsx index 0f6b37dc..6d0ce83c 100644 --- a/app/routes/$orgSlug/settings/data-management/+components/job-history.tsx +++ b/app/routes/$orgSlug/settings/data-management/+components/job-history.tsx @@ -20,7 +20,7 @@ export function isRunActive(status: RunStatus): boolean { const jobNameColors: Record = { crawl: 'bg-blue-100 text-blue-800', - recalculate: 'bg-purple-100 text-purple-800', + process: 'bg-purple-100 text-purple-800', classify: 'bg-amber-100 text-amber-800', backfill: 'bg-emerald-100 text-emerald-800', } diff --git a/app/routes/$orgSlug/settings/data-management/index.tsx b/app/routes/$orgSlug/settings/data-management/index.tsx index a398b3c7..ae9a9a91 100644 --- a/app/routes/$orgSlug/settings/data-management/index.tsx +++ b/app/routes/$orgSlug/settings/data-management/index.tsx @@ -53,7 +53,7 @@ export const action = async ({ request, context }: Route.ActionArgs) => { ) } }) - .with('recalculate', async () => { + .with('process', async () => { const selectedSteps = formData.getAll('steps').map(String) const steps = { upsert: selectedSteps.includes('upsert'), @@ -63,7 +63,7 @@ export const action = async ({ request, context }: Route.ActionArgs) => { if (!steps.upsert && !steps.export) { return data( { - intent: 'recalculate' as const, + intent: 'process' as const, error: 'At least one step must be selected', }, { status: 400 }, @@ -71,19 +71,19 @@ export const action = async ({ request, context }: Route.ActionArgs) => { } try { - await serverDurably.jobs.recalculate.trigger( + await serverDurably.jobs.process.trigger( { organizationId: org.id, steps }, { - concurrencyKey: `recalculate:${org.id}`, + concurrencyKey: `process:${org.id}`, labels: { organizationId: org.id }, }, ) - return data({ intent: 'recalculate' as const, ok: true }) + return data({ intent: 'process' as const, ok: true }) } catch { return data( { - intent: 'recalculate' as const, - error: 'Failed to start recalculation', + intent: 'process' as const, + error: 'Failed to start process job', }, { status: 500 }, ) @@ -126,28 +126,28 @@ function RefreshSection({ isRunning }: { isRunning: boolean }) { ) } -// --- Recalculate Section --- +// --- Process Section --- -function RecalculateSection({ isRunning }: { isRunning: boolean }) { +function ProcessSection({ isRunning }: { isRunning: boolean }) { const fetcher = useFetcher() const [upsert, setUpsert] = useState(true) const [exportData, setExportData] = useState(false) const noneSelected = !upsert && !exportData const isSubmitting = fetcher.state !== 'idle' const triggerError = - fetcher.data?.intent === 'recalculate' ? fetcher.data?.error : null + fetcher.data?.intent === 'process' ? fetcher.data?.error : null return (
-

Recalculate Cycle Times

+

Process Cycle Times

Re-analyze PR data from stored raw data. Select which steps to run.

- +
- Recalculate + Process
@@ -271,18 +271,18 @@ export default function DataManagementPage({ const isCrawlRunning = runs.some( (r) => r.jobName === 'crawl' && isRunActive(r.status), ) - const isRecalculateRunning = runs.some( - (r) => r.jobName === 'recalculate' && isRunActive(r.status), + const isProcessRunning = runs.some( + (r) => r.jobName === 'process' && isRunActive(r.status), ) return ( - + { - // 1. Get existing PR shape from raw data - const shapedPr = await getShapedPullRequest( - organization.id, - repositoryId, - pullId, - ) - if (!shapedPr) { - throw new Response('Raw PR data not found. Run Compare first.', { - status: 404, - }) - } - const pr = shapedPr as unknown as ShapedGitHubPullRequest - - // 2. Re-fetch commits/comments/reviews/timelineItems from GitHub - const [commits, comments, reviews, timelineItems] = await Promise.all([ - fetcher.commits(pullId), - fetcher.comments(pullId), - fetcher.reviews(pullId), - fetcher.timelineItems(pullId), - ]) + const fetchedAt = new Date().toISOString() + const [prMetadata, commits, comments, reviews, timelineItems, files] = + await Promise.all([ + fetcher.pullrequest(pullId), + fetcher.commits(pullId), + fetcher.comments(pullId), + fetcher.reviews(pullId), + fetcher.timelineItems(pullId), + fetcher.files(pullId), + ]) + const prForSave = { ...prMetadata, files } - // 3. Save raw data via store const store = createStore({ organizationId: organization.id, repositoryId, }) - await store.savePrData(pr, { - commits, - reviews, - discussions: comments, - timelineItems, - }) - - // 4. Get organization settings and bot logins for build config - const [settings, botLoginsList] = await Promise.all([ - getOrganizationSettings(organization.id), - getBotLogins(organization.id), - ]) + await store.savePrData( + prForSave, + { + commits, + reviews, + discussions: comments, + timelineItems, + }, + fetchedAt, + ) - // 5. Build pull request data (analyze) - const result = await buildPullRequests( + const { durably } = await import('~/app/services/durably.server') + await durably.jobs.process.triggerAndWait( { organizationId: organization.id, - repositoryId, - botLogins: new Set(botLoginsList), - releaseDetectionMethod: settings?.releaseDetectionMethod ?? 'branch', - releaseDetectionKey: settings?.releaseDetectionKey ?? '', + scopes: [{ repositoryId, prNumbers: [pullId] }], + }, + { + concurrencyKey: `process:${organization.id}`, + labels: { organizationId: organization.id }, }, - [pr], - store.loader, ) - // 6. Upsert to DB - for (const pull of result.pulls) { - await upsertPullRequest(organization.id, pull) - } - for (const review of result.reviews) { - await upsertPullRequestReview(organization.id, review) - } - for (const reviewer of result.reviewers) { - await upsertPullRequestReviewers( - organization.id, - reviewer.repositoryId, - reviewer.pullRequestNumber, - reviewer.reviewers, - ) - } - return { intent: 'refresh' as const, success: true } }) .exhaustive() diff --git a/app/routes/api.github.webhook.test.ts b/app/routes/api.github.webhook.test.ts index 5c77e441..ffeb080c 100644 --- a/app/routes/api.github.webhook.test.ts +++ b/app/routes/api.github.webhook.test.ts @@ -59,11 +59,11 @@ describe('api.github.webhook', () => { expect(process).not.toHaveBeenCalled() }) - test('202 for unhandled event without calling processor', async () => { + test('204 for ping and delegates to processor (no-op in service)', async () => { verify.mockReturnValue(true) const res = await post('{}', { 'X-GitHub-Event': 'ping' }) - expect(res.status).toBe(202) - expect(process).not.toHaveBeenCalled() + expect(res.status).toBe(204) + expect(process).toHaveBeenCalledWith('ping', {}) }) test('204 for installation and calls processor', async () => { @@ -81,4 +81,48 @@ describe('api.github.webhook', () => { const res = await post('{}', { 'X-GitHub-Event': 'installation' }) expect(res.status).toBe(500) }) + + const prPayload = { + action: 'opened', + installation: { id: 1 }, + repository: { name: 'r', owner: { login: 'o' } }, + pull_request: { number: 2 }, + } + + test('204 for pull_request and delegates to processor', async () => { + verify.mockReturnValue(true) + const res = await post(JSON.stringify(prPayload), { + 'X-GitHub-Event': 'pull_request', + }) + expect(res.status).toBe(204) + expect(process).toHaveBeenCalledWith('pull_request', prPayload) + }) + + test('204 for pull_request_review and delegates to processor', async () => { + verify.mockReturnValue(true) + const res = await post(JSON.stringify(prPayload), { + 'X-GitHub-Event': 'pull_request_review', + }) + expect(res.status).toBe(204) + expect(process).toHaveBeenCalledWith('pull_request_review', prPayload) + }) + + test('204 for pull_request_review_comment and delegates to processor', async () => { + verify.mockReturnValue(true) + const res = await post(JSON.stringify(prPayload), { + 'X-GitHub-Event': 'pull_request_review_comment', + }) + expect(res.status).toBe(204) + expect(process).toHaveBeenCalledWith( + 'pull_request_review_comment', + prPayload, + ) + }) + + test('204 for unsupported event still delegates to processor (service no-op)', async () => { + verify.mockReturnValue(true) + const res = await post('{}', { 'X-GitHub-Event': 'issues' }) + expect(res.status).toBe(204) + expect(process).toHaveBeenCalledWith('issues', {}) + }) }) diff --git a/app/routes/api.github.webhook.ts b/app/routes/api.github.webhook.ts index 42a85c5a..8aeeeb1b 100644 --- a/app/routes/api.github.webhook.ts +++ b/app/routes/api.github.webhook.ts @@ -25,9 +25,6 @@ export const action = async ({ request }: Route.ActionArgs) => { } const event = request.headers.get('X-GitHub-Event') - if (event !== 'installation' && event !== 'installation_repositories') { - return new Response(null, { status: 202 }) - } try { await processGithubWebhookPayload(event, payload) diff --git a/app/services/durably.server.ts b/app/services/durably.server.ts index a3d84b7d..f6b1ec03 100644 --- a/app/services/durably.server.ts +++ b/app/services/durably.server.ts @@ -6,7 +6,7 @@ import { registerDurablySentryListeners } from '~/app/libs/sentry-node.server' import { backfillJob } from '~/app/services/jobs/backfill.server' import { classifyJob } from '~/app/services/jobs/classify.server' import { crawlJob } from '~/app/services/jobs/crawl.server' -import { recalculateJob } from '~/app/services/jobs/recalculate.server' +import { processJob } from '~/app/services/jobs/process.server' function createDurablyInstance() { const database = new SQLite('./data/durably.db') @@ -23,7 +23,7 @@ function createDurablyInstance() { backfill: backfillJob, classify: classifyJob, crawl: crawlJob, - recalculate: recalculateJob, + process: processJob, }, }) } diff --git a/app/services/github-webhook-installation.server.ts b/app/services/github-webhook-installation.server.ts new file mode 100644 index 00000000..073f56f1 --- /dev/null +++ b/app/services/github-webhook-installation.server.ts @@ -0,0 +1,175 @@ +import createDebug from 'debug' +import type { Kysely } from 'kysely' +import { db, type DB } from '~/app/services/db.server' +import { + type InstallationLike, + readInstallation, + selectionFromInstallation, +} from '~/app/services/github-webhook-shared.server' + +const debug = createDebug('app:github-webhook:installation') + +async function findActiveLinkByInstallation( + trx: Kysely, + installationId: number, +) { + return await trx + .selectFrom('githubAppLinks') + .selectAll() + .where('installationId', '=', installationId) + .where('deletedAt', 'is', null) + .executeTakeFirst() +} + +async function findActiveLinkByInstallationOrAccount( + trx: Kysely, + installationId: number, + githubAccountId: number, +) { + return await trx + .selectFrom('githubAppLinks') + .selectAll() + .where('deletedAt', 'is', null) + .where((eb) => + eb.or([ + eb('installationId', '=', installationId), + eb('githubAccountId', '=', githubAccountId), + ]), + ) + .executeTakeFirst() +} + +async function handleInstallationCreated( + trx: Kysely, + installation: InstallationLike, +): Promise { + const accountId = installation.account?.id + if (accountId === undefined) return null + + const link = await findActiveLinkByInstallationOrAccount( + trx, + installation.id, + accountId, + ) + if (!link) { + debug( + 'installation.created: no github_app_links row for installation_id=%s account_id=%s', + installation.id, + accountId, + ) + return null + } + + const login = installation.account?.login ?? link.githubOrg + const now = new Date().toISOString() + await trx + .updateTable('githubAppLinks') + .set({ + installationId: installation.id, + githubAccountId: accountId, + githubOrg: login, + appRepositorySelection: selectionFromInstallation(installation), + updatedAt: now, + }) + .where('organizationId', '=', link.organizationId) + .execute() + + return link.organizationId +} + +async function handleInstallationDeleted( + trx: Kysely, + installation: InstallationLike, +): Promise { + const link = await findActiveLinkByInstallation(trx, installation.id) + if (!link) return null + + const now = new Date().toISOString() + await trx + .updateTable('githubAppLinks') + .set({ deletedAt: now, updatedAt: now }) + .where('organizationId', '=', link.organizationId) + .execute() + + return link.organizationId +} + +async function handleInstallationSuspend( + trx: Kysely, + installation: InstallationLike, + suspend: boolean, +): Promise { + const link = await findActiveLinkByInstallation(trx, installation.id) + if (!link) return null + + const now = new Date().toISOString() + await trx + .updateTable('integrations') + .set({ + appSuspendedAt: suspend ? now : null, + updatedAt: now, + }) + .where('organizationId', '=', link.organizationId) + .execute() + + return link.organizationId +} + +async function handleInstallationRepositories( + trx: Kysely, + payload: Record, +): Promise { + const installation = readInstallation(payload) + if (!installation) return null + + const link = await findActiveLinkByInstallation(trx, installation.id) + if (!link) return null + + const now = new Date().toISOString() + await trx + .updateTable('githubAppLinks') + .set({ + appRepositorySelection: selectionFromInstallation(installation), + updatedAt: now, + }) + .where('organizationId', '=', link.organizationId) + .execute() + + return link.organizationId +} + +async function handleInstallationEvent( + trx: Kysely, + payload: Record, +): Promise { + const action = payload.action + if (typeof action !== 'string') return null + + const installation = readInstallation(payload) + if (!installation) return null + + switch (action) { + case 'created': + return await handleInstallationCreated(trx, installation) + case 'deleted': + return await handleInstallationDeleted(trx, installation) + case 'suspend': + return await handleInstallationSuspend(trx, installation, true) + case 'unsuspend': + return await handleInstallationSuspend(trx, installation, false) + default: + return null + } +} + +export async function runInstallationWebhookInTransaction( + event: 'installation' | 'installation_repositories', + payload: Record, +): Promise { + return await db.transaction().execute(async (trx) => { + if (event === 'installation') { + return await handleInstallationEvent(trx, payload) + } + return await handleInstallationRepositories(trx, payload) + }) +} diff --git a/app/services/github-webhook-pull.server.ts b/app/services/github-webhook-pull.server.ts new file mode 100644 index 00000000..3cef976e --- /dev/null +++ b/app/services/github-webhook-pull.server.ts @@ -0,0 +1,78 @@ +import { db } from '~/app/services/db.server' +import { + isRecord, + readInstallation, +} from '~/app/services/github-webhook-shared.server' +import { getTenantDb } from '~/app/services/tenant-db.server' +import type { OrganizationId } from '~/app/types/organization' + +function extractPullRequestNumber( + payload: Record, +): number | null { + const pr = payload.pull_request + if (!isRecord(pr) || typeof pr.number !== 'number') return null + return pr.number +} + +function readRepositoryOwnerName( + payload: Record, +): { owner: string; name: string } | null { + const repo = payload.repository + if (!isRecord(repo)) return null + const name = typeof repo.name === 'string' ? repo.name : null + const ownerObj = repo.owner + const owner = + isRecord(ownerObj) && typeof ownerObj.login === 'string' + ? ownerObj.login + : null + if (!name || !owner) return null + return { owner, name } +} + +export async function handlePullWebhookEvent( + _event: string, + payload: Record, +): Promise { + const installation = readInstallation(payload) + if (!installation) return + + const link = await db + .selectFrom('githubAppLinks') + .select('organizationId') + .where('installationId', '=', installation.id) + .where('deletedAt', 'is', null) + .executeTakeFirst() + if (!link) return + + const orgId = link.organizationId as OrganizationId + + const coords = readRepositoryOwnerName(payload) + if (!coords) return + + const prNumber = extractPullRequestNumber(payload) + if (prNumber === null) return + + const tenantDb = getTenantDb(orgId) + const repo = await tenantDb + .selectFrom('repositories') + .select('id') + .where('owner', '=', coords.owner) + .where('repo', '=', coords.name) + .executeTakeFirst() + if (!repo) return + + const { durably } = await import('~/app/services/durably.server') + await durably.jobs.crawl.trigger( + { + organizationId: orgId, + refresh: false, + repositoryId: repo.id, + prNumbers: [prNumber], + }, + { + concurrencyKey: `crawl:${orgId}`, + labels: { organizationId: orgId }, + coalesce: 'skip', + }, + ) +} diff --git a/app/services/github-webhook-shared.server.ts b/app/services/github-webhook-shared.server.ts new file mode 100644 index 00000000..a87ad54a --- /dev/null +++ b/app/services/github-webhook-shared.server.ts @@ -0,0 +1,35 @@ +export function isRecord(x: unknown): x is Record { + return typeof x === 'object' && x !== null && !Array.isArray(x) +} + +export type InstallationLike = { + id: number + account?: { id: number; login?: string } + repository_selection?: string +} + +export function readInstallation( + payload: Record, +): InstallationLike | null { + const inst = payload.installation + if (!isRecord(inst) || typeof inst.id !== 'number') return null + const acc = inst.account + let account: { id: number; login?: string } | undefined + if (isRecord(acc) && typeof acc.id === 'number') { + account = { + id: acc.id, + login: typeof acc.login === 'string' ? acc.login : undefined, + } + } + const repository_selection = + typeof inst.repository_selection === 'string' + ? inst.repository_selection + : undefined + return { id: inst.id, account, repository_selection } +} + +export function selectionFromInstallation( + installation: InstallationLike, +): 'all' | 'selected' { + return installation.repository_selection === 'selected' ? 'selected' : 'all' +} diff --git a/app/services/github-webhook.server.test.ts b/app/services/github-webhook.server.test.ts index 4192d570..de37b11d 100644 --- a/app/services/github-webhook.server.test.ts +++ b/app/services/github-webhook.server.test.ts @@ -15,6 +15,32 @@ import * as cache from '~/app/services/cache.server' import { closeDb, db } from '~/app/services/db.server' import { processGithubWebhookPayload } from './github-webhook.server' +const mockTenantRepoLookup = vi.fn() +vi.mock('~/app/services/tenant-db.server', () => ({ + getTenantDb: vi.fn(() => ({ + selectFrom: () => ({ + select: () => ({ + where: () => ({ + where: () => ({ + executeTakeFirst: () => mockTenantRepoLookup(), + }), + }), + }), + }), + })), +})) + +const crawlTriggerMock = vi.fn() +vi.mock('~/app/services/durably.server', () => ({ + durably: { + jobs: { + crawl: { + trigger: (...args: unknown[]) => crawlTriggerMock(...args), + }, + }, + }, +})) + const testDir = path.join(tmpdir(), `github-webhook-${Date.now()}`) mkdirSync(testDir, { recursive: true }) const testDbPath = path.join(testDir, 'data.db') @@ -69,7 +95,6 @@ describe('processGithubWebhookPayload', () => { afterAll(async () => { await closeDb() - vi.unstubAllEnvs() }) afterEach(() => { @@ -228,3 +253,91 @@ describe('processGithubWebhookPayload', () => { expect(clearSpy).not.toHaveBeenCalled() }) }) + +describe('PR webhook enqueue', () => { + afterAll(() => { + vi.unstubAllEnvs() + }) + + beforeEach(() => { + vi.stubEnv('UPFLOW_DATA_DIR', path.dirname(testDbPath)) + crawlTriggerMock.mockReset() + mockTenantRepoLookup.mockReset() + mockTenantRepoLookup.mockResolvedValue({ id: 'tracked-repo' }) + }) + + test('pull_request enqueues crawl with repository id and PR number', async () => { + await processGithubWebhookPayload('pull_request', { + action: 'opened', + installation: { id: 42 }, + repository: { name: 'test-repo', owner: { login: 'test-owner' } }, + pull_request: { number: 7 }, + }) + + expect(crawlTriggerMock).toHaveBeenCalledWith( + { + organizationId: 'o1', + refresh: false, + repositoryId: 'tracked-repo', + prNumbers: [7], + }, + expect.objectContaining({ + concurrencyKey: 'crawl:o1', + coalesce: 'skip', + labels: { organizationId: 'o1' }, + }), + ) + }) + + test('pull_request_review uses pull_request.number', async () => { + await processGithubWebhookPayload('pull_request_review', { + action: 'submitted', + installation: { id: 42 }, + repository: { name: 'test-repo', owner: { login: 'test-owner' } }, + pull_request: { number: 12 }, + }) + + expect(crawlTriggerMock).toHaveBeenCalledWith( + expect.objectContaining({ prNumbers: [12] }), + expect.anything(), + ) + }) + + test('pull_request_review_comment enqueues crawl', async () => { + await processGithubWebhookPayload('pull_request_review_comment', { + action: 'created', + installation: { id: 42 }, + repository: { name: 'test-repo', owner: { login: 'test-owner' } }, + pull_request: { number: 3 }, + }) + + expect(crawlTriggerMock).toHaveBeenCalledWith( + expect.objectContaining({ prNumbers: [3] }), + expect.anything(), + ) + }) + + test('unknown installation does not enqueue', async () => { + await processGithubWebhookPayload('pull_request', { + action: 'opened', + installation: { id: 99999 }, + repository: { name: 'test-repo', owner: { login: 'test-owner' } }, + pull_request: { number: 1 }, + }) + + expect(crawlTriggerMock).not.toHaveBeenCalled() + }) + + test('untracked repository does not enqueue', async () => { + mockTenantRepoLookup.mockResolvedValue(undefined) + + await processGithubWebhookPayload('pull_request', { + action: 'opened', + installation: { id: 42 }, + repository: { name: 'other-repo', owner: { login: 'test-owner' } }, + pull_request: { number: 1 }, + }) + + expect(crawlTriggerMock).not.toHaveBeenCalled() + }) +}) diff --git a/app/services/github-webhook.server.ts b/app/services/github-webhook.server.ts index 2afb8ab5..b8b8a5c6 100644 --- a/app/services/github-webhook.server.ts +++ b/app/services/github-webhook.server.ts @@ -1,201 +1,10 @@ -import createDebug from 'debug' -import type { Kysely } from 'kysely' import { clearOrgCache } from '~/app/services/cache.server' -import { db, type DB } from '~/app/services/db.server' - -const debug = createDebug('app:github-webhook') - -function isRecord(x: unknown): x is Record { - return typeof x === 'object' && x !== null && !Array.isArray(x) -} - -type InstallationLike = { - id: number - account?: { id: number; login?: string } - repository_selection?: string -} - -function readInstallation( - payload: Record, -): InstallationLike | null { - const inst = payload.installation - if (!isRecord(inst) || typeof inst.id !== 'number') return null - const acc = inst.account - let account: { id: number; login?: string } | undefined - if (isRecord(acc) && typeof acc.id === 'number') { - account = { - id: acc.id, - login: typeof acc.login === 'string' ? acc.login : undefined, - } - } - const repository_selection = - typeof inst.repository_selection === 'string' - ? inst.repository_selection - : undefined - return { id: inst.id, account, repository_selection } -} - -function selectionFromInstallation( - installation: InstallationLike, -): 'all' | 'selected' { - return installation.repository_selection === 'selected' ? 'selected' : 'all' -} - -async function findActiveLinkByInstallation( - trx: Kysely, - installationId: number, -) { - return await trx - .selectFrom('githubAppLinks') - .selectAll() - .where('installationId', '=', installationId) - .where('deletedAt', 'is', null) - .executeTakeFirst() -} - -async function findActiveLinkByInstallationOrAccount( - trx: Kysely, - installationId: number, - githubAccountId: number, -) { - return await trx - .selectFrom('githubAppLinks') - .selectAll() - .where('deletedAt', 'is', null) - .where((eb) => - eb.or([ - eb('installationId', '=', installationId), - eb('githubAccountId', '=', githubAccountId), - ]), - ) - .executeTakeFirst() -} - -async function handleInstallationCreated( - trx: Kysely, - installation: InstallationLike, -): Promise { - const accountId = installation.account?.id - if (accountId === undefined) return null - - const link = await findActiveLinkByInstallationOrAccount( - trx, - installation.id, - accountId, - ) - if (!link) { - debug( - 'installation.created: no github_app_links row for installation_id=%s account_id=%s', - installation.id, - accountId, - ) - return null - } - - const login = installation.account?.login ?? link.githubOrg - const now = new Date().toISOString() - await trx - .updateTable('githubAppLinks') - .set({ - installationId: installation.id, - githubAccountId: accountId, - githubOrg: login, - appRepositorySelection: selectionFromInstallation(installation), - updatedAt: now, - }) - .where('organizationId', '=', link.organizationId) - .execute() - - return link.organizationId -} - -async function handleInstallationDeleted( - trx: Kysely, - installation: InstallationLike, -): Promise { - const link = await findActiveLinkByInstallation(trx, installation.id) - if (!link) return null - - const now = new Date().toISOString() - await trx - .updateTable('githubAppLinks') - .set({ deletedAt: now, updatedAt: now }) - .where('organizationId', '=', link.organizationId) - .execute() - - return link.organizationId -} - -async function handleInstallationSuspend( - trx: Kysely, - installation: InstallationLike, - suspend: boolean, -): Promise { - const link = await findActiveLinkByInstallation(trx, installation.id) - if (!link) return null - - const now = new Date().toISOString() - await trx - .updateTable('integrations') - .set({ - appSuspendedAt: suspend ? now : null, - updatedAt: now, - }) - .where('organizationId', '=', link.organizationId) - .execute() - - return link.organizationId -} - -async function handleInstallationRepositories( - trx: Kysely, - payload: Record, -): Promise { - const installation = readInstallation(payload) - if (!installation) return null - - const link = await findActiveLinkByInstallation(trx, installation.id) - if (!link) return null - - const now = new Date().toISOString() - await trx - .updateTable('githubAppLinks') - .set({ - appRepositorySelection: selectionFromInstallation(installation), - updatedAt: now, - }) - .where('organizationId', '=', link.organizationId) - .execute() - - return link.organizationId -} - -async function handleInstallationEvent( - trx: Kysely, - payload: Record, -): Promise { - const action = payload.action - if (typeof action !== 'string') return null - - const installation = readInstallation(payload) - if (!installation) return null - - switch (action) { - case 'created': - return await handleInstallationCreated(trx, installation) - case 'deleted': - return await handleInstallationDeleted(trx, installation) - case 'suspend': - return await handleInstallationSuspend(trx, installation, true) - case 'unsuspend': - return await handleInstallationSuspend(trx, installation, false) - default: - return null - } -} +import { runInstallationWebhookInTransaction } from '~/app/services/github-webhook-installation.server' +import { handlePullWebhookEvent } from '~/app/services/github-webhook-pull.server' +import { isRecord } from '~/app/services/github-webhook-shared.server' /** - * Runs shared-DB updates for supported GitHub webhook events (inside one transaction). + * Shared-DB updates (installation) and PR webhook enqueue. * Call after signature verification and JSON parse. Unknown events are no-ops. */ export async function processGithubWebhookPayload( @@ -204,16 +13,17 @@ export async function processGithubWebhookPayload( ): Promise { if (!event || !isRecord(payload)) return - if (event !== 'installation' && event !== 'installation_repositories') return - - let orgToClear: string | null = null - await db.transaction().execute(async (trx) => { - if (event === 'installation') { - orgToClear = await handleInstallationEvent(trx, payload) - } else { - orgToClear = await handleInstallationRepositories(trx, payload) - } - }) + if (event === 'installation' || event === 'installation_repositories') { + const orgToClear = await runInstallationWebhookInTransaction(event, payload) + if (orgToClear) clearOrgCache(orgToClear) + return + } - if (orgToClear) clearOrgCache(orgToClear) + if ( + event === 'pull_request' || + event === 'pull_request_review' || + event === 'pull_request_review_comment' + ) { + await handlePullWebhookEvent(event, payload) + } } diff --git a/app/services/jobs/crawl-process-handoff.server.test.ts b/app/services/jobs/crawl-process-handoff.server.test.ts new file mode 100644 index 00000000..966f6f54 --- /dev/null +++ b/app/services/jobs/crawl-process-handoff.server.test.ts @@ -0,0 +1,39 @@ +import { describe, expect, test } from 'vitest' +import { shouldTriggerFullOrgProcessJob } from './crawl-process-handoff.server' + +describe('shouldTriggerFullOrgProcessJob', () => { + test('repository-scoped refresh does not widen to full-org process', () => { + expect( + shouldTriggerFullOrgProcessJob({ + refresh: true, + repositoryId: 'repo-uuid', + }), + ).toBe(false) + }) + + test('org-wide refresh without repository or PR targeting uses full-org process', () => { + expect( + shouldTriggerFullOrgProcessJob({ + refresh: true, + }), + ).toBe(true) + }) + + test('targeted PR refresh is not full-org', () => { + expect( + shouldTriggerFullOrgProcessJob({ + refresh: true, + repositoryId: 'r1', + prNumbers: [42], + }), + ).toBe(false) + }) + + test('incremental crawl without refresh is never full-org process branch', () => { + expect( + shouldTriggerFullOrgProcessJob({ + refresh: false, + }), + ).toBe(false) + }) +}) diff --git a/app/services/jobs/crawl-process-handoff.server.ts b/app/services/jobs/crawl-process-handoff.server.ts new file mode 100644 index 00000000..8589b347 --- /dev/null +++ b/app/services/jobs/crawl-process-handoff.server.ts @@ -0,0 +1,11 @@ +export function shouldTriggerFullOrgProcessJob(input: { + refresh: boolean + repositoryId?: string + prNumbers?: number[] +}): boolean { + return ( + input.refresh && + !input.repositoryId && + (!input.prNumbers || input.prNumbers.length === 0) + ) +} diff --git a/app/services/jobs/crawl.server.ts b/app/services/jobs/crawl.server.ts index 6c625df7..2f74c835 100644 --- a/app/services/jobs/crawl.server.ts +++ b/app/services/jobs/crawl.server.ts @@ -5,14 +5,11 @@ import { assertOrgGithubAuthResolvable, resolveOctokitFromOrg, } from '~/app/services/github-octokit.server' +import { shouldTriggerFullOrgProcessJob } from '~/app/services/jobs/crawl-process-handoff.server' import type { OrganizationId } from '~/app/types/organization' import { getOrganization } from '~/batch/db/queries' import { createFetcher } from '~/batch/github/fetcher' import { createStore } from '~/batch/github/store' -import { - analyzeAndFinalizeSteps, - triggerClassifyStep, -} from './shared-steps.server' export const crawlJob = defineJob({ name: 'crawl', @@ -20,7 +17,7 @@ export const crawlJob = defineJob({ organizationId: z.string(), refresh: z.boolean().default(false), prNumbers: z.array(z.number()).optional(), - repoName: z.string().optional(), + repositoryId: z.string().optional(), }), output: z.object({ fetchedRepos: z.number(), @@ -29,10 +26,12 @@ export const crawlJob = defineJob({ run: async (step, input) => { const orgId = input.organizationId as OrganizationId - // Fetch org once before step (secrets stay outside step output) + if (input.prNumbers?.length && !input.repositoryId) { + throw new Error('repositoryId is required when prNumbers is set') + } + const fullOrg = await getOrganization(orgId) - // Step 1: Validate and extract serializable org data (no secrets in step output) const organization = await step.run('load-organization', () => { if (!fullOrg.organizationSetting) { throw new Error('No organization setting configured') @@ -58,10 +57,12 @@ export const crawlJob = defineJob({ const FETCH_ALL_SENTINEL = '2000-01-01T00:00:00Z' - // Step 2: Fetch per repo - const targetRepos = input.repoName - ? organization.repositories.filter((r) => r.repo === input.repoName) + const targetRepos = input.repositoryId + ? organization.repositories.filter((r) => r.id === input.repositoryId) : organization.repositories + if (input.repositoryId && targetRepos.length === 0) { + throw new Error('repositoryId does not match any organization repository') + } const repoCount = targetRepos.length for (let i = 0; i < targetRepos.length; i++) { @@ -78,7 +79,6 @@ export const crawlJob = defineJob({ octokit, }) - // Step 2a: Fetch tags (if tag-based release detection) if (repo.releaseDetectionMethod === 'tags') { await step.run(`fetch-tags:${repoLabel}`, async () => { step.progress(i + 1, repoCount, `Fetching tags: ${repoLabel}...`) @@ -88,7 +88,6 @@ export const crawlJob = defineJob({ }) } - // Step 2b: Determine lastFetchedAt (before list fetch for early termination) const lastFetchedAt = await step.run( `last-fetched-at:${repoLabel}`, async () => { @@ -100,11 +99,9 @@ export const crawlJob = defineJob({ }, ) - // Step 2c: Fetch lightweight PR list (number + updatedAt only) const prNumberSet = input.prNumbers ? new Set(input.prNumbers) : null const prsToFetch: Array<{ number: number }> = prNumberSet - ? // --pr 指定時: リスト取得をスキップし、指定番号だけ処理する - (input.prNumbers?.map((n) => ({ number: n })) ?? []) + ? (input.prNumbers?.map((n) => ({ number: n })) ?? []) : await step.run(`fetch-prs:${repoLabel}`, async () => { step.progress(i + 1, repoCount, `Fetching PR list: ${repoLabel}...`) const stopBefore = @@ -114,7 +111,6 @@ export const crawlJob = defineJob({ return await fetcher.pullrequestList(stopBefore) }) - // Step 2d: Fetch details per PR (including PR metadata) const repoUpdated = new Set() for (let j = 0; j < prsToFetch.length; j++) { const pr = prsToFetch[j] @@ -126,6 +122,7 @@ export const crawlJob = defineJob({ prsToFetch.length, `Fetching ${repoLabel}#${pr.number} (${j + 1}/${prsToFetch.length})...`, ) + const fetchedAt = new Date().toISOString() try { const [ prMetadata, @@ -142,13 +139,17 @@ export const crawlJob = defineJob({ fetcher.timelineItems(pr.number), fetcher.files(pr.number), ]) - prMetadata.files = files - await store.savePrData(prMetadata, { - commits, - reviews, - discussions, - timelineItems, - }) + const prForSave = { ...prMetadata, files } + await store.savePrData( + prForSave, + { + commits, + reviews, + discussions, + timelineItems, + }, + fetchedAt, + ) return { saved: true as const, number: pr.number } } catch (e) { step.log.warn( @@ -168,9 +169,8 @@ export const crawlJob = defineJob({ } } - // Skip analyze if no updates (and not a refresh or specific PR fetch) if (!input.refresh && !input.prNumbers && updatedPrNumbers.size === 0) { - step.log.info('No updated PRs, skipping analyze.') + step.log.info('No updated PRs, skipping process.') await step.run('finalize', () => { clearOrgCache(orgId) }) @@ -183,23 +183,49 @@ export const crawlJob = defineJob({ ) } - // Steps 3-6: Analyze → Upsert → Export → Finalize - const { pullCount } = await analyzeAndFinalizeSteps( - step, - orgId, - organization, - { - filterPrNumbers: - input.refresh && !input.prNumbers ? undefined : updatedPrNumbers, - skipRepo: - input.refresh && !input.prNumbers - ? undefined - : (repoId) => !updatedPrNumbers.has(repoId), - }, - ) - - // Trigger classify job (fire-and-forget) - await triggerClassifyStep(step, orgId) + let pullCount = 0 + for (const set of updatedPrNumbers.values()) { + pullCount += set.size + } + + await step.run('trigger-process', async () => { + const { durably } = await import('~/app/services/durably.server') + const processOpts = { + concurrencyKey: `process:${orgId}`, + labels: { organizationId: orgId }, + coalesce: 'skip' as const, + } + if ( + shouldTriggerFullOrgProcessJob({ + refresh: input.refresh, + repositoryId: input.repositoryId, + prNumbers: input.prNumbers, + }) + ) { + await durably.jobs.process.trigger( + { organizationId: orgId }, + processOpts, + ) + } else { + const scopes = [...updatedPrNumbers.entries()].map( + ([repositoryId, set]) => ({ + repositoryId, + prNumbers: [...set], + }), + ) + if (scopes.length === 0) { + step.log.info( + 'No updated PRs for scoped process, skipping process job.', + ) + clearOrgCache(orgId) + return + } + await durably.jobs.process.trigger( + { organizationId: orgId, scopes }, + processOpts, + ) + } + }) return { fetchedRepos: repoCount, pullCount } }, diff --git a/app/services/jobs/process.server.ts b/app/services/jobs/process.server.ts new file mode 100644 index 00000000..6a2590a0 --- /dev/null +++ b/app/services/jobs/process.server.ts @@ -0,0 +1,81 @@ +import { defineJob } from '@coji/durably' +import { z } from 'zod' +import type { OrganizationId } from '~/app/types/organization' +import { getOrganization } from '~/batch/db/queries' +import { + analyzeAndFinalizeSteps, + triggerClassifyStep, +} from './shared-steps.server' + +const processScopeSchema = z.object({ + repositoryId: z.string(), + prNumbers: z.array(z.number()), +}) + +export const processJob = defineJob({ + name: 'process', + input: z.object({ + organizationId: z.string(), + steps: z + .object({ + upsert: z.boolean(), + export: z.boolean(), + }) + .optional(), + scopes: z.array(processScopeSchema).optional(), + }), + output: z.object({ + pullCount: z.number(), + }), + run: async (step, input) => { + const orgId = input.organizationId as OrganizationId + + if (input.scopes !== undefined && input.scopes.length === 0) { + return { pullCount: 0 } + } + + const organization = await step.run('load-organization', async () => { + step.progress(0, 0, 'Loading organization...') + const org = await getOrganization(orgId) + if (!org.organizationSetting) { + throw new Error('No organization setting configured') + } + return { + organizationSetting: org.organizationSetting, + botLogins: org.botLogins, + repositories: org.repositories, + exportSetting: org.exportSetting, + } + }) + + const scopes = input.scopes + const filterPrNumbers = + scopes && scopes.length > 0 + ? new Map( + scopes.map((s) => [s.repositoryId, new Set(s.prNumbers)] as const), + ) + : undefined + const allowedRepoIds = + scopes && scopes.length > 0 + ? new Set(scopes.map((s) => s.repositoryId)) + : undefined + const skipRepo = allowedRepoIds + ? (repoId: string) => !allowedRepoIds.has(repoId) + : undefined + + const { pullCount } = await analyzeAndFinalizeSteps( + step, + orgId, + organization, + { + filterPrNumbers, + skipRepo, + steps: input.steps, + }, + ) + + await triggerClassifyStep(step, orgId) + + return { pullCount } + }, +}) diff --git a/app/services/jobs/recalculate.server.ts b/app/services/jobs/recalculate.server.ts deleted file mode 100644 index 1cface71..00000000 --- a/app/services/jobs/recalculate.server.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { defineJob } from '@coji/durably' -import { z } from 'zod' -import type { OrganizationId } from '~/app/types/organization' -import { getOrganization } from '~/batch/db/queries' -import { analyzeAndFinalizeSteps } from './shared-steps.server' - -export const recalculateJob = defineJob({ - name: 'recalculate', - input: z.object({ - organizationId: z.string(), - steps: z.object({ - upsert: z.boolean(), - export: z.boolean(), - }), - }), - output: z.object({ - pullCount: z.number(), - }), - run: async (step, input) => { - const orgId = input.organizationId as OrganizationId - - // Step 1: Load organization data - const organization = await step.run('load-organization', async () => { - step.progress(0, 0, 'Loading organization...') - const org = await getOrganization(orgId) - if (!org.organizationSetting) { - throw new Error('No organization setting configured') - } - if (!org.integration) { - throw new Error('No integration configured') - } - return { - organizationSetting: org.organizationSetting, - botLogins: org.botLogins, - repositories: org.repositories, - exportSetting: org.exportSetting, - } - }) - - // Steps 2-5: Analyze → Upsert → Export → Finalize - return await analyzeAndFinalizeSteps(step, orgId, organization, { - steps: input.steps, - }) - }, -}) diff --git a/app/services/jobs/shared-steps.server.ts b/app/services/jobs/shared-steps.server.ts index 23b1f784..0d90d4f0 100644 --- a/app/services/jobs/shared-steps.server.ts +++ b/app/services/jobs/shared-steps.server.ts @@ -1,5 +1,5 @@ /** - * crawl と recalculate ジョブで共有する analyze → upsert → export → finalize ステップ群 + * process ジョブで使う analyze → upsert → export → finalize ステップ群 */ import type { StepContext } from '@coji/durably' import type { Selectable } from 'kysely' diff --git a/batch/cli.ts b/batch/cli.ts index a2180091..9d06bbe0 100644 --- a/batch/cli.ts +++ b/batch/cli.ts @@ -24,37 +24,38 @@ const crawl = command( pr: { type: [Number], description: - 'Specific PR numbers to refresh (requires --repo). e.g. --repo falcon9 --pr 123', + 'Specific PR numbers (requires --repository). e.g. --repository org/repo --pr 123', }, - repo: { + repository: { type: String, - description: 'Repository name to target (required with --pr)', + description: + 'Target repository as owner/repo or repository id (required with --pr)', }, }, help: { description: - 'Fetch from GitHub → analyze → upsert → export → trigger classify. Runs as a durable job.', + 'Fetch raw PR data from GitHub, then enqueue process (analyze → upsert → export → classify). Runs as a durable job.', }, }, async (argv) => { const { crawlCommand } = await import('./commands/crawl') const prNumbers = argv.flags.pr?.length ? argv.flags.pr : undefined - if (prNumbers && !argv.flags.repo) { - consola.error('--repo is required when using --pr') + if (prNumbers && !argv.flags.repository) { + consola.error('--repository is required when using --pr') process.exit(1) } await crawlCommand({ organizationId: argv._.organizationId, refresh: argv.flags.refresh, prNumbers, - repoName: argv.flags.repo, + repository: argv.flags.repository, }) }, ) -const recalculate = command( +const processCmd = command( { - name: 'recalculate', + name: 'process', parameters: ['[organization id]'], flags: { export: { @@ -65,12 +66,12 @@ const recalculate = command( }, help: { description: - 'Re-analyze raw data → upsert to DB. No GitHub API calls. Runs as a durable job.', + 'Analyze stored raw data → upsert → export → classify trigger. No GitHub API calls. Runs as a durable job.', }, }, async (argv) => { - const { recalculateCommand } = await import('./commands/recalculate') - await recalculateCommand({ + const { processCommand } = await import('./commands/process') + await processCommand({ organizationId: argv._.organizationId, export: argv.flags.export, }) @@ -119,7 +120,7 @@ const backfill = command( }, help: { description: - 'Re-fetch PR metadata to fill missing fields in raw data. Runs as a durable job. Run recalculate after this.', + 'Re-fetch PR metadata to fill missing fields in raw data. Runs as a durable job. Run process after this.', }, }, async (argv) => { @@ -151,5 +152,5 @@ process.on('unhandledRejection', async (error) => { }) cli({ - commands: [crawl, recalculate, classify, backfill, report], + commands: [crawl, processCmd, classify, backfill, report], }) diff --git a/batch/commands/backfill.ts b/batch/commands/backfill.ts index dae27f53..5e5007ae 100644 --- a/batch/commands/backfill.ts +++ b/batch/commands/backfill.ts @@ -36,7 +36,7 @@ export async function backfillCommand(props: BackfillCommandProps) { ) consola.success( - `Backfill completed for ${output.repositoryCount} repositories. Run \`recalculate\` to apply changes.`, + `Backfill completed for ${output.repositoryCount} repositories. Run \`process\` to apply changes.`, ) } finally { await durably.stop() diff --git a/batch/commands/crawl.ts b/batch/commands/crawl.ts index 2b37f6d8..e542ccc1 100644 --- a/batch/commands/crawl.ts +++ b/batch/commands/crawl.ts @@ -1,5 +1,7 @@ import consola from 'consola' +import type { Selectable } from 'kysely' import { durably } from '~/app/services/durably.server' +import type { TenantDB } from '~/app/services/tenant-db.server' import { requireOrganization } from './helpers' import { shutdown } from './shutdown' @@ -7,30 +9,63 @@ interface CrawlCommandProps { organizationId?: string refresh: boolean prNumbers?: number[] - repoName?: string + /** `owner/repo` or repository row id */ + repository?: string +} + +function resolveRepositoryId( + repositories: Selectable[], + spec: string, +): string | undefined { + const slash = spec.indexOf('/') + if (slash !== -1) { + const owner = spec.slice(0, slash) + const repo = spec.slice(slash + 1) + return repositories.find((r) => r.owner === owner && r.repo === repo)?.id + } + return repositories.find((r) => r.id === spec)?.id } export async function crawlCommand({ organizationId, refresh, prNumbers, - repoName, + repository, }: CrawlCommandProps) { const result = await requireOrganization(organizationId) if (!result) return - const { orgId } = result + const { orgId, organization } = result + + let repositoryId: string | undefined + if (repository) { + repositoryId = resolveRepositoryId(organization.repositories, repository) + if (!repositoryId) { + consola.error(`Repository not found for this organization: ${repository}`) + return + } + } + + if (prNumbers?.length && !repositoryId) { + consola.error('--repository (owner/repo or id) is required when using --pr') + return + } try { const labels: string[] = [] - if (repoName) labels.push(`repo: ${repoName}`) + if (repository) labels.push(`repository: ${repository}`) if (prNumbers) labels.push(`PRs: ${prNumbers.join(', ')}`) if (refresh) labels.push('full refresh') const label = labels.length > 0 ? ` (${labels.join(', ')})` : '' consola.info(`Starting crawl for ${orgId}${label}...`) const { output } = await durably.jobs.crawl.triggerAndWait( - { organizationId: orgId, refresh, prNumbers, repoName }, + { + organizationId: orgId, + refresh, + prNumbers, + repositoryId, + }, { concurrencyKey: `crawl:${orgId}`, labels: { organizationId: orgId }, @@ -46,7 +81,7 @@ export async function crawlCommand({ ) consola.success( - `Crawl completed. ${output.fetchedRepos} repos, ${output.pullCount} PRs.`, + `Crawl completed. ${output.fetchedRepos} repos, ${output.pullCount} PRs fetched.`, ) } finally { await durably.stop() diff --git a/batch/commands/recalculate.ts b/batch/commands/process.ts similarity index 71% rename from batch/commands/recalculate.ts rename to batch/commands/process.ts index eeb847bc..475feda6 100644 --- a/batch/commands/recalculate.ts +++ b/batch/commands/process.ts @@ -3,15 +3,15 @@ import { durably } from '~/app/services/durably.server' import { requireOrganization } from './helpers' import { shutdown } from './shutdown' -interface RecalculateCommandProps { +interface ProcessCommandProps { organizationId?: string export: boolean } -export async function recalculateCommand({ +export async function processCommand({ organizationId, export: exportFlag, -}: RecalculateCommandProps) { +}: ProcessCommandProps) { const result = await requireOrganization(organizationId) if (!result) return @@ -22,13 +22,13 @@ export async function recalculateCommand({ const flags = [exportFlag ? 'export' : null].filter(Boolean) consola.info( - `Starting recalculate for ${orgId}${flags.length ? ` (+${flags.join(', ')})` : ''}...`, + `Starting process for ${orgId}${flags.length ? ` (+${flags.join(', ')})` : ''}...`, ) - const { output } = await durably.jobs.recalculate.triggerAndWait( + const { output } = await durably.jobs.process.triggerAndWait( { organizationId: orgId, steps }, { - concurrencyKey: `recalculate:${orgId}`, + concurrencyKey: `process:${orgId}`, labels: { organizationId: orgId }, onProgress: (p) => { if (p.message) consola.info(p.message) @@ -41,7 +41,7 @@ export async function recalculateCommand({ }, ) - consola.success(`Recalculate completed. ${output.pullCount} PRs updated.`) + consola.success(`Process completed. ${output.pullCount} PRs updated.`) } finally { await durably.stop() await shutdown() diff --git a/batch/db/mutations.ts b/batch/db/mutations.ts index 09f848e8..69b2d1ed 100644 --- a/batch/db/mutations.ts +++ b/batch/db/mutations.ts @@ -298,7 +298,7 @@ async function updateLastActivityAt( /** * analyze 結果を一括で DB に書き込む共通関数。 - * durably ジョブ(crawl, recalculate)の共通 upsert 処理。 + * durably ジョブ(process)の共通 upsert 処理。 */ export async function upsertAnalyzedData( organizationId: OrganizationId, diff --git a/batch/github/backfill-repo.ts b/batch/github/backfill-repo.ts index 87cd203d..663df72b 100644 --- a/batch/github/backfill-repo.ts +++ b/batch/github/backfill-repo.ts @@ -38,8 +38,9 @@ export async function backfillRepo( if (pr.files && pr.files.length > 0) continue // already has files try { const files = await fetcher.files(pr.number) - pr.files = files - await store.updatePrMetadata([pr]) + const prWithFiles = { ...pr, files } + const fetchedAt = new Date().toISOString() + await store.updatePrMetadata([{ pr: prWithFiles, fetchedAt }]) updated++ } catch (err) { errors++ @@ -63,8 +64,10 @@ export async function backfillRepo( const allPullRequests = await fetcher.pullrequests() logger.info(`fetched ${allPullRequests.length} PR metadata.`) - // raw データの pullRequest JSON だけを更新 - const updated = await store.updatePrMetadata(allPullRequests) + const fetchedAt = new Date().toISOString() + const updated = await store.updatePrMetadata( + allPullRequests.map((pr) => ({ pr, fetchedAt })), + ) logger.info( `updated ${updated} raw records in ${repository.owner}/${repository.repo}`, ) diff --git a/batch/github/store.test.ts b/batch/github/store.test.ts index 82d3528c..ea8f1515 100644 --- a/batch/github/store.test.ts +++ b/batch/github/store.test.ts @@ -40,6 +40,10 @@ const orgId = toOrgId('test-org') const repositoryId = 'repo-1' const tenantDbPath = path.join(testDir, `tenant_${orgId}.db`) +function isoAt(ms: number) { + return new Date(ms).toISOString() +} + function setupTenantDb() { setupTenantSchema(tenantDbPath) const db = new SQLite(tenantDbPath) @@ -168,12 +172,16 @@ describe('store', () => { const discussions = makeDiscussions(1) const timelineItems = makeTimelineItems(1) - await store.savePrData(pr, { - commits, - reviews, - discussions, - timelineItems, - }) + await store.savePrData( + pr, + { + commits, + reviews, + discussions, + timelineItems, + }, + isoAt(1_700_000_000_000), + ) expect(await store.loader.pullrequests()).toEqual([pr]) expect(await store.loader.commits(1)).toEqual(commits) @@ -186,11 +194,15 @@ describe('store', () => { const store = createStore({ organizationId: orgId, repositoryId }) const pr = makePr(1) - await store.savePrData(pr, { - commits: makeCommits(1), - reviews: makeReviews(1), - discussions: makeDiscussions(1), - }) + await store.savePrData( + pr, + { + commits: makeCommits(1), + reviews: makeReviews(1), + discussions: makeDiscussions(1), + }, + isoAt(1_700_000_000_000), + ) const updatedPr = { ...pr, title: 'Updated PR #1' } const updatedCommits: ShapedGitHubCommit[] = [ @@ -203,11 +215,15 @@ describe('store', () => { }, ] - await store.savePrData(updatedPr, { - commits: updatedCommits, - reviews: makeReviews(1), - discussions: makeDiscussions(1), - }) + await store.savePrData( + updatedPr, + { + commits: updatedCommits, + reviews: makeReviews(1), + discussions: makeDiscussions(1), + }, + isoAt(1_700_000_100_000), + ) const prs = await store.loader.pullrequests() expect(prs).toHaveLength(1) @@ -228,11 +244,15 @@ describe('store', () => { const store = createStore({ organizationId: orgId, repositoryId }) const pr = makePr(1) - await store.savePrData(pr, { - commits: makeCommits(1), - reviews: makeReviews(1), - discussions: makeDiscussions(1), - }) + await store.savePrData( + pr, + { + commits: makeCommits(1), + reviews: makeReviews(1), + discussions: makeDiscussions(1), + }, + isoAt(1_700_000_000_000), + ) expect(await store.loader.timelineItems(1)).toEqual([]) }) @@ -258,11 +278,15 @@ describe('store', () => { const store = createStore({ organizationId: orgId, repositoryId }) for (const n of [1, 2, 3]) { - await store.savePrData(makePr(n), { - commits: makeCommits(n), - reviews: makeReviews(n), - discussions: makeDiscussions(n), - }) + await store.savePrData( + makePr(n), + { + commits: makeCommits(n), + reviews: makeReviews(n), + discussions: makeDiscussions(n), + }, + isoAt(1_700_000_000_000 + n), + ) } await store.preloadAll() @@ -278,16 +302,24 @@ describe('store', () => { test('multiple PRs are stored independently', async () => { const store = createStore({ organizationId: orgId, repositoryId }) - await store.savePrData(makePr(1), { - commits: makeCommits(1), - reviews: makeReviews(1), - discussions: makeDiscussions(1), - }) - await store.savePrData(makePr(2), { - commits: makeCommits(2), - reviews: makeReviews(2), - discussions: makeDiscussions(2), - }) + await store.savePrData( + makePr(1), + { + commits: makeCommits(1), + reviews: makeReviews(1), + discussions: makeDiscussions(1), + }, + isoAt(1_700_000_000_000), + ) + await store.savePrData( + makePr(2), + { + commits: makeCommits(2), + reviews: makeReviews(2), + discussions: makeDiscussions(2), + }, + isoAt(1_700_000_000_001), + ) expect(await store.loader.pullrequests()).toHaveLength(2) expect(await store.loader.commits(1)).toEqual(makeCommits(1)) @@ -301,21 +333,33 @@ describe('store', () => { const pr2 = { ...makePr(2), updatedAt: '2024-01-05T00:00:00Z' } const pr3 = { ...makePr(3), updatedAt: '2024-01-03T00:00:00Z' } - await store.savePrData(pr1, { - commits: makeCommits(1), - reviews: makeReviews(1), - discussions: makeDiscussions(1), - }) - await store.savePrData(pr2, { - commits: makeCommits(2), - reviews: makeReviews(2), - discussions: makeDiscussions(2), - }) - await store.savePrData(pr3, { - commits: makeCommits(3), - reviews: makeReviews(3), - discussions: makeDiscussions(3), - }) + await store.savePrData( + pr1, + { + commits: makeCommits(1), + reviews: makeReviews(1), + discussions: makeDiscussions(1), + }, + isoAt(1_700_000_000_000), + ) + await store.savePrData( + pr2, + { + commits: makeCommits(2), + reviews: makeReviews(2), + discussions: makeDiscussions(2), + }, + isoAt(1_700_000_000_001), + ) + await store.savePrData( + pr3, + { + commits: makeCommits(3), + reviews: makeReviews(3), + discussions: makeDiscussions(3), + }, + isoAt(1_700_000_000_002), + ) expect(await store.getLatestUpdatedAt()).toBe('2024-01-05T00:00:00Z') }) @@ -328,12 +372,16 @@ describe('store', () => { test('preloadAll uses lazy parsing (data accessible after preload)', async () => { const store = createStore({ organizationId: orgId, repositoryId }) - await store.savePrData(makePr(1), { - commits: makeCommits(1), - reviews: makeReviews(1), - discussions: makeDiscussions(1), - timelineItems: makeTimelineItems(1), - }) + await store.savePrData( + makePr(1), + { + commits: makeCommits(1), + reviews: makeReviews(1), + discussions: makeDiscussions(1), + timelineItems: makeTimelineItems(1), + }, + isoAt(1_700_000_000_000), + ) await store.preloadAll() @@ -347,4 +395,82 @@ describe('store', () => { expect(await store.loader.discussions(1)).toEqual(makeDiscussions(1)) expect(await store.loader.timelineItems(1)).toEqual(makeTimelineItems(1)) }) + + test('savePrData does not overwrite row when incoming fetchedAt is older', async () => { + const store = createStore({ organizationId: orgId, repositoryId }) + const pr = makePr(1) + const newer = isoAt(1_700_000_100_000) + const older = isoAt(1_700_000_000_000) + + await store.savePrData( + pr, + { + commits: makeCommits(1), + reviews: makeReviews(1), + discussions: makeDiscussions(1), + }, + newer, + ) + + const stalePr = { ...pr, title: 'Stale overwrite attempt' } + await store.savePrData( + stalePr, + { + commits: [{ ...makeCommits(1)[0], sha: 'stale-sha' }], + reviews: makeReviews(1), + discussions: makeDiscussions(1), + }, + older, + ) + + const prs = await store.loader.pullrequests() + expect(prs[0].title).toBe('PR #1') + expect(await store.loader.commits(1)).toEqual(makeCommits(1)) + }) + + test('updatePrMetadata skips when incoming fetchedAt is older than stored', async () => { + const store = createStore({ organizationId: orgId, repositoryId }) + const pr = makePr(1) + await store.savePrData( + pr, + { + commits: makeCommits(1), + reviews: makeReviews(1), + discussions: makeDiscussions(1), + }, + isoAt(1_700_000_100_000), + ) + + const staleMeta = { ...pr, title: 'Stale metadata' } + const n = await store.updatePrMetadata([ + { pr: staleMeta, fetchedAt: isoAt(1_700_000_000_000) }, + ]) + expect(n).toBe(0) + + const prs = await store.loader.pullrequests() + expect(prs[0].title).toBe('PR #1') + }) + + test('updatePrMetadata applies when incoming fetchedAt is newer or equal', async () => { + const store = createStore({ organizationId: orgId, repositoryId }) + const pr = makePr(1) + await store.savePrData( + pr, + { + commits: makeCommits(1), + reviews: makeReviews(1), + discussions: makeDiscussions(1), + }, + isoAt(1_700_000_000_000), + ) + + const updated = { ...pr, title: 'New title' } + const n = await store.updatePrMetadata([ + { pr: updated, fetchedAt: isoAt(1_700_000_100_000) }, + ]) + expect(n).toBe(1) + + const prs = await store.loader.pullrequests() + expect(prs[0].title).toBe('New title') + }) }) diff --git a/batch/github/store.ts b/batch/github/store.ts index 8a3c4a81..52882dbd 100644 --- a/batch/github/store.ts +++ b/batch/github/store.ts @@ -40,6 +40,7 @@ export const createStore = ({ discussions: ShapedGitHubReviewComment[] timelineItems?: ShapedTimelineItem[] }, + fetchedAt: string, ) => { await db .insertInto('githubRawData') @@ -54,16 +55,21 @@ export const createStore = ({ ? JSON.stringify(data.timelineItems) : null, updatedAt: pr.updatedAt ?? null, + fetchedAt, }) .onConflict((oc) => - oc.columns(['repositoryId', 'pullRequestNumber']).doUpdateSet((eb) => ({ - pullRequest: eb.ref('excluded.pullRequest'), - commits: eb.ref('excluded.commits'), - reviews: eb.ref('excluded.reviews'), - discussions: eb.ref('excluded.discussions'), - timelineItems: eb.ref('excluded.timelineItems'), - updatedAt: eb.ref('excluded.updatedAt'), - })), + oc + .columns(['repositoryId', 'pullRequestNumber']) + .doUpdateSet((eb) => ({ + pullRequest: eb.ref('excluded.pullRequest'), + commits: eb.ref('excluded.commits'), + reviews: eb.ref('excluded.reviews'), + discussions: eb.ref('excluded.discussions'), + timelineItems: eb.ref('excluded.timelineItems'), + updatedAt: eb.ref('excluded.updatedAt'), + fetchedAt: eb.ref('excluded.fetchedAt'), + })) + .whereRef('excluded.fetchedAt', '>=', 'githubRawData.fetchedAt'), ) .execute() } @@ -73,17 +79,21 @@ export const createStore = ({ * commits/reviews 等はそのまま残る。 * 既に raw データがある PR のみ更新し、ない PR は無視する。 */ - const updatePrMetadata = async (prs: ShapedGitHubPullRequest[]) => { + const updatePrMetadata = async ( + items: Array<{ pr: ShapedGitHubPullRequest; fetchedAt: string }>, + ) => { let updated = 0 - for (const pr of prs) { + for (const { pr, fetchedAt } of items) { const result = await db .updateTable('githubRawData') .set({ pullRequest: JSON.stringify(pr), updatedAt: pr.updatedAt ?? null, + fetchedAt, }) .where('repositoryId', '=', repositoryId) .where('pullRequestNumber', '=', pr.number) + .where((eb) => eb('fetchedAt', '<=', fetchedAt)) .execute() if (result[0].numUpdatedRows > 0n) { updated++ From 0ec58381a87415afa9f38a366ed4106673cd72c5 Mon Sep 17 00:00:00 2001 From: coji Date: Tue, 31 Mar 2026 21:26:03 +0900 Subject: [PATCH 2/5] =?UTF-8?q?fix:=20takt=20cursor=20provider=20=E3=82=92?= =?UTF-8?q?=20full=20permission=20=E3=81=AB=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit edit だと pnpm 実行やファイル削除ができず implement が失敗する。 coder persona は shell 実行が必須なので full が正しい。 Co-Authored-By: Claude Opus 4.6 (1M context) --- .takt/config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.takt/config.yaml b/.takt/config.yaml index 96381a26..1a0f9b7b 100644 --- a/.takt/config.yaml +++ b/.takt/config.yaml @@ -20,7 +20,7 @@ persona_providers: provider_profiles: cursor: - default_permission_mode: edit + default_permission_mode: full codex: default_permission_mode: edit claude: From 6a84f99b9102dbbc87c033cb5981dc19ffe1e26c Mon Sep 17 00:00:00 2001 From: coji Date: Tue, 31 Mar 2026 23:03:12 +0900 Subject: [PATCH 3/5] =?UTF-8?q?refactor:=20simplify=20=E3=83=AC=E3=83=93?= =?UTF-8?q?=E3=83=A5=E3=83=BC=E6=8C=87=E6=91=98=E3=82=92=E5=8F=8D=E6=98=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - findActiveLinkByInstallation を shared に統合(重複クエリ解消) - concurrency key をヘルパー関数に集約(8箇所の文字列リテラル散在を解消) - crawl の trigger-process 内の到達不能コード(scopes.length === 0)を削除 Co-Authored-By: Claude Opus 4.6 (1M context) --- .takt/tasks.yaml | 88 ++++++++++++++++++- .../settings/data-management/index.tsx | 8 +- .../repositories/$repository/$pull/index.tsx | 3 +- .../github-webhook-installation.server.ts | 15 +--- app/services/github-webhook-pull.server.ts | 11 +-- app/services/github-webhook-shared.server.ts | 11 +++ app/services/jobs/concurrency-keys.server.ts | 3 + app/services/jobs/crawl.server.ts | 10 +-- batch/commands/crawl.ts | 3 +- batch/commands/process.ts | 3 +- batch/job-scheduler.ts | 3 +- 11 files changed, 123 insertions(+), 35 deletions(-) create mode 100644 app/services/jobs/concurrency-keys.server.ts diff --git a/.takt/tasks.yaml b/.takt/tasks.yaml index b05ae6b6..66ec243b 100644 --- a/.takt/tasks.yaml +++ b/.takt/tasks.yaml @@ -1 +1,87 @@ -tasks: [] +tasks: + - worktree: false + branch: feat/webhook-realtime-pr-update + piece: implement-step + issue: 255 + auto_pr: false + draft_pr: false + name: webhook-realtime-pr-update-impl + status: failed + slug: 20260331-113806-wh255 + task_dir: .takt/tasks/20260331-113806-wh255 + created_at: 2026-03-31T11:38:06.000Z + started_at: 2026-03-31T11:41:07.279Z + completed_at: 2026-03-31T12:09:48.132Z + owner_pid: null + failure: + movement: fix + error: "Movement execution failed: All parallel sub-movements failed: + arch-review: Report phase failed for architect-review.md: You've hit + your usage limit. Upgrade to Pro (https://chatgpt.com/explore/pro), + visit https://chatgpt.com/codex/settings/usage to purchase more credits + or try again at Apr 3rd, 2026 11:33 PM.; supervise: Report phase failed + for supervisor-validation.md: You've hit your usage limit. Upgrade to + Pro (https://chatgpt.com/explore/pro), visit + https://chatgpt.com/codex/settings/usage to purchase more credits or try + again at Apr 3rd, 2026 11:33 PM." + last_message: >- + レビュー報告を確認し、指摘を修正します。 + + + + + + + レビュー指摘を修正します: PR オブジェクトの直接ミューテーションと、リポジトリスコープ付き refresh 時の full-org + `process` 誤起動。 + + + + PR オブジェクトの非ミュータブル化と、リポジトリスコープ付き refresh 時の full-org `process` + 誤判定を修正します。ヘルパーと回帰テストを追加します。 + + + + + + + + + + + + + 対応内容は次のとおりです。 + + + ## 1. PR オブジェクトの直接ミューテーション(ARCH-NEW-direct-pr-files-mutation / + VAL-NEW-direct-mutation) + + - **`crawl.server.ts`**: `prMetadata.files = files` をやめ、`const prForSave + = { ...prMetadata, files }` を `savePrData` に渡すように変更。 + + - **`$pull/index.tsx`**: 同様に `const prForSave = { ...prMetadata, files + }` を使用。 + + - **`backfill-repo.ts`**: `pr.files = files` をやめ、`const prWithFiles = { + ...pr, files }` を `updatePrMetadata` に渡すように変更。 + + + ## 2. リポジトリ指定付き refresh で full-org `process` が走る問題(carry-over + ARCH-NEW-crawl-scoped-handoff) + + - **`crawl-process-handoff.server.ts`** + を追加し、`shouldTriggerFullOrgProcessJob` で + 「`refresh` かつ **`repositoryId` なし** かつ **PR 番号の指定なし**」のときだけ org 全体の `process` と判定。 + - **`crawl.server.ts`**: 従来の `input.refresh && !input.prNumbers` + を、この関数呼び出しに置き換え。`--repository … --refresh` のときは常に scoped + 側(`updatedPrNumbers` 由来の `scopes`)へ。 + + + ## 3. 回帰テスト + + - **`crawl-process-handoff.server.test.ts`**: 上記判定のユニットテストを追加(リポジトリ付き + refresh が full-org にならないこと等)。 + + + `pnpm validate` は問題なく完了しています(テスト 310 件)。 diff --git a/app/routes/$orgSlug/settings/data-management/index.tsx b/app/routes/$orgSlug/settings/data-management/index.tsx index ae9a9a91..a495fb30 100644 --- a/app/routes/$orgSlug/settings/data-management/index.tsx +++ b/app/routes/$orgSlug/settings/data-management/index.tsx @@ -13,6 +13,10 @@ import { getErrorMessage } from '~/app/libs/error-message' import { orgContext } from '~/app/middleware/context' import { durably } from '~/app/services/durably' import { durably as serverDurably } from '~/app/services/durably.server' +import { + crawlConcurrencyKey, + processConcurrencyKey, +} from '~/app/services/jobs/concurrency-keys.server' import type { JobSteps } from '~/app/services/jobs/shared-steps.server' import ContentSection from '../+components/content-section' import { JobHistory, isRunActive } from './+components/job-history' @@ -41,7 +45,7 @@ export const action = async ({ request, context }: Route.ActionArgs) => { await serverDurably.jobs.crawl.trigger( { organizationId: org.id, refresh: true }, { - concurrencyKey: `crawl:${org.id}`, + concurrencyKey: crawlConcurrencyKey(org.id), labels: { organizationId: org.id }, }, ) @@ -74,7 +78,7 @@ export const action = async ({ request, context }: Route.ActionArgs) => { await serverDurably.jobs.process.trigger( { organizationId: org.id, steps }, { - concurrencyKey: `process:${org.id}`, + concurrencyKey: processConcurrencyKey(org.id), labels: { organizationId: org.id }, }, ) diff --git a/app/routes/$orgSlug/settings/repositories/$repository/$pull/index.tsx b/app/routes/$orgSlug/settings/repositories/$repository/$pull/index.tsx index bd1bcb62..95039c9b 100644 --- a/app/routes/$orgSlug/settings/repositories/$repository/$pull/index.tsx +++ b/app/routes/$orgSlug/settings/repositories/$repository/$pull/index.tsx @@ -11,6 +11,7 @@ import { getIntegration, } from '~/app/services/github-integration-queries.server' import { resolveOctokitFromOrg } from '~/app/services/github-octokit.server' +import { processConcurrencyKey } from '~/app/services/jobs/concurrency-keys.server' import { createFetcher } from '~/batch/github/fetcher' import { createStore } from '~/batch/github/store' import type { Route } from './+types/index' @@ -161,7 +162,7 @@ export const action = async ({ scopes: [{ repositoryId, prNumbers: [pullId] }], }, { - concurrencyKey: `process:${organization.id}`, + concurrencyKey: processConcurrencyKey(organization.id), labels: { organizationId: organization.id }, }, ) diff --git a/app/services/github-webhook-installation.server.ts b/app/services/github-webhook-installation.server.ts index 073f56f1..e41f3afe 100644 --- a/app/services/github-webhook-installation.server.ts +++ b/app/services/github-webhook-installation.server.ts @@ -2,25 +2,14 @@ import createDebug from 'debug' import type { Kysely } from 'kysely' import { db, type DB } from '~/app/services/db.server' import { - type InstallationLike, + findActiveLinkByInstallation, readInstallation, selectionFromInstallation, + type InstallationLike, } from '~/app/services/github-webhook-shared.server' const debug = createDebug('app:github-webhook:installation') -async function findActiveLinkByInstallation( - trx: Kysely, - installationId: number, -) { - return await trx - .selectFrom('githubAppLinks') - .selectAll() - .where('installationId', '=', installationId) - .where('deletedAt', 'is', null) - .executeTakeFirst() -} - async function findActiveLinkByInstallationOrAccount( trx: Kysely, installationId: number, diff --git a/app/services/github-webhook-pull.server.ts b/app/services/github-webhook-pull.server.ts index 3cef976e..a067d66b 100644 --- a/app/services/github-webhook-pull.server.ts +++ b/app/services/github-webhook-pull.server.ts @@ -1,8 +1,10 @@ import { db } from '~/app/services/db.server' import { + findActiveLinkByInstallation, isRecord, readInstallation, } from '~/app/services/github-webhook-shared.server' +import { crawlConcurrencyKey } from '~/app/services/jobs/concurrency-keys.server' import { getTenantDb } from '~/app/services/tenant-db.server' import type { OrganizationId } from '~/app/types/organization' @@ -36,12 +38,7 @@ export async function handlePullWebhookEvent( const installation = readInstallation(payload) if (!installation) return - const link = await db - .selectFrom('githubAppLinks') - .select('organizationId') - .where('installationId', '=', installation.id) - .where('deletedAt', 'is', null) - .executeTakeFirst() + const link = await findActiveLinkByInstallation(db, installation.id) if (!link) return const orgId = link.organizationId as OrganizationId @@ -70,7 +67,7 @@ export async function handlePullWebhookEvent( prNumbers: [prNumber], }, { - concurrencyKey: `crawl:${orgId}`, + concurrencyKey: crawlConcurrencyKey(orgId), labels: { organizationId: orgId }, coalesce: 'skip', }, diff --git a/app/services/github-webhook-shared.server.ts b/app/services/github-webhook-shared.server.ts index a87ad54a..00a3a637 100644 --- a/app/services/github-webhook-shared.server.ts +++ b/app/services/github-webhook-shared.server.ts @@ -33,3 +33,14 @@ export function selectionFromInstallation( ): 'all' | 'selected' { return installation.repository_selection === 'selected' ? 'selected' : 'all' } + +export async function findActiveLinkByInstallation< + T extends import('kysely').Kysely, +>(dbOrTrx: T, installationId: number) { + return await dbOrTrx + .selectFrom('githubAppLinks') + .selectAll() + .where('installationId', '=', installationId) + .where('deletedAt', 'is', null) + .executeTakeFirst() +} diff --git a/app/services/jobs/concurrency-keys.server.ts b/app/services/jobs/concurrency-keys.server.ts new file mode 100644 index 00000000..aab30870 --- /dev/null +++ b/app/services/jobs/concurrency-keys.server.ts @@ -0,0 +1,3 @@ +export const crawlConcurrencyKey = (orgId: string) => `crawl:${orgId}` as const +export const processConcurrencyKey = (orgId: string) => + `process:${orgId}` as const diff --git a/app/services/jobs/crawl.server.ts b/app/services/jobs/crawl.server.ts index 2f74c835..6fa156b2 100644 --- a/app/services/jobs/crawl.server.ts +++ b/app/services/jobs/crawl.server.ts @@ -5,6 +5,7 @@ import { assertOrgGithubAuthResolvable, resolveOctokitFromOrg, } from '~/app/services/github-octokit.server' +import { processConcurrencyKey } from '~/app/services/jobs/concurrency-keys.server' import { shouldTriggerFullOrgProcessJob } from '~/app/services/jobs/crawl-process-handoff.server' import type { OrganizationId } from '~/app/types/organization' import { getOrganization } from '~/batch/db/queries' @@ -191,7 +192,7 @@ export const crawlJob = defineJob({ await step.run('trigger-process', async () => { const { durably } = await import('~/app/services/durably.server') const processOpts = { - concurrencyKey: `process:${orgId}`, + concurrencyKey: processConcurrencyKey(orgId), labels: { organizationId: orgId }, coalesce: 'skip' as const, } @@ -213,13 +214,6 @@ export const crawlJob = defineJob({ prNumbers: [...set], }), ) - if (scopes.length === 0) { - step.log.info( - 'No updated PRs for scoped process, skipping process job.', - ) - clearOrgCache(orgId) - return - } await durably.jobs.process.trigger( { organizationId: orgId, scopes }, processOpts, diff --git a/batch/commands/crawl.ts b/batch/commands/crawl.ts index e542ccc1..d47ded64 100644 --- a/batch/commands/crawl.ts +++ b/batch/commands/crawl.ts @@ -1,6 +1,7 @@ import consola from 'consola' import type { Selectable } from 'kysely' import { durably } from '~/app/services/durably.server' +import { crawlConcurrencyKey } from '~/app/services/jobs/concurrency-keys.server' import type { TenantDB } from '~/app/services/tenant-db.server' import { requireOrganization } from './helpers' import { shutdown } from './shutdown' @@ -67,7 +68,7 @@ export async function crawlCommand({ repositoryId, }, { - concurrencyKey: `crawl:${orgId}`, + concurrencyKey: crawlConcurrencyKey(orgId), labels: { organizationId: orgId }, onProgress: (p) => { if (p.message) consola.info(p.message) diff --git a/batch/commands/process.ts b/batch/commands/process.ts index 475feda6..2fd46e20 100644 --- a/batch/commands/process.ts +++ b/batch/commands/process.ts @@ -1,5 +1,6 @@ import consola from 'consola' import { durably } from '~/app/services/durably.server' +import { processConcurrencyKey } from '~/app/services/jobs/concurrency-keys.server' import { requireOrganization } from './helpers' import { shutdown } from './shutdown' @@ -28,7 +29,7 @@ export async function processCommand({ const { output } = await durably.jobs.process.triggerAndWait( { organizationId: orgId, steps }, { - concurrencyKey: `process:${orgId}`, + concurrencyKey: processConcurrencyKey(orgId), labels: { organizationId: orgId }, onProgress: (p) => { if (p.message) consola.info(p.message) diff --git a/batch/job-scheduler.ts b/batch/job-scheduler.ts index 97241279..9b7ac529 100644 --- a/batch/job-scheduler.ts +++ b/batch/job-scheduler.ts @@ -1,6 +1,7 @@ import consola from 'consola' import schedule from 'node-schedule' import { captureExceptionToSentry } from '~/app/libs/sentry-node.server' +import { crawlConcurrencyKey } from '~/app/services/jobs/concurrency-keys.server' import type { OrganizationId } from '~/app/types/organization' import { listAllOrganizations } from './db' import { logger } from './helper/logger' @@ -35,7 +36,7 @@ export const createJobScheduler = () => { await durably.jobs.crawl.trigger( { organizationId: orgId, refresh: false }, { - concurrencyKey: `crawl:${orgId}`, + concurrencyKey: crawlConcurrencyKey(orgId), labels: { organizationId: orgId }, }, ) From 8f7c93280352c8e97444f79d4b56af7a0c07f06e Mon Sep 17 00:00:00 2001 From: coji Date: Tue, 31 Mar 2026 23:52:21 +0900 Subject: [PATCH 4/5] =?UTF-8?q?fix:=20CodeRabbit=20=E3=83=AC=E3=83=93?= =?UTF-8?q?=E3=83=A5=E3=83=BC=E6=8C=87=E6=91=98=E3=82=92=E5=8F=8D=E6=98=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - backfill-repo: fetchedAt を fetcher.files() の前に取得(stale write 防止) - durably: recalculate エイリアスを移行期間中残す(pending/failed run 互換) - $pull route: triggerAndWait にエラーハンドリング追加 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../repositories/$repository/$pull/index.tsx | 27 ++++++++++++------- app/services/durably.server.ts | 1 + batch/github/backfill-repo.ts | 2 +- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/app/routes/$orgSlug/settings/repositories/$repository/$pull/index.tsx b/app/routes/$orgSlug/settings/repositories/$repository/$pull/index.tsx index 95039c9b..c0921a50 100644 --- a/app/routes/$orgSlug/settings/repositories/$repository/$pull/index.tsx +++ b/app/routes/$orgSlug/settings/repositories/$repository/$pull/index.tsx @@ -5,6 +5,7 @@ import { href, useFetcher, useRevalidator } from 'react-router' import { match } from 'ts-pattern' import { z } from 'zod' import { Badge, Button, HStack, Heading, Stack } from '~/app/components/ui' +import { getErrorMessage } from '~/app/libs/error-message' import { orgContext } from '~/app/middleware/context' import { getGithubAppLink, @@ -156,16 +157,22 @@ export const action = async ({ ) const { durably } = await import('~/app/services/durably.server') - await durably.jobs.process.triggerAndWait( - { - organizationId: organization.id, - scopes: [{ repositoryId, prNumbers: [pullId] }], - }, - { - concurrencyKey: processConcurrencyKey(organization.id), - labels: { organizationId: organization.id }, - }, - ) + try { + await durably.jobs.process.triggerAndWait( + { + organizationId: organization.id, + scopes: [{ repositoryId, prNumbers: [pullId] }], + }, + { + concurrencyKey: processConcurrencyKey(organization.id), + labels: { organizationId: organization.id }, + }, + ) + } catch (e) { + throw new Response(`Process job failed: ${getErrorMessage(e)}`, { + status: 500, + }) + } return { intent: 'refresh' as const, success: true } }) diff --git a/app/services/durably.server.ts b/app/services/durably.server.ts index f6b1ec03..9f1c3cc2 100644 --- a/app/services/durably.server.ts +++ b/app/services/durably.server.ts @@ -24,6 +24,7 @@ function createDurablyInstance() { classify: classifyJob, crawl: crawlJob, process: processJob, + recalculate: processJob, // migration alias: remove after retainRuns window (7d) }, }) } diff --git a/batch/github/backfill-repo.ts b/batch/github/backfill-repo.ts index 663df72b..975c9ad7 100644 --- a/batch/github/backfill-repo.ts +++ b/batch/github/backfill-repo.ts @@ -37,9 +37,9 @@ export async function backfillRepo( for (const pr of prs) { if (pr.files && pr.files.length > 0) continue // already has files try { + const fetchedAt = new Date().toISOString() const files = await fetcher.files(pr.number) const prWithFiles = { ...pr, files } - const fetchedAt = new Date().toISOString() await store.updatePrMetadata([{ pr: prWithFiles, fetchedAt }]) updated++ } catch (err) { From 1d13ca5aa5974c69516003ec99087eddacaf45dc Mon Sep 17 00:00:00 2001 From: coji Date: Mon, 6 Apr 2026 12:18:04 +0900 Subject: [PATCH 5/5] =?UTF-8?q?fix:=20pullId=20=E3=82=92=20Number()=20?= =?UTF-8?q?=E3=81=A7=E5=A4=89=E6=8F=9B=E3=81=97=E3=81=A6=20prNumbers=20?= =?UTF-8?q?=E3=82=B9=E3=82=AD=E3=83=BC=E3=83=9E=E3=81=AB=E9=81=A9=E5=90=88?= =?UTF-8?q?=E3=81=95=E3=81=9B=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CodeRabbit指摘: zx.NumAsString で文字列型の pullId を z.array(z.number()) の prNumbers にそのまま渡していた。Number() で変換して型を一致させる。 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../$orgSlug/settings/repositories/$repository/$pull/index.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/routes/$orgSlug/settings/repositories/$repository/$pull/index.tsx b/app/routes/$orgSlug/settings/repositories/$repository/$pull/index.tsx index c0921a50..4a697a1a 100644 --- a/app/routes/$orgSlug/settings/repositories/$repository/$pull/index.tsx +++ b/app/routes/$orgSlug/settings/repositories/$repository/$pull/index.tsx @@ -161,7 +161,7 @@ export const action = async ({ await durably.jobs.process.triggerAndWait( { organizationId: organization.id, - scopes: [{ repositoryId, prNumbers: [pullId] }], + scopes: [{ repositoryId, prNumbers: [Number(pullId)] }], }, { concurrencyKey: processConcurrencyKey(organization.id),