Skip to content

Commit c1c0acf

Browse files
committed
Fix tracker synchronization and dispatch execution
1 parent e4fcc05 commit c1c0acf

2 files changed

Lines changed: 102 additions & 26 deletions

File tree

src/main.zig

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ pub fn main() !void {
297297
allocator,
298298
tracker_cfg,
299299
workflows,
300+
&store,
300301
&shutdown_requested,
301302
);
302303

src/tracker.zig

Lines changed: 101 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ const log = std.log.scoped(.tracker);
1414
const config = @import("config.zig");
1515
const types = @import("types.zig");
1616
const ids = @import("ids.zig");
17+
const dispatch_mod = @import("dispatch.zig");
1718
const subprocess_mod = @import("subprocess.zig");
19+
const Store = @import("store.zig").Store;
1820
const workspace_mod = @import("workspace.zig");
1921
const workflow_loader = @import("workflow_loader.zig");
2022
const tracker_client = @import("tracker_client.zig");
@@ -167,6 +169,7 @@ pub const Tracker = struct {
167169
cfg: config.TrackerConfig,
168170
state: TrackerState,
169171
workflows: workflow_loader.WorkflowMap,
172+
store: ?*Store,
170173
shutdown: *std.atomic.Value(bool),
171174
last_heartbeat_ms: i64,
172175
used_ports: std.AutoArrayHashMapUnmanaged(u16, void),
@@ -175,13 +178,15 @@ pub const Tracker = struct {
175178
allocator: std.mem.Allocator,
176179
cfg: config.TrackerConfig,
177180
workflows: workflow_loader.WorkflowMap,
181+
store: ?*Store,
178182
shutdown: *std.atomic.Value(bool),
179183
) Tracker {
180184
return .{
181185
.allocator = allocator,
182186
.cfg = cfg,
183187
.state = TrackerState.init(),
184188
.workflows = workflows,
189+
.store = store,
185190
.shutdown = shutdown,
186191
.last_heartbeat_ms = 0,
187192
.used_ports = .{},
@@ -256,6 +261,9 @@ pub const Tracker = struct {
256261
defer arena.deinit();
257262
const tick_alloc = arena.allocator();
258263

264+
self.state.mutex.lock();
265+
defer self.state.mutex.unlock();
266+
259267
self.heartbeatAll(tick_alloc);
260268
self.detectStalls(tick_alloc);
261269
self.driveRunningTasks(tick_alloc);
@@ -295,14 +303,12 @@ pub const Tracker = struct {
295303
}
296304

297305
// Remove failed heartbeat tasks and free owned strings
298-
self.state.mutex.lock();
299306
for (to_remove.items) |key| {
300307
if (self.state.running.fetchSwapRemove(key)) |entry| {
301308
self.freeRunningTaskStrings(entry.value);
302309
self.allocator.free(entry.key);
303310
}
304311
}
305-
self.state.mutex.unlock();
306312
}
307313

308314
/// Detect stalled subprocesses. If stalled, report failure to NullTickets,
@@ -328,22 +334,18 @@ pub const Tracker = struct {
328334
subprocess_mod.killSubprocess(child);
329335
}
330336

331-
self.state.mutex.lock();
332337
self.state.failed_count += 1;
333-
self.state.mutex.unlock();
334338
to_remove.append(tick_alloc, key) catch continue;
335339
}
336340
}
337341
}
338342

339-
self.state.mutex.lock();
340343
for (to_remove.items) |key| {
341344
if (self.state.running.fetchSwapRemove(key)) |entry| {
342345
self.freeRunningTaskStrings(entry.value);
343346
self.allocator.free(entry.key);
344347
}
345348
}
346-
self.state.mutex.unlock();
347349
}
348350

349351
/// Poll NullTickets for each workflow's claim_roles and claim available tasks.
@@ -484,8 +486,6 @@ pub const Tracker = struct {
484486
.state = .workspace_setup,
485487
};
486488

487-
self.state.mutex.lock();
488-
defer self.state.mutex.unlock();
489489
try self.state.running.put(self.allocator, key, running_task);
490490

