76 changes: 76 additions & 0 deletions docs/CONCURRENCY.md
@@ -0,0 +1,76 @@
# Concurrency

drift uses Zig 0.16's `std.Io` interface throughout. Every file read, subprocess spawn, and path resolution flows through an `Io` instance threaded from `std.process.Init` in `main` down through `CommandContext` to every command.

This design makes it straightforward to parallelize CPU- and I/O-bound work without changing the control flow of individual functions. Below is what's done today and what remains as viable follow-up.

## Implemented: per-doc parallelism (`Io.Group`)

`drift check` wraps the per-doc loop in `src/commands/lint.zig` in an `Io.Group`. Each doc's binding checks (file read → tree-sitter parse → hash → optional `git log` for blame) run as independent tasks on the thread pool backing `Io.Threaded`.

Key design constraints:

- **Task-local `CommandContext`.** Each task builds its own `std.heap.ArenaAllocator` (child of `run_arena`) so `ctx.scratch()` / `ctx.resetScratch()` inside `checkBinding`, `checkDocLinks`, and `classifyRelativeLink` continue to work unchanged.
- **Task-local `FileCache`.** `std.StringHashMap` is not thread-safe. Each task gets its own cache. Docs rarely reference the same source files, so the lost hit rate is small and not worth a mutex.
- **Pre-allocated result slots.** `results: []?DocCheckResult` is allocated once from `run_arena`. Tasks write their own slot; the main thread merges in doc-order (docs are already sorted by `discoverDocGroups`) so output stays deterministic.
- **Error handling.** `Io.Group.async` tasks cannot propagate errors back to the caller. A `checkOneDoc` wrapper catches errors and stores them as `error_message: ?[]const u8` on the result. The main thread prints and translates to `error.LintCheckFailed` during merge.

Local measurement on the drift repo itself: ~0.40 s → ~0.14 s (≈3×).
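The shape of the change, condensed from `src/commands/lint.zig` (identifiers match the real code; the merge body is elided):

```zig
// Pre-allocated result slots: each task writes only its own index, so no
// synchronization is needed on the results array itself.
const results = try ctx.run_arena.alloc(?DocCheckResult, doc_groups.items.len);
@memset(results, null);

var group: std.Io.Group = .init;
defer group.cancel(ctx.io);

for (doc_groups.items, 0..) |doc, i| {
    group.async(ctx.io, checkOneDoc, .{
        ctx.io,       ctx.run_arena, lf.root_path, doc,
        detected_vcs, repo_identity, &results[i],
    });
}
try group.await(ctx.io);

// Merge in doc-order on the main thread for deterministic output.
for (results) |maybe| {
    const doc_result = maybe orelse continue;
    // ... surface error_message, accumulate counters ...
}
```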

## Proposed: speculative blame via `io.async`

When a binding turns stale, `checkBinding` shells out to `git log` for blame info (`vcs.getLatestBlameInfo`). Today this is serial inside each task. Starting the blame query speculatively *before* the fingerprint comparison finishes removes its latency from the stale path:

```zig
var blame_future = io.async(vcs.getLatestBlameInfo, .{ ... });
defer if (blame_future.cancel(io)) |_| {} else |_| {};

// ... compute fingerprint ...
if (is_fresh) return .{ .result = .fresh, ... }; // cancel fires on defer
const blame = try blame_future.await(io);
return .{ .result = .stale, .blame = blame, ... };
```

`io.async` is infallible on `Io.Threaded` (it runs inline on `Io.failing`), so the code reads the same on both backends. Cost: wasted work on fresh anchors. A `git log -1` against a single path is cheap; the tradeoff favours the stale path since that's what blocks the user.

## Proposed: `Io.Batch` for link-existence checks

`checkDocLinks` in `src/commands/lint.zig` resolves each markdown link in a doc one at a time and calls `pathExists` (`accessAbsolute`) on each target. For docs with dense cross-links this is many sequential stat syscalls.

`Io.Batch` is the low-level batching primitive (one layer below `Io.Group`) and supports stat-style operations:

```zig
var batch: Io.Batch = .init;
for (parsed.links.items) |link| batch.stat(io, absolute); // absolute: `link` resolved as today
try batch.await(io);
```

On Linux with `Io.Uring` this becomes a single submission; on `Io.Threaded` it falls back to parallel thread-pool stats. Modest win on dense docs, larger win under `Io.Uring` once that backend stabilizes.

## Proposed: overlap startup shell-outs

At the top of `lint.run`, `discoverDocGroups` calls `git ls-files` and `getRepoIdentity` calls `git remote get-url origin` back-to-back. Both take separate allocators and are trivially independent:

```zig
var identity_future = io.async(vcs.getRepoIdentity, .{ ctx.run_arena, ctx.scratch(), cwd_path });
var doc_groups = try discoverDocGroups(...);
const repo_identity = identity_future.await(io) catch null;
```

Saves a few ms per run. Small absolute win, but free — both calls are already `io`-aware.

## Proposed: rework `GitCatFile` with `Io.Group`

`GitCatFile` (`src/vcs.zig`) keeps a persistent `git cat-file --batch` process alive to avoid spawn overhead for historical file reads. Today request/response is synchronous. Running request submission and response parsing as two concurrent tasks inside an `Io.Group` would let the next request go out while the current response is still being read — useful only if we shift to a use case that issues many queries, which today we don't.
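A sketch of that shape, with hypothetical helpers — `sendRequest` and `readResponse` do not exist in `src/vcs.zig` today and stand in for splitting the current synchronous request/response path in two:

```zig
// Hypothetical: pipeline the persistent `git cat-file --batch` process by
// submitting the next request while the current response is still draining.
var group: std.Io.Group = .init;
defer group.cancel(io);

group.async(io, sendRequest, .{ &cat_file, next_oid });        // write next query
group.async(io, readResponse, .{ &cat_file, &current_blob });  // parse current reply
try group.await(io);
```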

Noting it as a shape that 0.16 now permits; not a priority.

## What not to parallelize

- **Writes to `stdout_w` / `stderr_w`.** The writers are single-threaded and the output order matters. All user-facing output happens on the main thread, after `group.await`.
- **`stderr` from inside tasks.** Errors are stored on the result and printed centrally — do not call `stderr_w.print(...)` from a task.
- **VCS spawn storms.** `Io.Threaded` bounds the thread pool, so we're unlikely to overload `git` — but if future work fans out to hundreds of concurrent `git log` calls, revisit this.

## Backends

All of the above runs unchanged on any `Io` implementation. Today drift uses `Io.Threaded` (the only feature-complete backend in 0.16). `Io.Uring` (Linux), `Io.Kqueue` (BSD/macOS), and `Io.Dispatch` (macOS) are proof-of-concept in 0.16 and will become interesting for `Io.Batch` work once they stabilize.
2 changes: 1 addition & 1 deletion drift.lock
@@ -3,7 +3,7 @@
CLAUDE.md -> build.zig sig:7194b38f39dbadba
CLAUDE.md -> src/main.zig sig:f2735440986d2477
docs/CLI.md -> src/commands/link.zig sig:7bd7f824afc30e0b
docs/CLI.md -> src/commands/lint.zig sig:1fd2cb4096c65c64
docs/CLI.md -> src/commands/lint.zig sig:e29be45c4d84759e
docs/CLI.md -> src/commands/refs.zig sig:f623b7774086094e
docs/CLI.md -> src/commands/status.zig sig:eade166d24a20b81
docs/CLI.md -> src/commands/unlink.zig sig:0dbe1ee3315211b5
237 changes: 170 additions & 67 deletions src/commands/lint.zig
@@ -116,6 +116,10 @@ const DocCheckResult = struct {
result: DocResult,
anchors: std.ArrayList(JsonAnchorRow),
links: std.ArrayList(JsonLinkRow),
/// Populated when `checkBinding` fails for a binding in this doc. The main
/// thread re-raises this as `error.LintCheckFailed` during the merge step,
/// since `Io.Group.async` cannot propagate errors out of tasks.
error_message: ?[]const u8 = null,
};

const CheckResult = struct {
@@ -178,10 +182,6 @@ pub fn run(
const detected_vcs = vcs.detectVcs();
const repo_identity = vcs.getRepoIdentity(ctx.io, ctx.run_arena, ctx.scratch(), cwd_path);

var file_cache: FileCache = undefined;
file_cache.init(ctx.io, ctx.run_arena);
defer file_cache.deinit();

const normalized_changed = if (changed_path) |raw|
try normalizeChangedPrefix(ctx, lf.root_path, cwd_path, raw)
else
@@ -204,92 +204,195 @@
.links_broken = 0,
};

var checked_any = false;
for (doc_groups.items) |doc| {
// Pre-allocate one result slot per doc so each task writes into disjoint
// memory. Tasks produce results independently; we merge in doc-order on
// the main thread below to preserve deterministic output.
const results = try ctx.run_arena.alloc(?DocCheckResult, doc_groups.items.len);
@memset(results, null);

var group: std.Io.Group = .init;
defer group.cancel(ctx.io);

for (doc_groups.items, 0..) |doc, i| {
if (normalized_changed) |prefix| {
if (!docMatchesChangedPath(doc, prefix)) continue;
}
group.async(ctx.io, checkOneDoc, .{
ctx.io,
ctx.run_arena,
lf.root_path,
doc,
detected_vcs,
repo_identity,
&results[i],
});
}
try group.await(ctx.io);

var checked_any = false;
for (results) |maybe| {
const doc_result = maybe orelse continue;
checked_any = true;
if (doc_result.error_message) |msg| {
stderr_w.print("{s}", .{msg}) catch {};
return error.LintCheckFailed;
}
try mergeDocResult(ctx.run_arena, &result, doc_result);
}

var doc_result = DocCheckResult{
switch (format) {
.text => try writeResultsText(stdout_w, &result, checked_any),
.json => try writeResultsJson(ctx.run_arena, stdout_w, &result),
}

return if (result.failed) .fail else .pass;
}

/// Per-doc check task, spawned once per `DocGroup` inside `Io.Group`.
///
/// Creates a task-local scratch arena and task-local `FileCache` so tasks do
/// not contend on shared state. The `run_arena` allocator is threadsafe (see
/// `std.heap.ArenaAllocator`), so persistent allocations (anchor rows, link
/// rows, strings) go there. Errors from `checkBinding` are stored on `out`
/// (via `error_message`) and re-raised by the main thread during merge —
/// `Io.Group.async` cannot propagate errors back out of a task.
fn checkOneDoc(
io: std.Io,
run_arena: std.mem.Allocator,
root_path: []const u8,
doc: DocGroup,
detected_vcs: vcs.VcsKind,
repo_identity: ?[]const u8,
out: *?DocCheckResult,
) void {
checkOneDocInner(io, run_arena, root_path, doc, detected_vcs, repo_identity, out) catch |err| {
// Persist the failure into the result slot so the main thread can
// surface it as `error.LintCheckFailed` after `group.await`.
const msg = std.fmt.allocPrint(run_arena, "error checking doc {s}: {s}\n", .{ doc.path, @errorName(err) }) catch "error checking doc (out of memory)\n";
out.* = DocCheckResult{
.path = doc.path,
.origin = commonOrigin(doc.bindings.items),
.result = .fresh,
.origin = null,
.result = .broken,
.anchors = .empty,
.links = .empty,
.error_message = msg,
};
};
}

var fresh_count: usize = 0;
var stale_count: usize = 0;
var skip_count: usize = 0;
fn checkOneDocInner(
io: std.Io,
run_arena: std.mem.Allocator,
root_path: []const u8,
doc: DocGroup,
detected_vcs: vcs.VcsKind,
repo_identity: ?[]const u8,
out: *?DocCheckResult,
) !void {
var task_scratch = std.heap.ArenaAllocator.init(run_arena);
defer task_scratch.deinit();

for (doc.bindings.items) |binding| {
ctx.resetScratch();
const parsed = target.parse(binding.target);
const origin = binding.fieldValue("origin");
const task_ctx = CommandContext{
.io = io,
.run_arena = run_arena,
.scratch_arena = &task_scratch,
};

const outcome = blk: {
if (origin) |o| {
const is_local = if (repo_identity) |ri| std.mem.eql(u8, o, ri) else false;
if (!is_local) break :blk AnchorOutcome{ .result = .skip, .reason_code = .origin_mismatch };
}
break :blk checkBinding(ctx, lf.root_path, binding, parsed, &file_cache, detected_vcs) catch |err| {
stderr_w.print("error checking {s}: {s}\n", .{ binding.target, @errorName(err) }) catch {};
return error.LintCheckFailed;
};
};
var file_cache: FileCache = undefined;
file_cache.init(io, run_arena);
defer file_cache.deinit();

var doc_result = DocCheckResult{
.path = doc.path,
.origin = commonOrigin(doc.bindings.items),
.result = .fresh,
.anchors = .empty,
.links = .empty,
};

try doc_result.anchors.append(ctx.run_arena, jsonAnchorFromOutcome(binding.target, binding.fieldValue("sig"), parsed, outcome));
switch (outcome.result) {
.fresh => fresh_count += 1,
.stale => stale_count += 1,
.skip => skip_count += 1,
var fresh_count: usize = 0;
var stale_count: usize = 0;
var skip_count: usize = 0;

for (doc.bindings.items) |binding| {
task_ctx.resetScratch();
const parsed = target.parse(binding.target);
const origin = binding.fieldValue("origin");

const outcome = blk: {
if (origin) |o| {
const is_local = if (repo_identity) |ri| std.mem.eql(u8, o, ri) else false;
if (!is_local) break :blk AnchorOutcome{ .result = .skip, .reason_code = .origin_mismatch };
}
break :blk checkBinding(task_ctx, root_path, binding, parsed, &file_cache, detected_vcs) catch |err| {
const msg = try std.fmt.allocPrint(run_arena, "error checking {s}: {s}\n", .{ binding.target, @errorName(err) });
doc_result.error_message = msg;
out.* = doc_result;
return;
};
};

try doc_result.anchors.append(run_arena, jsonAnchorFromOutcome(binding.target, binding.fieldValue("sig"), parsed, outcome));
switch (outcome.result) {
.fresh => fresh_count += 1,
.stale => stale_count += 1,
.skip => skip_count += 1,
}
}

try checkDocLinks(ctx, lf.root_path, doc.path, &file_cache, &doc_result.links);
try checkDocLinks(task_ctx, root_path, doc.path, &file_cache, &doc_result.links);

var broken_links: usize = 0;
for (doc_result.links.items) |link| {
if (std.mem.eql(u8, link.wire.result, "broken")) broken_links += 1;
}
var broken_links: usize = 0;
for (doc_result.links.items) |link| {
if (std.mem.eql(u8, link.wire.result, "broken")) broken_links += 1;
}

doc_result.result = if (broken_links > 0)
.broken
else if (stale_count > 0)
.stale
else if (fresh_count == 0 and skip_count > 0)
.skip
else
.fresh;

result.docs_total += 1;
result.anchors_total += @intCast(doc.bindings.items.len);
result.anchors_fresh += @intCast(fresh_count);
result.anchors_stale += @intCast(stale_count);
result.anchors_skipped += @intCast(skip_count);
result.links_total += @intCast(doc_result.links.items.len);
result.links_broken += @intCast(broken_links);

switch (doc_result.result) {
.fresh => result.docs_fresh += 1,
.skip => result.docs_skipped += 1,
.stale, .broken => {
result.docs_stale += 1;
result.failed = true;
},
}
if (broken_links > 0) result.failed = true;
doc_result.result = if (broken_links > 0)
.broken
else if (stale_count > 0)
.stale
else if (fresh_count == 0 and skip_count > 0)
.skip
else
.fresh;

out.* = doc_result;
}

try result.docs.append(ctx.run_arena, doc_result);
fn mergeDocResult(run_arena: std.mem.Allocator, result: *CheckResult, doc_result: DocCheckResult) !void {
var broken_links: usize = 0;
for (doc_result.links.items) |link| {
if (std.mem.eql(u8, link.wire.result, "broken")) broken_links += 1;
}
var fresh_anchors: usize = 0;
var stale_anchors: usize = 0;
var skip_anchors: usize = 0;
for (doc_result.anchors.items) |row| {
const r = row.wire.result;
if (std.mem.eql(u8, r, "fresh")) fresh_anchors += 1 //
else if (std.mem.eql(u8, r, "stale")) stale_anchors += 1 //
else if (std.mem.eql(u8, r, "skip")) skip_anchors += 1;
}

switch (format) {
.text => try writeResultsText(stdout_w, &result, checked_any),
.json => try writeResultsJson(ctx.run_arena, stdout_w, &result),
result.docs_total += 1;
result.anchors_total += @intCast(doc_result.anchors.items.len);
result.anchors_fresh += @intCast(fresh_anchors);
result.anchors_stale += @intCast(stale_anchors);
result.anchors_skipped += @intCast(skip_anchors);
result.links_total += @intCast(doc_result.links.items.len);
result.links_broken += @intCast(broken_links);

switch (doc_result.result) {
.fresh => result.docs_fresh += 1,
.skip => result.docs_skipped += 1,
.stale, .broken => {
result.docs_stale += 1;
result.failed = true;
},
}
if (broken_links > 0) result.failed = true;

return if (result.failed) .fail else .pass;
try result.docs.append(run_arena, doc_result);
}

fn discoverDocGroups(