Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions packages/fleet/src/__tests__/label-workflow-integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,45 @@ fi
`;
fs.writeFileSync(mockGhPath, mockGhScript, { mode: 0o755 });

// Provide a minimal `jq` shim so tests don't rely on the system `jq` binary.
// This supports the simple expressions used by the label script tests.
const jqShimPath = path.join(dir, 'jq');
const jqShim = `#!/usr/bin/env node
const fs = require('fs');
const argv = process.argv.slice(2);
if (argv.length === 0) process.exit(0);
// support optional -r flag: jq -r <expr> <file>
if (argv[0] === '-r') argv.shift();
const expr = argv[0] || '';
const file = argv[1] || '';
try {
const data = JSON.parse(fs.readFileSync(file, 'utf8'));
if (expr.includes('closingIssuesReferences')) {
const arr = data.closingIssuesReferences || [];
if (arr[0] && arr[0].number != null) {
console.log(arr[0].number);
}
process.exit(0);
}
if (expr.includes('labels') && expr.includes('any')) {
const labels = (data.labels || []).map((l) => l.name);
console.log(labels.includes('fleet') ? 'true' : 'false');
process.exit(0);
}
if (expr.includes('milestone')) {
const title = data.milestone && data.milestone.title ? data.milestone.title : '';
if (title) console.log(title);
process.exit(0);
}
// Fallback: print the whole JSON
console.log(JSON.stringify(data));
} catch (e) {
console.error('jq-shim error', e.message);
process.exit(1);
}
`;
fs.writeFileSync(jqShimPath, jqShim, { mode: 0o755 });

// Write the actual label script extracted from the template
const script = extractRunScript();
fs.writeFileSync(scriptPath, `#!/bin/bash\nset -e\n${script}`, { mode: 0o755 });
Expand Down
12 changes: 12 additions & 0 deletions packages/fleet/src/cli/merge.command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ export default defineCommand({
},
async sendMessage(sessionId, message) {
const session = jules.session(sessionId);
try {
const info = await session.info();
const state = info.state;
if (state === 'completed' || state === 'failed') {
// Do not wake completed/failed sessions — treat as no-op
return;
}
} catch (e) {
// If we cannot fetch info, be conservative and skip sending to avoid reviving sessions
return;
}

await session.send(message);
},
};
Expand Down
80 changes: 80 additions & 0 deletions packages/fleet/src/dispatch/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,83 @@ export async function recordDispatch(
timestamp,
};
}

/**
* Create a preliminary (pending) dispatch comment to act as a lock.
* This is written before session creation to ensure idempotent dispatch
* when multiple schedulers/retries run concurrently.
*/
export async function createPendingDispatch(
octokit: Octokit,
owner: string,
repo: string,
issueNumber: number,
): Promise<{ commentId: number; token: string }> {
const timestamp = new Date().toISOString();
const token = `pending-${Math.random().toString(36).slice(2, 9)}`;
const readableTime = new Date().toLocaleString('en-US', {
month: 'short',
day: 'numeric',
year: 'numeric',
hour: 'numeric',
minute: '2-digit',
timeZone: 'UTC',
timeZoneName: 'short',
});

const body = [
`🤖 **Fleet Dispatch Event (PENDING)**`,
`Dispatch token: ${token}`,
`Timestamp: ${readableTime}`,
'',
'This placeholder prevents duplicate dispatch while a session is being created.',
].join('\n');

const { data: comment } = await octokit.rest.issues.createComment({
owner,
repo,
issue_number: issueNumber,
body,
});

return { commentId: comment.id, token };
}