491491
log.info("started task {s} (pipeline={s}, role={s}, mode={s})", .{
@@ -522,11 +522,9 @@ pub const Tracker = struct {
522522
var task_ids: std.ArrayListUnmanaged([]const u8) = .empty;
523523
defer task_ids.deinit(tick_alloc);
524524

525-
self.state.mutex.lock();
526525
for (self.state.running.keys()) |key| {
527526
task_ids.append(tick_alloc, key) catch continue;
528527
}
529-
self.state.mutex.unlock();
530528

531529
for (task_ids.items) |task_id| {
532530
const task = self.state.running.getPtr(task_id) orelse continue;
@@ -551,14 +549,12 @@ pub const Tracker = struct {
551549
}
552550
}
553551

554-
self.state.mutex.lock();
555552
for (to_remove.items) |key| {
556553
if (self.state.running.fetchSwapRemove(key)) |entry| {
557554
self.freeRunningTaskStrings(entry.value);
558555
self.allocator.free(entry.key);
559556
}
560557
}
561-
self.state.mutex.unlock();
562558
}
563559

564560
/// Reconcile running tasks with NullTickets state.
@@ -583,9 +579,7 @@ pub const Tracker = struct {
583579
}
584580
self.releasePort(sub.port);
585581
}
586-
self.state.mutex.lock();
587582
self.state.failed_count += 1;
588-
self.state.mutex.unlock();
589583
to_remove.append(tick_alloc, key) catch continue;
590584
continue;
591585
}
@@ -610,9 +604,7 @@ pub const Tracker = struct {
610604
}
611605
self.releasePort(sub.port);
612606
}
613-
self.state.mutex.lock();
614607
self.state.failed_count += 1;
615-
self.state.mutex.unlock();
616608
to_remove.append(tick_alloc, key) catch continue;
617609
continue;
618610
}
@@ -632,14 +624,12 @@ pub const Tracker = struct {
632624
task.task_version = info.task_version;
633625
}
634626

635-
self.state.mutex.lock();
636627
for (to_remove.items) |key| {
637628
if (self.state.running.fetchSwapRemove(key)) |entry| {
638629
self.freeRunningTaskStrings(entry.value);
639630
self.allocator.free(entry.key);
640631
}
641632
}
642-
self.state.mutex.unlock();
643633
}
644634

645635
/// workspace_setup → spawning (subprocess) or running (dispatch).
@@ -853,9 +843,96 @@ pub const Tracker = struct {
853843
return;
854844
};
855845

