From 7447e87a80c99b5540b8c162f623a24b0b3cc80c Mon Sep 17 00:00:00 2001 From: theonlychant Date: Mon, 4 May 2026 00:57:17 -0500 Subject: [PATCH] fix(scheduler): prevent duplicate session firing and zombie session re-triggering Signed-off-by: theonlychant --- .../label-workflow-integration.test.ts | 39 +++++++++ packages/fleet/src/cli/merge.command.ts | 12 +++ packages/fleet/src/dispatch/events.ts | 80 +++++++++++++++++++ packages/fleet/src/dispatch/handler.ts | 74 ++++++++++++++--- packages/fleet/src/types/global.d.ts | 18 +++++ 5 files changed, 214 insertions(+), 9 deletions(-) create mode 100644 packages/fleet/src/types/global.d.ts diff --git a/packages/fleet/src/__tests__/label-workflow-integration.test.ts b/packages/fleet/src/__tests__/label-workflow-integration.test.ts index 0a914b8..f1ec9ef 100644 --- a/packages/fleet/src/__tests__/label-workflow-integration.test.ts +++ b/packages/fleet/src/__tests__/label-workflow-integration.test.ts @@ -121,6 +121,45 @@ fi `; fs.writeFileSync(mockGhPath, mockGhScript, { mode: 0o755 }); + // Provide a minimal `jq` shim so tests don't rely on the system `jq` binary. + // This supports the simple expressions used by the label script tests. + const jqShimPath = path.join(dir, 'jq'); + const jqShim = `#!/usr/bin/env node +const fs = require('fs'); +const argv = process.argv.slice(2); +if (argv.length === 0) process.exit(0); +// support optional -r flag: jq -r +if (argv[0] === '-r') argv.shift(); +const expr = argv[0] || ''; +const file = argv[1] || ''; +try { + const data = JSON.parse(fs.readFileSync(file, 'utf8')); + if (expr.includes('closingIssuesReferences')) { + const arr = data.closingIssuesReferences || []; + if (arr[0] && arr[0].number != null) { + console.log(arr[0].number); + } + process.exit(0); + } + if (expr.includes('labels') && expr.includes('any')) { + const labels = (data.labels || []).map((l) => l.name); + console.log(labels.includes('fleet') ? 'true' : 'false'); + process.exit(0); + } + if (expr.includes('milestone')) { + const title = data.milestone && data.milestone.title ? data.milestone.title : ''; + if (title) console.log(title); + process.exit(0); + } + // Fallback: print the whole JSON + console.log(JSON.stringify(data)); +} catch (e) { + console.error('jq-shim error', e.message); + process.exit(1); +} +`; + fs.writeFileSync(jqShimPath, jqShim, { mode: 0o755 }); + // Write the actual label script extracted from the template const script = extractRunScript(); fs.writeFileSync(scriptPath, `#!/bin/bash\nset -e\n${script}`, { mode: 0o755 }); diff --git a/packages/fleet/src/cli/merge.command.ts b/packages/fleet/src/cli/merge.command.ts index 69374f7..13d43bb 100644 --- a/packages/fleet/src/cli/merge.command.ts +++ b/packages/fleet/src/cli/merge.command.ts @@ -108,6 +108,18 @@ export default defineCommand({ }, async sendMessage(sessionId, message) { const session = jules.session(sessionId); + try { + const info = await session.info(); + const state = info.state; + if (state === 'completed' || state === 'failed') { + // Do not wake completed/failed sessions — treat as no-op + return; + } + } catch (e) { + // If we cannot fetch info, be conservative and skip sending to avoid reviving sessions + return; + } + await session.send(message); }, }; diff --git a/packages/fleet/src/dispatch/events.ts b/packages/fleet/src/dispatch/events.ts index b546a0b..b0da2db 100644 --- a/packages/fleet/src/dispatch/events.ts +++ b/packages/fleet/src/dispatch/events.ts @@ -57,3 +57,83 @@ export async function recordDispatch( timestamp, }; } + +/** + * Create a preliminary (pending) dispatch comment to act as a lock. + * This is written before session creation to ensure idempotent dispatch + * when multiple schedulers/retries run concurrently. + */ +export async function createPendingDispatch( + octokit: Octokit, + owner: string, + repo: string, + issueNumber: number, +): Promise<{ commentId: number; token: string }> { + const timestamp = new Date().toISOString(); + const token = `pending-${Math.random().toString(36).slice(2, 9)}`; + const readableTime = new Date().toLocaleString('en-US', { + month: 'short', + day: 'numeric', + year: 'numeric', + hour: 'numeric', + minute: '2-digit', + timeZone: 'UTC', + timeZoneName: 'short', + }); + + const body = [ + `🤖 **Fleet Dispatch Event (PENDING)**`, + `Dispatch token: ${token}`, + `Timestamp: ${readableTime}`, + '', + 'This placeholder prevents duplicate dispatch while a session is being created.', + ].join('\n'); + + const { data: comment } = await octokit.rest.issues.createComment({ + owner, + repo, + issue_number: issueNumber, + body, + }); + + return { commentId: comment.id, token }; +} + +/** + * Finalize an existing dispatch comment by editing it to include the session link. + */ +export async function finalizeDispatch( + octokit: Octokit, + owner: string, + repo: string, + issueNumber: number, + commentId: number, + sessionId: string, +): Promise { + const timestamp = new Date().toISOString(); + const readableTime = new Date().toLocaleString('en-US', { + month: 'short', + day: 'numeric', + year: 'numeric', + hour: 'numeric', + minute: '2-digit', + timeZone: 'UTC', + timeZoneName: 'short', + }); + const sessionLink = `https://jules.google.com/session/${sessionId}`; + + const body = [ + `🤖 **Fleet Dispatch Event**`, + `Session: [\`${sessionId}\`](${sessionLink})`, + `Timestamp: ${readableTime}`, + ].join('\n'); + + await octokit.rest.issues.updateComment({ + owner, + repo, + comment_id: commentId, + body, + }); + + return { commentId, timestamp }; +} diff --git a/packages/fleet/src/dispatch/handler.ts b/packages/fleet/src/dispatch/handler.ts index 81afe4e..5933cd0 100644 --- a/packages/fleet/src/dispatch/handler.ts +++ b/packages/fleet/src/dispatch/handler.ts @@ -19,7 +19,7 @@ import type { FleetEmitter } from '../shared/events.js'; import { ok, fail } from '../shared/result/index.js'; import { getMilestoneContext } from '../analyze/milestone.js'; import { getDispatchStatus } from './status.js'; -import { recordDispatch } from './events.js'; +import { recordDispatch, createPendingDispatch, finalizeDispatch } from './events.js'; import { parseGoalFile } from '../analyze/goals.js'; import { globSync } from 'glob'; import { existsSync, readFileSync } from 'node:fs'; @@ -105,6 +105,25 @@ export class DispatchHandler implements DispatchSpec { const workerPrompt = buildWorkerPrompt(issue, verificationCommands, input.milestone); + // Create a pending dispatch comment as a lock to avoid duplicate dispatch + let pendingCommentId: number | undefined; + try { + const pending = await createPendingDispatch( + this.octokit, + input.owner, + input.repo, + issue.number, + ); + pendingCommentId = pending.commentId; + } catch (err) { + // If creating the pending comment fails, emit and continue — we'll still attempt dispatch + this.emit({ + type: 'error', + code: 'PENDING_COMMENT_FAILED', + message: `Could not write pending dispatch comment for #${issue.number}: ${err instanceof Error ? err.message : String(err)}`, + }); + } + try { const session = await this.dispatcher.dispatch({ prompt: workerPrompt, @@ -116,14 +135,37 @@ export class DispatchHandler implements DispatchSpec { autoPr: true, }); - // Record the dispatch event - await recordDispatch( - this.octokit, - input.owner, - input.repo, - issue.number, - session.id, - ); + // Finalize the pending comment (if any) to include the real session link. + if (pendingCommentId) { + try { + await finalizeDispatch( + this.octokit, + input.owner, + input.repo, + issue.number, + pendingCommentId, + session.id, + ); + } catch (err) { + // Best-effort — if finalizing fails, fall back to appending a new record + await recordDispatch( + this.octokit, + input.owner, + input.repo, + issue.number, + session.id, + ); + } + } else { + // No pending comment created, fall back to normal record + await recordDispatch( + this.octokit, + input.owner, + input.repo, + issue.number, + session.id, + ); + } dispatched.push({ issueNumber: issue.number, @@ -141,6 +183,20 @@ export class DispatchHandler implements DispatchSpec { code: 'DISPATCH_FAILED', message: `Failed to dispatch #${issue.number}: ${error instanceof Error ? error.message : error}`, }); + + // If we created a pending comment but dispatch failed, update the comment to reflect failure + if (pendingCommentId) { + try { + await this.octokit.rest.issues.updateComment({ + owner: input.owner, + repo: input.repo, + comment_id: pendingCommentId, + body: `🤖 **Fleet Dispatch Event (FAILED)**\n\nDispatch attempt failed for this placeholder. See logs for details.`, + }); + } catch { + // ignore best-effort cleanup errors + } + } } } diff --git a/packages/fleet/src/types/global.d.ts b/packages/fleet/src/types/global.d.ts new file mode 100644 index 0000000..8654f90 --- /dev/null +++ b/packages/fleet/src/types/global.d.ts @@ -0,0 +1,18 @@ +// Ambient shims to help local typecheck when node types are not installed. +declare var process: { + env: { [key: string]: string | undefined }; + exit(code?: number): never; + on(event: string, listener: (...args: any[]) => void): this; +}; + +declare var Buffer: { + from(input: string, encoding?: string): { toString(encoding?: string): string }; +}; + +declare namespace NodeJS { + interface ProcessEnv { + [key: string]: string | undefined; + } +} + +export {};