/**
* Finalize an existing dispatch comment by editing it to include the session link.
*/
export async function finalizeDispatch(
octokit: Octokit,
owner: string,
repo: string,
issueNumber: number,
commentId: number,
sessionId: string,
): Promise<DispatchRecord> {
const timestamp = new Date().toISOString();
const readableTime = new Date().toLocaleString('en-US', {
month: 'short',
day: 'numeric',
year: 'numeric',
hour: 'numeric',
minute: '2-digit',
timeZone: 'UTC',
timeZoneName: 'short',
});
const sessionLink = `https://jules.google.com/session/${sessionId}`;

const body = [
`🤖 **Fleet Dispatch Event**`,
`Session: [\`${sessionId}\`](${sessionLink})`,
`Timestamp: ${readableTime}`,
].join('\n');

await octokit.rest.issues.updateComment({
owner,
repo,
comment_id: commentId,
body,
});

return { commentId, timestamp };
}
74 changes: 65 additions & 9 deletions packages/fleet/src/dispatch/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import type { FleetEmitter } from '../shared/events.js';
import { ok, fail } from '../shared/result/index.js';
import { getMilestoneContext } from '../analyze/milestone.js';
import { getDispatchStatus } from './status.js';
import { recordDispatch } from './events.js';
import { recordDispatch, createPendingDispatch, finalizeDispatch } from './events.js';
import { parseGoalFile } from '../analyze/goals.js';
import { globSync } from 'glob';
import { existsSync, readFileSync } from 'node:fs';
Expand Down Expand Up @@ -105,6 +105,25 @@ export class DispatchHandler implements DispatchSpec {

const workerPrompt = buildWorkerPrompt(issue, verificationCommands, input.milestone);

// Create a pending dispatch comment as a lock to avoid duplicate dispatch
let pendingCommentId: number | undefined;
try {
const pending = await createPendingDispatch(
this.octokit,
input.owner,
input.repo,
issue.number,
);
pendingCommentId = pending.commentId;
} catch (err) {
// If creating the pending comment fails, emit and continue — we'll still attempt dispatch
this.emit({
type: 'error',
code: 'PENDING_COMMENT_FAILED',
message: `Could not write pending dispatch comment for #${issue.number}: ${err instanceof Error ? err.message : String(err)}`,
});
}

try {
const session = await this.dispatcher.dispatch({
prompt: workerPrompt,
Expand All @@ -116,14 +135,37 @@ export class DispatchHandler implements DispatchSpec {
autoPr: true,
});

// Record the dispatch event
await recordDispatch(
this.octokit,
input.owner,
input.repo,
issue.number,
session.id,
);
// Finalize the pending comment (if any) to include the real session link.
if (pendingCommentId) {
try {
await finalizeDispatch(
this.octokit,
input.owner,
input.repo,
issue.number,
pendingCommentId,
session.id,
);
} catch (err) {
// Best-effort — if finalizing fails, fall back to appending a new record
await recordDispatch(
this.octokit,
input.owner,
input.repo,
issue.number,
session.id,
);
}
} else {
// No pending comment created, fall back to normal record
await recordDispatch(
this.octokit,
input.owner,
input.repo,
issue.number,
session.id,
);
}

dispatched.push({
issueNumber: issue.number,
Expand All @@ -141,6 +183,20 @@ export class DispatchHandler implements DispatchSpec {
code: 'DISPATCH_FAILED',
message: `Failed to dispatch #${issue.number}: ${error instanceof Error ? error.message : error}`,
});

// If we created a pending comment but dispatch failed, update the comment to reflect failure
if (pendingCommentId) {
try {
await this.octokit.rest.issues.updateComment({
owner: input.owner,
repo: input.repo,
comment_id: pendingCommentId,
body: `🤖 **Fleet Dispatch Event (FAILED)**\n\nDispatch attempt failed for this placeholder. See logs for details.`,
});
} catch {
// ignore best-effort cleanup errors
}
}
}
}

Expand Down
18 changes: 18 additions & 0 deletions packages/fleet/src/types/global.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Ambient shims to help local typecheck when node types are not installed.
declare var process: {
env: { [key: string]: string | undefined };
exit(code?: number): never;
on(event: string, listener: (...args: any[]) => void): this;
};

declare var Buffer: {
from(input: string, encoding?: string): { toString(encoding?: string): string };
};

declare namespace NodeJS {
interface ProcessEnv {
[key: string]: string | undefined;
}
}

export {};