856-
// TODO: Wire actual dispatch.zig call using workflow.dispatch.worker_tags and protocol
857-
log.warn("dispatch execution not yet implemented for task {s} ({d} bytes rendered), failing", .{ task.task_id, rendered.len });
858-
task.state = .failed;
846+
const store = self.store orelse {
847+
log.err("dispatch requested for task {s}, but tracker store is unavailable", .{task.task_id});
848+
task.state = .failed;
849+
return;
850+
};
851+
852+
const workers = store.listWorkers(tick_alloc) catch |err| {
853+
log.err("failed to list workers for dispatch task {s}: {}", .{ task.task_id, err });
854+
task.state = .failed;
855+
return;
856+
};
857+
858+
var worker_infos: std.ArrayListUnmanaged(dispatch_mod.WorkerInfo) = .empty;
859+
defer worker_infos.deinit(tick_alloc);
860+
861+
for (workers) |worker| {
862+
if (workflow.dispatch.protocol.len > 0 and !std.mem.eql(u8, worker.protocol, workflow.dispatch.protocol)) {
863+
continue;
864+
}
865+
const current_tasks = store.countRunningStepsByWorker(worker.id) catch 0;
866+
worker_infos.append(tick_alloc, .{
867+
.id = worker.id,
868+
.url = worker.url,
869+
.token = worker.token,
870+
.protocol = worker.protocol,
871+
.model = worker.model,
872+
.tags_json = worker.tags_json,
873+
.max_concurrent = worker.max_concurrent,
874+
.status = worker.status,
875+
.current_tasks = current_tasks,
876+
}) catch {
877+
task.state = .failed;
878+
return;
879+
};
880+
}
881+
882+
const selected_worker = dispatch_mod.selectWorker(tick_alloc, worker_infos.items, workflow.dispatch.worker_tags) catch |err| {
883+
log.err("worker selection failed for dispatch task {s}: {}", .{ task.task_id, err });
884+
task.state = .failed;
885+
return;
886+
};
887+
const worker = selected_worker orelse {
888+
log.warn("no worker available for dispatch task {s} (pipeline={s}, protocol={s})", .{
889+
task.task_id,
890+
task.pipeline_id,
891+
workflow.dispatch.protocol,
892+
});
893+
task.state = .failed;
894+
return;
895+
};
896+
897+
const step_id = if (workflow.id.len > 0) workflow.id else workflow.pipeline_id;
898+
const result = dispatch_mod.dispatchStep(
899+
tick_alloc,
900+
worker.url,
901+
worker.token,
902+
worker.protocol,
903+
worker.model,
904+
task.run_id,
905+
step_id,
906+
rendered,
907+
) catch |err| {
908+
log.err("dispatch failed for task {s}: {}", .{ task.task_id, err });
909+
task.state = .failed;
910+
return;
911+
};
912+
913+
if (!result.success or result.async_pending) {
914+
if (result.async_pending) {
915+
log.warn("dispatch task {s} returned async_pending, which tracker mode does not support yet", .{task.task_id});
916+
} else {
917+
log.warn("dispatch task {s} failed: {s}", .{ task.task_id, result.error_text orelse "dispatch failed" });
918+
}
919+
task.state = .failed;
920+
return;
921+
}
922+
923+
task.current_turn += 1;
924+
task.last_activity_ms = ids.nowMs();
925+
926+
var client = tracker_client.TrackerClient.init(tick_alloc, self.cfg.url orelse "", self.cfg.api_token);
927+
const event_data = std.fmt.allocPrint(
928+
tick_alloc,
929+
"{{\"worker_id\":\"{s}\",\"output_bytes\":{d}}}",
930+
.{ worker.id, result.output.len },
931+
) catch "{}";
932+
_ = client.postEvent(task.run_id, "dispatch_completed", event_data, task.lease_token) catch {};
933+
934+
task.state = .completing;
935+
self.driveCompleting(tick_alloc, task);
859936
}
860937

861938
/// completing: run after_run hook, call transition, kill subprocess
@@ -915,9 +992,7 @@ pub const Tracker = struct {
915992
};
916993
ws.remove();
917994

918-
self.state.mutex.lock();
919995
self.state.completed_count += 1;
920-
self.state.mutex.unlock();
921996
task.state = .removing;
922997
}
923998

@@ -945,14 +1020,14 @@ pub const Tracker = struct {
9451020
};
9461021
ws.remove();
9471022

948-
self.state.mutex.lock();
9491023
self.state.failed_count += 1;
950-
self.state.mutex.unlock();
9511024
task.state = .removing;
9521025
}
9531026

9541027
/// Kill all running subprocesses (called during shutdown).
9551028
fn shutdownSubprocesses(self: *Tracker) void {
1029+
self.state.mutex.lock();
1030+
defer self.state.mutex.unlock();
9561031
for (self.state.running.values()) |*task| {
9571032
if (task.subprocess) |*sub| {
9581033
if (sub.child) |*child| {
@@ -1195,7 +1270,7 @@ test "Tracker allocatePort returns unique ports" {
11951270
var shutdown = std.atomic.Value(bool).init(false);
11961271
const workflows = workflow_loader.WorkflowMap{};
11971272

1198-
var tracker_inst = Tracker.init(allocator, config.TrackerConfig{}, workflows, &shutdown);
1273+
var tracker_inst = Tracker.init(allocator, config.TrackerConfig{}, workflows, null, &shutdown);
11991274
defer tracker_inst.deinit();
12001275

12011276
const port1 = tracker_inst.allocatePort();

0 commit comments

Comments
 (0)