Skip to content

Commit 384f350

Browse files
Stream git hook progress events for stacked git actions (#1214)
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
1 parent 1371ce2 commit 384f350

18 files changed

Lines changed: 1208 additions & 150 deletions

apps/server/src/git/Layers/GitCore.ts

Lines changed: 258 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,27 @@ import {
88
Layer,
99
Option,
1010
Path,
11+
PlatformError,
12+
Ref,
13+
Result,
1114
Schema,
15+
Scope,
16+
Semaphore,
1217
Stream,
1318
} from "effect";
1419
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process";
1520

1621
import { GitCommandError } from "../Errors.ts";
1722
import {
1823
GitCore,
24+
type ExecuteGitProgress,
25+
type GitCommitOptions,
1926
type GitCoreShape,
2027
type ExecuteGitInput,
2128
type ExecuteGitResult,
2229
} from "../Services/GitCore.ts";
2330
import { ServerConfig } from "../../config.ts";
31+
import { decodeJsonResult } from "@t3tools/shared/schemaJson";
2432

2533
const DEFAULT_TIMEOUT_MS = 30_000;
2634
const DEFAULT_MAX_OUTPUT_BYTES = 1_000_000;
@@ -29,6 +37,11 @@ const STATUS_UPSTREAM_REFRESH_TIMEOUT = Duration.seconds(5);
2937
const STATUS_UPSTREAM_REFRESH_CACHE_CAPACITY = 2_048;
3038
const DEFAULT_BASE_BRANCH_CANDIDATES = ["main", "master"] as const;
3139

40+
type TraceTailState = {
41+
processedChars: number;
42+
remainder: string;
43+
};
44+
3245
class StatusUpstreamRefreshCacheKey extends Data.Class<{
3346
cwd: string;
3447
upstreamRef: string;
@@ -40,6 +53,7 @@ interface ExecuteGitOptions {
4053
timeoutMs?: number | undefined;
4154
allowNonZeroExit?: boolean | undefined;
4255
fallbackErrorMessage?: string | undefined;
56+
progress?: ExecuteGitProgress | undefined;
4357
}
4458

4559
function parseBranchAb(value: string): { ahead: number; behind: number } {
@@ -257,14 +271,201 @@ function toGitCommandError(
257271
});
258272
}
259273

274+
interface Trace2Monitor {
275+
readonly env: NodeJS.ProcessEnv;
276+
readonly flush: Effect.Effect<void, never>;
277+
}
278+
279+
function trace2ChildKey(record: Record<string, unknown>): string | null {
280+
const childId = record.child_id;
281+
if (typeof childId === "number" || typeof childId === "string") {
282+
return String(childId);
283+
}
284+
const hookName = record.hook_name;
285+
return typeof hookName === "string" && hookName.trim().length > 0 ? hookName.trim() : null;
286+
}
287+
288+
const Trace2Record = Schema.Record(Schema.String, Schema.Unknown);
289+
290+
const createTrace2Monitor = Effect.fn(function* (
291+
input: Pick<ExecuteGitInput, "operation" | "cwd" | "args">,
292+
progress: ExecuteGitProgress | undefined,
293+
): Effect.fn.Return<
294+
Trace2Monitor,
295+
PlatformError.PlatformError,
296+
Scope.Scope | FileSystem.FileSystem | Path.Path
297+
> {
298+
if (!progress?.onHookStarted && !progress?.onHookFinished) {
299+
return {
300+
env: {},
301+
flush: Effect.void,
302+
};
303+
}
304+
305+
const fs = yield* FileSystem.FileSystem;
306+
const path = yield* Path.Path;
307+
const traceFilePath = yield* fs.makeTempFileScoped({
308+
prefix: `t3code-git-trace2-${process.pid}-`,
309+
suffix: ".json",
310+
});
311+
const hookStartByChildKey = new Map<string, { hookName: string; startedAtMs: number }>();
312+
const traceTailState = yield* Ref.make<TraceTailState>({
313+
processedChars: 0,
314+
remainder: "",
315+
});
316+
317+
const handleTraceLine = (line: string) =>
318+
Effect.gen(function* () {
319+
const trimmedLine = line.trim();
320+
if (trimmedLine.length === 0) {
321+
return;
322+
}
323+
324+
const traceRecord = decodeJsonResult(Trace2Record)(trimmedLine);
325+
if (Result.isFailure(traceRecord)) {
326+
yield* Effect.logDebug(
327+
`GitCore.trace2: failed to parse trace line for ${quoteGitCommand(input.args)} in ${input.cwd}`,
328+
traceRecord.failure,
329+
);
330+
return;
331+
}
332+
333+
if (traceRecord.success.child_class !== "hook") {
334+
return;
335+
}
336+
337+
const event = traceRecord.success.event;
338+
const childKey = trace2ChildKey(traceRecord.success);
339+
if (childKey === null) {
340+
return;
341+
}
342+
const started = hookStartByChildKey.get(childKey);
343+
const hookNameFromEvent =
344+
typeof traceRecord.success.hook_name === "string"
345+
? traceRecord.success.hook_name.trim()
346+
: "";
347+
const hookName = hookNameFromEvent.length > 0 ? hookNameFromEvent : (started?.hookName ?? "");
348+
if (hookName.length === 0) {
349+
return;
350+
}
351+
352+
if (event === "child_start") {
353+
hookStartByChildKey.set(childKey, { hookName, startedAtMs: Date.now() });
354+
if (progress.onHookStarted) {
355+
yield* progress.onHookStarted(hookName);
356+
}
357+
return;
358+
}
359+
360+
if (event === "child_exit") {
361+
hookStartByChildKey.delete(childKey);
362+
if (progress.onHookFinished) {
363+
const code = traceRecord.success.code;
364+
yield* progress.onHookFinished({
365+
hookName: started?.hookName ?? hookName,
366+
exitCode: typeof code === "number" && Number.isInteger(code) ? code : null,
367+
durationMs: started ? Math.max(0, Date.now() - started.startedAtMs) : null,
368+
});
369+
}
370+
}
371+
});
372+
373+
const deltaMutex = yield* Semaphore.make(1);
374+
const readTraceDelta = deltaMutex.withPermit(
375+
fs.readFileString(traceFilePath).pipe(
376+
Effect.flatMap((contents) =>
377+
Effect.uninterruptible(
378+
Ref.modify(traceTailState, ({ processedChars, remainder }) => {
379+
if (contents.length <= processedChars) {
380+
return [[], { processedChars, remainder }];
381+
}
382+
383+
const appended = contents.slice(processedChars);
384+
const combined = remainder + appended;
385+
const lines = combined.split("\n");
386+
const nextRemainder = lines.pop() ?? "";
387+
388+
return [
389+
lines.map((line) => line.replace(/\r$/, "")),
390+
{
391+
processedChars: contents.length,
392+
remainder: nextRemainder,
393+
},
394+
];
395+
}).pipe(
396+
Effect.flatMap((lines) => Effect.forEach(lines, handleTraceLine, { discard: true })),
397+
),
398+
),
399+
),
400+
Effect.ignore({ log: true }),
401+
),
402+
);
403+
const traceFileName = path.basename(traceFilePath);
404+
yield* Stream.runForEach(fs.watch(traceFilePath), (event) => {
405+
const eventPath = event.path;
406+
const isTargetTraceEvent =
407+
eventPath === traceFilePath ||
408+
eventPath === traceFileName ||
409+
path.basename(eventPath) === traceFileName;
410+
if (!isTargetTraceEvent) return Effect.void;
411+
return readTraceDelta;
412+
}).pipe(Effect.ignoreCause({ log: true }), Effect.forkScoped);
413+
414+
yield* Effect.addFinalizer(() =>
415+
Effect.gen(function* () {
416+
yield* readTraceDelta;
417+
const finalLine = yield* Ref.modify(traceTailState, ({ processedChars, remainder }) => [
418+
remainder.trim(),
419+
{
420+
processedChars,
421+
remainder: "",
422+
},
423+
]);
424+
if (finalLine.length > 0) {
425+
yield* handleTraceLine(finalLine);
426+
}
427+
}),
428+
);
429+
430+
return {
431+
env: {
432+
GIT_TRACE2_EVENT: traceFilePath,
433+
},
434+
flush: readTraceDelta,
435+
};
436+
});
437+
260438
const collectOutput = Effect.fn(function* <E>(
261439
input: Pick<ExecuteGitInput, "operation" | "cwd" | "args">,
262440
stream: Stream.Stream<Uint8Array, E>,
263441
maxOutputBytes: number,
442+
onLine: ((line: string) => Effect.Effect<void, never>) | undefined,
264443
): Effect.fn.Return<string, GitCommandError> {
265444
const decoder = new TextDecoder();
266445
let bytes = 0;
267446
let text = "";
447+
let lineBuffer = "";
448+
449+
const emitCompleteLines = (flush: boolean) =>
450+
Effect.gen(function* () {
451+
let newlineIndex = lineBuffer.indexOf("\n");
452+
while (newlineIndex >= 0) {
453+
const line = lineBuffer.slice(0, newlineIndex).replace(/\r$/, "");
454+
lineBuffer = lineBuffer.slice(newlineIndex + 1);
455+
if (line.length > 0 && onLine) {
456+
yield* onLine(line);
457+
}
458+
newlineIndex = lineBuffer.indexOf("\n");
459+
}
460+
461+
if (flush) {
462+
const trailing = lineBuffer.replace(/\r$/, "");
463+
lineBuffer = "";
464+
if (trailing.length > 0 && onLine) {
465+
yield* onLine(trailing);
466+
}
467+
}
468+
});
268469

269470
yield* Stream.runForEach(stream, (chunk) =>
270471
Effect.gen(function* () {
@@ -277,11 +478,17 @@ const collectOutput = Effect.fn(function* <E>(
277478
detail: `${quoteGitCommand(input.args)} output exceeded ${maxOutputBytes} bytes and was truncated.`,
278479
});
279480
}
280-
text += decoder.decode(chunk, { stream: true });
481+
const decoded = decoder.decode(chunk, { stream: true });
482+
text += decoded;
483+
lineBuffer += decoded;
484+
yield* emitCompleteLines(false);
281485
}),
282486
).pipe(Effect.mapError(toGitCommandError(input, "output stream failed.")));
283487

284-
text += decoder.decode();
488+
const remainder = decoder.decode();
489+
text += remainder;
490+
lineBuffer += remainder;
491+
yield* emitCompleteLines(true);
285492
return text;
286493
});
287494

