diff --git a/docs/CONCURRENCY.md b/docs/CONCURRENCY.md new file mode 100644 index 0000000..59aa7a4 --- /dev/null +++ b/docs/CONCURRENCY.md @@ -0,0 +1,76 @@ +# Concurrency + +drift uses Zig 0.16's `std.Io` interface throughout. Every file read, subprocess spawn, and path resolution flows through an `Io` instance threaded from `std.process.Init` in `main` down through `CommandContext` to every command. + +This design makes it straightforward to parallelize CPU- and I/O-bound work without changing the control flow of individual functions. Below is what's done today and what remains as viable follow-up. + +## Implemented: per-doc parallelism (`Io.Group`) + +`drift check` wraps the per-doc loop in `src/commands/lint.zig` in an `Io.Group`. Each doc's binding checks (file read → tree-sitter parse → hash → optional `git log` for blame) run as independent tasks on the thread pool backing `Io.Threaded`. + +Key design constraints: + +- **Task-local `CommandContext`.** Each task builds its own `std.heap.ArenaAllocator` (child of `run_arena`) so `ctx.scratch()` / `ctx.resetScratch()` inside `checkBinding`, `checkDocLinks`, and `classifyRelativeLink` continue to work unchanged. +- **Task-local `FileCache`.** `std.StringHashMap` is not thread-safe. Each task gets its own cache. Different docs rarely reference the same files, so the lost hit rate is small and not worth a mutex. +- **Pre-allocated result slots.** `results: []?DocCheckResult` is allocated once from `run_arena`. Tasks write their own slot; the main thread merges in doc-order (docs are already sorted by `discoverDocGroups`) so output stays deterministic. +- **Error handling.** `Io.Group.async` tasks cannot propagate errors back to the caller. A `checkOneDoc` wrapper catches errors and stores them as `error_message: ?[]const u8` on the result. The main thread prints and translates to `error.LintCheckFailed` during merge. + +Local measurement on the drift repo itself: ~0.40 s → ~0.14 s (≈3×).
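+ +The merged loop, condensed (a sketch only; names follow `src/commands/lint.zig`, elisions marked `...`): + +```zig +const results = try ctx.run_arena.alloc(?DocCheckResult, doc_groups.items.len); +@memset(results, null); + +var group: std.Io.Group = .init; +defer group.cancel(ctx.io); + +for (doc_groups.items, 0..) |doc, i| { +    group.async(ctx.io, checkOneDoc, .{ ctx.io, ctx.run_arena, ..., &results[i] }); +} +try group.await(ctx.io); + +for (results) |maybe| { +    const doc_result = maybe orelse continue; // slot stays null for filtered docs +    // merge doc_result into the aggregate, in doc-order +} +```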
+ +## Proposed: speculative blame via `io.async` + +When a binding turns stale, `checkBinding` shells out to `git log` for blame info (`vcs.getLatestBlameInfo`). Today this is serial inside each task. Starting the blame query speculatively *before* the fingerprint comparison finishes removes its latency from the stale path: + +```zig +var blame_future = io.async(vcs.getLatestBlameInfo, .{ ... }); +defer if (blame_future.cancel(io)) |_| {} else |_| {}; + +// ... compute fingerprint ... +if (is_fresh) return .{ .result = .fresh, ... }; // cancel fires on defer +const blame = try blame_future.await(io); +return .{ .result = .stale, .blame = blame, ... }; +``` + +`io.async` is infallible on `Io.Threaded` (it runs inline on `Io.failing`), so the code reads the same on both backends. Cost: wasted work on fresh anchors. A `git log -1` against a single path is cheap; the tradeoff favours the stale path since that's what blocks the user. + +## Proposed: `Io.Batch` for link-existence checks + +`checkDocLinks` in `src/commands/lint.zig` resolves each markdown link in a doc one at a time and calls `pathExists` (`accessAbsolute`) on each target. For docs with dense cross-links this is many sequential stat syscalls. + +`Io.Batch` is the low-level batching primitive (one layer below `Io.Group`) and supports stat-style operations: + +```zig +var batch: Io.Batch = .init; +for (parsed.links.items) |link| { +    const absolute = ...; // resolve the link target to an absolute path, as checkDocLinks does today +    batch.stat(io, absolute); +} +try batch.await(io); +``` + +On Linux with `Io.Uring` this becomes a single submission; on `Io.Threaded` it falls back to parallel thread-pool stats. Modest win on dense docs, larger win under `Io.Uring` once that backend stabilizes. + +## Proposed: overlap startup shell-outs + +At the top of `lint.run`, `discoverDocGroups` calls `git ls-files` and `getRepoIdentity` calls `git remote get-url origin` back-to-back.
Both take separate allocators and are trivially independent: + +```zig +var identity_future = io.async(vcs.getRepoIdentity, .{ ctx.run_arena, ctx.scratch(), cwd_path }); +var doc_groups = try discoverDocGroups(...); +const repo_identity = identity_future.await(io) catch null; +``` + +Saves a few ms per run. Small absolute win, but free — both calls are already `io`-aware. + +## Proposed: rework `GitCatFile` with `Io.Group` + +`GitCatFile` (`src/vcs.zig`) keeps a persistent `git cat-file --batch` process alive to avoid spawn overhead for historical file reads. Today request/response is synchronous. Running request submission and response parsing as two concurrent tasks inside an `Io.Group` would let the next request go out while the current response is still being read — useful only if we shift to a use case that issues many queries, which today we don't. + +Noting it as a shape that 0.16 now permits; not a priority. + +## What not to parallelize + +- **Writes to `stdout_w` / `stderr_w`.** The writers are single-threaded and the output order matters. All user-facing output happens on the main thread, after `group.await`. +- **`stderr` from inside tasks.** Errors are stored on the result and printed centrally — do not call `stderr_w.print(...)` from a task. +- **VCS spawn storms.** `Io.Threaded` bounds the thread pool, so we're unlikely to overload `git` — but if future work fans out to hundreds of concurrent `git log` calls, revisit this. + +## Backends + +All of the above runs unchanged on any `Io` implementation. Today drift uses `Io.Threaded` (the only feature-complete backend in 0.16). `Io.Uring` (Linux), `Io.Kqueue` (BSD/macOS), and `Io.Dispatch` (macOS) are proof-of-concept in 0.16 and will become interesting for `Io.Batch` work once they stabilize. 
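+ +Concretely, the backend is named in exactly one place. A minimal sketch of that wiring (hypothetical: drift's `main` actually receives its `Io` via `std.process.Init`, but standalone construction in 0.16 looks roughly like this): + +```zig +var threaded: std.Io.Threaded = .init(gpa); +defer threaded.deinit(); +const io = threaded.io(); +// everything downstream takes `io` and never names the backend +```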
diff --git a/drift.lock b/drift.lock index 0a71dae..fb7fbd4 100644 --- a/drift.lock +++ b/drift.lock @@ -3,7 +3,7 @@ CLAUDE.md -> build.zig sig:7194b38f39dbadba CLAUDE.md -> src/main.zig sig:f2735440986d2477 docs/CLI.md -> src/commands/link.zig sig:7bd7f824afc30e0b -docs/CLI.md -> src/commands/lint.zig sig:1fd2cb4096c65c64 +docs/CLI.md -> src/commands/lint.zig sig:e29be45c4d84759e docs/CLI.md -> src/commands/refs.zig sig:f623b7774086094e docs/CLI.md -> src/commands/status.zig sig:eade166d24a20b81 docs/CLI.md -> src/commands/unlink.zig sig:0dbe1ee3315211b5 diff --git a/src/commands/lint.zig b/src/commands/lint.zig index eb9c6b5..dbddd7d 100644 --- a/src/commands/lint.zig +++ b/src/commands/lint.zig @@ -116,6 +116,10 @@ const DocCheckResult = struct { result: DocResult, anchors: std.ArrayList(JsonAnchorRow), links: std.ArrayList(JsonLinkRow), + /// Populated when `checkBinding` fails for a binding in this doc. The main + /// thread re-raises this as `error.LintCheckFailed` during the merge step, + /// since `Io.Group.async` cannot propagate errors out of tasks. + error_message: ?[]const u8 = null, }; const CheckResult = struct { @@ -178,10 +182,6 @@ pub fn run( const detected_vcs = vcs.detectVcs(); const repo_identity = vcs.getRepoIdentity(ctx.io, ctx.run_arena, ctx.scratch(), cwd_path); - var file_cache: FileCache = undefined; - file_cache.init(ctx.io, ctx.run_arena); - defer file_cache.deinit(); - const normalized_changed = if (changed_path) |raw| try normalizeChangedPrefix(ctx, lf.root_path, cwd_path, raw) else @@ -204,92 +204,195 @@ pub fn run( .links_broken = 0, }; - var checked_any = false; - for (doc_groups.items) |doc| { + // Pre-allocate one result slot per doc so each task writes into disjoint + // memory. Tasks produce results independently; we merge in doc-order on + // the main thread below to preserve deterministic output. 
+ const results = try ctx.run_arena.alloc(?DocCheckResult, doc_groups.items.len); + @memset(results, null); + + var group: std.Io.Group = .init; + defer group.cancel(ctx.io); + + for (doc_groups.items, 0..) |doc, i| { if (normalized_changed) |prefix| { if (!docMatchesChangedPath(doc, prefix)) continue; } + group.async(ctx.io, checkOneDoc, .{ + ctx.io, + ctx.run_arena, + lf.root_path, + doc, + detected_vcs, + repo_identity, + &results[i], + }); + } + try group.await(ctx.io); + + var checked_any = false; + for (results) |maybe| { + const doc_result = maybe orelse continue; checked_any = true; + if (doc_result.error_message) |msg| { + stderr_w.print("{s}", .{msg}) catch {}; + return error.LintCheckFailed; + } + mergeDocResult(ctx.run_arena, &result, doc_result) catch |err| return err; + } - var doc_result = DocCheckResult{ + switch (format) { + .text => try writeResultsText(stdout_w, &result, checked_any), + .json => try writeResultsJson(ctx.run_arena, stdout_w, &result), + } + + return if (result.failed) .fail else .pass; +} + +/// Per-doc check task, spawned once per `DocGroup` inside `Io.Group`. +/// +/// Creates a task-local scratch arena and task-local `FileCache` so tasks do +/// not contend on shared state. The `run_arena` allocator is threadsafe (see +/// `std.heap.ArenaAllocator`), so persistent allocations (anchor rows, link +/// rows, strings) go there. Errors from `checkBinding` are stored on `out` +/// (via `error_message`) and re-raised by the main thread during merge — +/// `Io.Group.async` cannot propagate errors back out of a task. 
+fn checkOneDoc( + io: std.Io, + run_arena: std.mem.Allocator, + root_path: []const u8, + doc: DocGroup, + detected_vcs: vcs.VcsKind, + repo_identity: ?[]const u8, + out: *?DocCheckResult, +) void { + checkOneDocInner(io, run_arena, root_path, doc, detected_vcs, repo_identity, out) catch |err| { + // Persist the failure into the result slot so the main thread can + // surface it as `error.LintCheckFailed` after `group.await`. + const msg = std.fmt.allocPrint(run_arena, "error checking doc {s}: {s}\n", .{ doc.path, @errorName(err) }) catch "error checking doc (out of memory)\n"; + out.* = DocCheckResult{ .path = doc.path, - .origin = commonOrigin(doc.bindings.items), - .result = .fresh, + .origin = null, + .result = .broken, .anchors = .empty, .links = .empty, + .error_message = msg, }; + }; +} - var fresh_count: usize = 0; - var stale_count: usize = 0; - var skip_count: usize = 0; +fn checkOneDocInner( + io: std.Io, + run_arena: std.mem.Allocator, + root_path: []const u8, + doc: DocGroup, + detected_vcs: vcs.VcsKind, + repo_identity: ?[]const u8, + out: *?DocCheckResult, +) !void { + var task_scratch = std.heap.ArenaAllocator.init(run_arena); + defer task_scratch.deinit(); - for (doc.bindings.items) |binding| { - ctx.resetScratch(); - const parsed = target.parse(binding.target); - const origin = binding.fieldValue("origin"); + const task_ctx = CommandContext{ + .io = io, + .run_arena = run_arena, + .scratch_arena = &task_scratch, + }; - const outcome = blk: { - if (origin) |o| { - const is_local = if (repo_identity) |ri| std.mem.eql(u8, o, ri) else false; - if (!is_local) break :blk AnchorOutcome{ .result = .skip, .reason_code = .origin_mismatch }; - } - break :blk checkBinding(ctx, lf.root_path, binding, parsed, &file_cache, detected_vcs) catch |err| { - stderr_w.print("error checking {s}: {s}\n", .{ binding.target, @errorName(err) }) catch {}; - return error.LintCheckFailed; - }; - }; + var file_cache: FileCache = undefined; + file_cache.init(io, run_arena); + 
defer file_cache.deinit(); + + var doc_result = DocCheckResult{ + .path = doc.path, + .origin = commonOrigin(doc.bindings.items), + .result = .fresh, + .anchors = .empty, + .links = .empty, + }; - try doc_result.anchors.append(ctx.run_arena, jsonAnchorFromOutcome(binding.target, binding.fieldValue("sig"), parsed, outcome)); - switch (outcome.result) { - .fresh => fresh_count += 1, - .stale => stale_count += 1, - .skip => skip_count += 1, + var fresh_count: usize = 0; + var stale_count: usize = 0; + var skip_count: usize = 0; + + for (doc.bindings.items) |binding| { + task_ctx.resetScratch(); + const parsed = target.parse(binding.target); + const origin = binding.fieldValue("origin"); + + const outcome = blk: { + if (origin) |o| { + const is_local = if (repo_identity) |ri| std.mem.eql(u8, o, ri) else false; + if (!is_local) break :blk AnchorOutcome{ .result = .skip, .reason_code = .origin_mismatch }; } + break :blk checkBinding(task_ctx, root_path, binding, parsed, &file_cache, detected_vcs) catch |err| { + const msg = try std.fmt.allocPrint(run_arena, "error checking {s}: {s}\n", .{ binding.target, @errorName(err) }); + doc_result.error_message = msg; + out.* = doc_result; + return; + }; + }; + + try doc_result.anchors.append(run_arena, jsonAnchorFromOutcome(binding.target, binding.fieldValue("sig"), parsed, outcome)); + switch (outcome.result) { + .fresh => fresh_count += 1, + .stale => stale_count += 1, + .skip => skip_count += 1, } + } - try checkDocLinks(ctx, lf.root_path, doc.path, &file_cache, &doc_result.links); + try checkDocLinks(task_ctx, root_path, doc.path, &file_cache, &doc_result.links); - var broken_links: usize = 0; - for (doc_result.links.items) |link| { - if (std.mem.eql(u8, link.wire.result, "broken")) broken_links += 1; - } + var broken_links: usize = 0; + for (doc_result.links.items) |link| { + if (std.mem.eql(u8, link.wire.result, "broken")) broken_links += 1; + } - doc_result.result = if (broken_links > 0) - .broken - else if (stale_count > 
0) - .stale - else if (fresh_count == 0 and skip_count > 0) - .skip - else - .fresh; - - result.docs_total += 1; - result.anchors_total += @intCast(doc.bindings.items.len); - result.anchors_fresh += @intCast(fresh_count); - result.anchors_stale += @intCast(stale_count); - result.anchors_skipped += @intCast(skip_count); - result.links_total += @intCast(doc_result.links.items.len); - result.links_broken += @intCast(broken_links); - - switch (doc_result.result) { - .fresh => result.docs_fresh += 1, - .skip => result.docs_skipped += 1, - .stale, .broken => { - result.docs_stale += 1; - result.failed = true; - }, - } - if (broken_links > 0) result.failed = true; + doc_result.result = if (broken_links > 0) + .broken + else if (stale_count > 0) + .stale + else if (fresh_count == 0 and skip_count > 0) + .skip + else + .fresh; + + out.* = doc_result; +} - try result.docs.append(ctx.run_arena, doc_result); +fn mergeDocResult(run_arena: std.mem.Allocator, result: *CheckResult, doc_result: DocCheckResult) !void { + var broken_links: usize = 0; + for (doc_result.links.items) |link| { + if (std.mem.eql(u8, link.wire.result, "broken")) broken_links += 1; + } + var fresh_anchors: usize = 0; + var stale_anchors: usize = 0; + var skip_anchors: usize = 0; + for (doc_result.anchors.items) |row| { + const r = row.wire.result; + if (std.mem.eql(u8, r, "fresh")) fresh_anchors += 1 // + else if (std.mem.eql(u8, r, "stale")) stale_anchors += 1 // + else if (std.mem.eql(u8, r, "skip")) skip_anchors += 1; } - switch (format) { - .text => try writeResultsText(stdout_w, &result, checked_any), - .json => try writeResultsJson(ctx.run_arena, stdout_w, &result), + result.docs_total += 1; + result.anchors_total += @intCast(doc_result.anchors.items.len); + result.anchors_fresh += @intCast(fresh_anchors); + result.anchors_stale += @intCast(stale_anchors); + result.anchors_skipped += @intCast(skip_anchors); + result.links_total += @intCast(doc_result.links.items.len); + result.links_broken += 
@intCast(broken_links); + + switch (doc_result.result) { + .fresh => result.docs_fresh += 1, + .skip => result.docs_skipped += 1, + .stale, .broken => { + result.docs_stale += 1; + result.failed = true; + }, } + if (broken_links > 0) result.failed = true; - return if (result.failed) .fail else .pass; + try result.docs.append(run_arena, doc_result); } fn discoverDocGroups(