-
Notifications
You must be signed in to change notification settings - Fork 1k
Stream git hook progress events for stacked git actions #1214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
dd75189
c474896
7eb93a1
c7109bd
a77c4ca
004247f
7984dc0
e9a132f
367ae47
7741845
3ee41b1
ccf18ea
63b48e2
035f61d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,19 +8,27 @@ import { | |
| Layer, | ||
| Option, | ||
| Path, | ||
| PlatformError, | ||
| Ref, | ||
| Result, | ||
| Schema, | ||
| Scope, | ||
| Semaphore, | ||
| Stream, | ||
| } from "effect"; | ||
| import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; | ||
|
|
||
| import { GitCommandError } from "../Errors.ts"; | ||
| import { | ||
| GitCore, | ||
| type ExecuteGitProgress, | ||
| type GitCommitOptions, | ||
| type GitCoreShape, | ||
| type ExecuteGitInput, | ||
| type ExecuteGitResult, | ||
| } from "../Services/GitCore.ts"; | ||
| import { ServerConfig } from "../../config.ts"; | ||
| import { decodeJsonResult } from "@t3tools/shared/schemaJson"; | ||
|
|
||
| const DEFAULT_TIMEOUT_MS = 30_000; | ||
| const DEFAULT_MAX_OUTPUT_BYTES = 1_000_000; | ||
|
|
@@ -29,6 +37,11 @@ const STATUS_UPSTREAM_REFRESH_TIMEOUT = Duration.seconds(5); | |
| const STATUS_UPSTREAM_REFRESH_CACHE_CAPACITY = 2_048; | ||
| const DEFAULT_BASE_BRANCH_CANDIDATES = ["main", "master"] as const; | ||
|
|
||
| type TraceTailState = { | ||
| processedChars: number; | ||
| remainder: string; | ||
| }; | ||
|
|
||
| class StatusUpstreamRefreshCacheKey extends Data.Class<{ | ||
| cwd: string; | ||
| upstreamRef: string; | ||
|
|
@@ -40,6 +53,7 @@ interface ExecuteGitOptions { | |
| timeoutMs?: number | undefined; | ||
| allowNonZeroExit?: boolean | undefined; | ||
| fallbackErrorMessage?: string | undefined; | ||
| progress?: ExecuteGitProgress | undefined; | ||
| } | ||
|
|
||
| function parseBranchAb(value: string): { ahead: number; behind: number } { | ||
|
|
@@ -257,14 +271,201 @@ function toGitCommandError( | |
| }); | ||
| } | ||
|
|
||
| interface Trace2Monitor { | ||
| readonly env: NodeJS.ProcessEnv; | ||
| readonly flush: Effect.Effect<void, never>; | ||
| } | ||
|
|
||
| function trace2ChildKey(record: Record<string, unknown>): string | null { | ||
| const childId = record.child_id; | ||
| if (typeof childId === "number" || typeof childId === "string") { | ||
| return String(childId); | ||
| } | ||
| const hookName = record.hook_name; | ||
| return typeof hookName === "string" && hookName.trim().length > 0 ? hookName.trim() : null; | ||
| } | ||
|
|
||
| const Trace2Record = Schema.Record(Schema.String, Schema.Unknown); | ||
|
|
||
| const createTrace2Monitor = Effect.fn(function* ( | ||
| input: Pick<ExecuteGitInput, "operation" | "cwd" | "args">, | ||
| progress: ExecuteGitProgress | undefined, | ||
| ): Effect.fn.Return< | ||
| Trace2Monitor, | ||
| PlatformError.PlatformError, | ||
| Scope.Scope | FileSystem.FileSystem | Path.Path | ||
| > { | ||
| if (!progress?.onHookStarted && !progress?.onHookFinished) { | ||
| return { | ||
| env: {}, | ||
| flush: Effect.void, | ||
| }; | ||
| } | ||
|
|
||
| const fs = yield* FileSystem.FileSystem; | ||
| const path = yield* Path.Path; | ||
| const traceFilePath = yield* fs.makeTempFileScoped({ | ||
| prefix: `t3code-git-trace2-${process.pid}-`, | ||
| suffix: ".json", | ||
| }); | ||
| const hookStartByChildKey = new Map<string, { hookName: string; startedAtMs: number }>(); | ||
| const traceTailState = yield* Ref.make<TraceTailState>({ | ||
| processedChars: 0, | ||
| remainder: "", | ||
| }); | ||
|
|
||
| const handleTraceLine = (line: string) => | ||
| Effect.gen(function* () { | ||
| const trimmedLine = line.trim(); | ||
| if (trimmedLine.length === 0) { | ||
| return; | ||
| } | ||
|
|
||
| const traceRecord = decodeJsonResult(Trace2Record)(trimmedLine); | ||
| if (Result.isFailure(traceRecord)) { | ||
| yield* Effect.logDebug( | ||
| `GitCore.trace2: failed to parse trace line for ${quoteGitCommand(input.args)} in ${input.cwd}`, | ||
| traceRecord.failure, | ||
| ); | ||
| return; | ||
| } | ||
|
|
||
| if (traceRecord.success.child_class !== "hook") { | ||
| return; | ||
| } | ||
|
|
||
| const event = traceRecord.success.event; | ||
| const childKey = trace2ChildKey(traceRecord.success); | ||
| if (childKey === null) { | ||
| return; | ||
| } | ||
| const started = hookStartByChildKey.get(childKey); | ||
| const hookNameFromEvent = | ||
| typeof traceRecord.success.hook_name === "string" | ||
| ? traceRecord.success.hook_name.trim() | ||
| : ""; | ||
| const hookName = hookNameFromEvent.length > 0 ? hookNameFromEvent : (started?.hookName ?? ""); | ||
| if (hookName.length === 0) { | ||
| return; | ||
| } | ||
|
|
||
| if (event === "child_start") { | ||
| hookStartByChildKey.set(childKey, { hookName, startedAtMs: Date.now() }); | ||
| if (progress.onHookStarted) { | ||
| yield* progress.onHookStarted(hookName); | ||
| } | ||
| return; | ||
| } | ||
|
|
||
| if (event === "child_exit") { | ||
| hookStartByChildKey.delete(childKey); | ||
| if (progress.onHookFinished) { | ||
| const code = traceRecord.success.code; | ||
| yield* progress.onHookFinished({ | ||
| hookName: started?.hookName ?? hookName, | ||
| exitCode: typeof code === "number" && Number.isInteger(code) ? code : null, | ||
| durationMs: started ? Math.max(0, Date.now() - started.startedAtMs) : null, | ||
| }); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| const deltaMutex = yield* Semaphore.make(1); | ||
| const readTraceDelta = deltaMutex.withPermit( | ||
| fs.readFileString(traceFilePath).pipe( | ||
| Effect.flatMap((contents) => | ||
| Effect.uninterruptible( | ||
| Ref.modify(traceTailState, ({ processedChars, remainder }) => { | ||
| if (contents.length <= processedChars) { | ||
| return [[], { processedChars, remainder }]; | ||
| } | ||
|
|
||
| const appended = contents.slice(processedChars); | ||
| const combined = remainder + appended; | ||
| const lines = combined.split("\n"); | ||
| const nextRemainder = lines.pop() ?? ""; | ||
|
|
||
| return [ | ||
| lines.map((line) => line.replace(/\r$/, "")), | ||
| { | ||
| processedChars: contents.length, | ||
| remainder: nextRemainder, | ||
| }, | ||
| ]; | ||
| }).pipe( | ||
| Effect.flatMap((lines) => Effect.forEach(lines, handleTraceLine, { discard: true })), | ||
| ), | ||
| ), | ||
| ), | ||
| Effect.ignore({ log: true }), | ||
| ), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Trace file fully re-read on every watch eventLow Severity
|
||
| ); | ||
| const traceFileName = path.basename(traceFilePath); | ||
| yield* Stream.runForEach(fs.watch(traceFilePath), (event) => { | ||
| const eventPath = event.path; | ||
| const isTargetTraceEvent = | ||
| eventPath === traceFilePath || | ||
| eventPath === traceFileName || | ||
| path.basename(eventPath) === traceFileName; | ||
| if (!isTargetTraceEvent) return Effect.void; | ||
| return readTraceDelta; | ||
| }).pipe(Effect.ignoreCause({ log: true }), Effect.forkScoped); | ||
|
|
||
| yield* Effect.addFinalizer(() => | ||
| Effect.gen(function* () { | ||
| yield* readTraceDelta; | ||
| const finalLine = yield* Ref.modify(traceTailState, ({ processedChars, remainder }) => [ | ||
| remainder.trim(), | ||
| { | ||
| processedChars, | ||
| remainder: "", | ||
| }, | ||
| ]); | ||
| if (finalLine.length > 0) { | ||
| yield* handleTraceLine(finalLine); | ||
| } | ||
| }), | ||
| ); | ||
macroscopeapp[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| return { | ||
| env: { | ||
| GIT_TRACE2_EVENT: traceFilePath, | ||
| }, | ||
| flush: readTraceDelta, | ||
| }; | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| }); | ||
|
|
||
| const collectOutput = Effect.fn(function* <E>( | ||
| input: Pick<ExecuteGitInput, "operation" | "cwd" | "args">, | ||
| stream: Stream.Stream<Uint8Array, E>, | ||
| maxOutputBytes: number, | ||
| onLine: ((line: string) => Effect.Effect<void, never>) | undefined, | ||
| ): Effect.fn.Return<string, GitCommandError> { | ||
| const decoder = new TextDecoder(); | ||
| let bytes = 0; | ||
| let text = ""; | ||
| let lineBuffer = ""; | ||
|
|
||
| const emitCompleteLines = (flush: boolean) => | ||
| Effect.gen(function* () { | ||
| let newlineIndex = lineBuffer.indexOf("\n"); | ||
| while (newlineIndex >= 0) { | ||
| const line = lineBuffer.slice(0, newlineIndex).replace(/\r$/, ""); | ||
| lineBuffer = lineBuffer.slice(newlineIndex + 1); | ||
| if (line.length > 0 && onLine) { | ||
| yield* onLine(line); | ||
| } | ||
| newlineIndex = lineBuffer.indexOf("\n"); | ||
| } | ||
|
|
||
| if (flush) { | ||
| const trailing = lineBuffer.replace(/\r$/, ""); | ||
| lineBuffer = ""; | ||
| if (trailing.length > 0 && onLine) { | ||
| yield* onLine(trailing); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| yield* Stream.runForEach(stream, (chunk) => | ||
| Effect.gen(function* () { | ||
|
|
@@ -277,11 +478,17 @@ const collectOutput = Effect.fn(function* <E>( | |
| detail: `${quoteGitCommand(input.args)} output exceeded ${maxOutputBytes} bytes and was truncated.`, | ||
| }); | ||
| } | ||
| text += decoder.decode(chunk, { stream: true }); | ||
| const decoded = decoder.decode(chunk, { stream: true }); | ||
| text += decoded; | ||
| lineBuffer += decoded; | ||
| yield* emitCompleteLines(false); | ||
| }), | ||
| ).pipe(Effect.mapError(toGitCommandError(input, "output stream failed."))); | ||
|
|
||
| text += decoder.decode(); | ||
| const remainder = decoder.decode(); | ||
| text += remainder; | ||
| lineBuffer += remainder; | ||
| yield* emitCompleteLines(true); | ||
| return text; | ||
| }); | ||
|
|
||
|
|
@@ -306,26 +513,46 @@ export const makeGitCore = (options?: { executeOverride?: GitCoreShape["execute" | |
| const maxOutputBytes = input.maxOutputBytes ?? DEFAULT_MAX_OUTPUT_BYTES; | ||
|
|
||
| const commandEffect = Effect.gen(function* () { | ||
| const trace2Monitor = yield* createTrace2Monitor(commandInput, input.progress).pipe( | ||
| Effect.provideService(Path.Path, path), | ||
| Effect.provideService(FileSystem.FileSystem, fileSystem), | ||
| Effect.mapError(toGitCommandError(commandInput, "failed to create trace2 monitor.")), | ||
| ); | ||
| const child = yield* commandSpawner | ||
| .spawn( | ||
| ChildProcess.make("git", commandInput.args, { | ||
| cwd: commandInput.cwd, | ||
| ...(input.env ? { env: input.env } : {}), | ||
| env: { | ||
| ...process.env, | ||
| ...input.env, | ||
| ...trace2Monitor.env, | ||
| }, | ||
| }), | ||
| ) | ||
| .pipe(Effect.mapError(toGitCommandError(commandInput, "failed to spawn."))); | ||
|
|
||
| const [stdout, stderr, exitCode] = yield* Effect.all( | ||
| [ | ||
| collectOutput(commandInput, child.stdout, maxOutputBytes), | ||
| collectOutput(commandInput, child.stderr, maxOutputBytes), | ||
| collectOutput( | ||
| commandInput, | ||
| child.stdout, | ||
| maxOutputBytes, | ||
| input.progress?.onStdoutLine, | ||
| ), | ||
| collectOutput( | ||
| commandInput, | ||
| child.stderr, | ||
| maxOutputBytes, | ||
| input.progress?.onStderrLine, | ||
| ), | ||
| child.exitCode.pipe( | ||
| Effect.map((value) => Number(value)), | ||
| Effect.mapError(toGitCommandError(commandInput, "failed to report exit code.")), | ||
| ), | ||
| ], | ||
| { concurrency: "unbounded" }, | ||
| ); | ||
| yield* trace2Monitor.flush; | ||
|
|
||
| if (!input.allowNonZeroExit && exitCode !== 0) { | ||
| const trimmedStderr = stderr.trim(); | ||
|
|
@@ -376,6 +603,7 @@ export const makeGitCore = (options?: { executeOverride?: GitCoreShape["execute" | |
| args, | ||
| allowNonZeroExit: true, | ||
| ...(options.timeoutMs !== undefined ? { timeoutMs: options.timeoutMs } : {}), | ||
| ...(options.progress ? { progress: options.progress } : {}), | ||
| }).pipe( | ||
| Effect.flatMap((result) => { | ||
| if (options.allowNonZeroExit || result.code === 0) { | ||
|
|
@@ -947,14 +1175,37 @@ export const makeGitCore = (options?: { executeOverride?: GitCoreShape["execute" | |
| }; | ||
| }); | ||
|
|
||
| const commit: GitCoreShape["commit"] = (cwd, subject, body) => | ||
| const commit: GitCoreShape["commit"] = (cwd, subject, body, options?: GitCommitOptions) => | ||
| Effect.gen(function* () { | ||
| const args = ["commit", "-m", subject]; | ||
| const trimmedBody = body.trim(); | ||
| if (trimmedBody.length > 0) { | ||
| args.push("-m", trimmedBody); | ||
| } | ||
| yield* runGit("GitCore.commit.commit", cwd, args); | ||
| const progress = options?.progress | ||
| ? { | ||
| ...(options.progress.onOutputLine | ||
| ? { | ||
| onStdoutLine: (line: string) => | ||
| options.progress?.onOutputLine?.({ stream: "stdout", text: line }) ?? | ||
| Effect.void, | ||
| onStderrLine: (line: string) => | ||
| options.progress?.onOutputLine?.({ stream: "stderr", text: line }) ?? | ||
| Effect.void, | ||
| } | ||
| : {}), | ||
| ...(options.progress.onHookStarted | ||
| ? { onHookStarted: options.progress.onHookStarted } | ||
| : {}), | ||
| ...(options.progress.onHookFinished | ||
| ? { onHookFinished: options.progress.onHookFinished } | ||
| : {}), | ||
| } | ||
| : null; | ||
| yield* executeGit("GitCore.commit.commit", cwd, args, { | ||
| ...(options?.timeoutMs !== undefined ? { timeoutMs: options.timeoutMs } : {}), | ||
| ...(progress ? { progress } : {}), | ||
| }).pipe(Effect.asVoid); | ||
| const commitSha = yield* runGitStdout("GitCore.commit.revParseHead", cwd, [ | ||
| "rev-parse", | ||
| "HEAD", | ||
|
|
||


Uh oh!
There was an error while loading. Please reload this page.