@@ -306,26 +513,46 @@ export const makeGitCore = (options?: { executeOverride?: GitCoreShape["execute"
306513
const maxOutputBytes = input.maxOutputBytes ?? DEFAULT_MAX_OUTPUT_BYTES;
307514

308515
const commandEffect = Effect.gen(function* () {
516+
const trace2Monitor = yield* createTrace2Monitor(commandInput, input.progress).pipe(
517+
Effect.provideService(Path.Path, path),
518+
Effect.provideService(FileSystem.FileSystem, fileSystem),
519+
Effect.mapError(toGitCommandError(commandInput, "failed to create trace2 monitor.")),
520+
);
309521
const child = yield* commandSpawner
310522
.spawn(
311523
ChildProcess.make("git", commandInput.args, {
312524
cwd: commandInput.cwd,
313-
...(input.env ? { env: input.env } : {}),
525+
env: {
526+
...process.env,
527+
...input.env,
528+
...trace2Monitor.env,
529+
},
314530
}),
315531
)
316532
.pipe(Effect.mapError(toGitCommandError(commandInput, "failed to spawn.")));
317533

318534
const [stdout, stderr, exitCode] = yield* Effect.all(
319535
[
320-
collectOutput(commandInput, child.stdout, maxOutputBytes),
321-
collectOutput(commandInput, child.stderr, maxOutputBytes),
536+
collectOutput(
537+
commandInput,
538+
child.stdout,
539+
maxOutputBytes,
540+
input.progress?.onStdoutLine,
541+
),
542+
collectOutput(
543+
commandInput,
544+
child.stderr,
545+
maxOutputBytes,
546+
input.progress?.onStderrLine,
547+
),
322548
child.exitCode.pipe(
323549
Effect.map((value) => Number(value)),
324550
Effect.mapError(toGitCommandError(commandInput, "failed to report exit code.")),
325551
),
326552
],
327553
{ concurrency: "unbounded" },
328554
);
555+
yield* trace2Monitor.flush;
329556

330557
if (!input.allowNonZeroExit && exitCode !== 0) {
331558
const trimmedStderr = stderr.trim();
@@ -376,6 +603,7 @@ export const makeGitCore = (options?: { executeOverride?: GitCoreShape["execute"
376603
args,
377604
allowNonZeroExit: true,
378605
...(options.timeoutMs !== undefined ? { timeoutMs: options.timeoutMs } : {}),
606+
...(options.progress ? { progress: options.progress } : {}),
379607
}).pipe(
380608
Effect.flatMap((result) => {
381609
if (options.allowNonZeroExit || result.code === 0) {
@@ -947,14 +1175,37 @@ export const makeGitCore = (options?: { executeOverride?: GitCoreShape["execute"
9471175
};
9481176
});
9491177

950-
const commit: GitCoreShape["commit"] = (cwd, subject, body) =>
1178+
const commit: GitCoreShape["commit"] = (cwd, subject, body, options?: GitCommitOptions) =>
9511179
Effect.gen(function* () {
9521180
const args = ["commit", "-m", subject];
9531181
const trimmedBody = body.trim();
9541182
if (trimmedBody.length > 0) {
9551183
args.push("-m", trimmedBody);
9561184
}
957-
yield* runGit("GitCore.commit.commit", cwd, args);
1185+
const progress = options?.progress
1186+
? {
1187+
...(options.progress.onOutputLine
1188+
? {
1189+
onStdoutLine: (line: string) =>
1190+
options.progress?.onOutputLine?.({ stream: "stdout", text: line }) ??
1191+
Effect.void,
1192+
onStderrLine: (line: string) =>
1193+
options.progress?.onOutputLine?.({ stream: "stderr", text: line }) ??
1194+
Effect.void,
1195+
}
1196+
: {}),
1197+
...(options.progress.onHookStarted
1198+
? { onHookStarted: options.progress.onHookStarted }
1199+
: {}),
1200+
...(options.progress.onHookFinished
1201+
? { onHookFinished: options.progress.onHookFinished }
1202+
: {}),
1203+
}
1204+
: null;
1205+
yield* executeGit("GitCore.commit.commit", cwd, args, {
1206+
...(options?.timeoutMs !== undefined ? { timeoutMs: options.timeoutMs } : {}),
1207+
...(progress ? { progress } : {}),
1208+
}).pipe(Effect.asVoid);
9581209
const commitSha = yield* runGitStdout("GitCore.commit.revParseHead", cwd, [
9591210
"rev-parse",
9601211
"HEAD",

0 commit comments

Comments
 (0)