From 5ace939689418e79d9befa8b76e14546abde543f Mon Sep 17 00:00:00 2001 From: Patrick Marsceill Date: Wed, 21 Jan 2026 15:05:07 -0500 Subject: [PATCH] feat(daemon): keep tasks pending until an agent is available Tasks now remain in the queue until an available agent can handle them. When an agent becomes available (spawned or finishes a task), pending tasks are automatically assigned in FIFO order. Closes #10 Co-Authored-By: Claude Opus 4.5 --- internal/daemon/process.go | 37 +++++++++++++++++++++++++++++++------ internal/daemon/server.go | 3 +++ internal/daemon/task.go | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 6 deletions(-) diff --git a/internal/daemon/process.go b/internal/daemon/process.go index 68f3580..2d88a72 100644 --- a/internal/daemon/process.go +++ b/internal/daemon/process.go @@ -17,11 +17,12 @@ import ( // ProcessManager manages spawned Claude Code agent slots using tmux sessions type ProcessManager struct { - mu sync.RWMutex - agents map[string]*AgentSlot - eventCh chan *mapv1.Event - logsDir string - lastAssigned string // ID of last agent assigned a task (for round-robin) + mu sync.RWMutex + agents map[string]*AgentSlot + eventCh chan *mapv1.Event + logsDir string + lastAssigned string // ID of last agent assigned a task (for round-robin) + onAgentAvailable func() // callback when an agent becomes available } // AgentSlot represents an agent running in a tmux session @@ -61,6 +62,14 @@ func NewProcessManager(logsDir string, eventCh chan *mapv1.Event) *ProcessManage } } +// SetOnAgentAvailable sets a callback that is invoked when an agent becomes available. +// This is used to trigger processing of pending tasks. +func (m *ProcessManager) SetOnAgentAvailable(callback func()) { + m.mu.Lock() + defer m.mu.Unlock() + m.onAgentAvailable = callback +} + // CreateSlot creates a new agent with a tmux session running claude or codex // agentType should be "claude" (default) or "codex" // If skipPermissions is true, the agent is started with permission-bypassing flags @@ -148,11 +157,19 @@ func (m *ProcessManager) CreateSlot(agentID, workdir, agentType string, skipPerm m.agents[agentID] = slot + // Capture callback before unlocking + callback := m.onAgentAvailable + // Emit connected event m.emitAgentEvent(slot, true) log.Printf("created %s agent %s with tmux session %s (workdir: %s)", cliBinary, agentID, tmuxSession, workdir) + // Notify that an agent is available (for pending task processing) + if callback != nil { + go callback() + } + return slot, nil } @@ -177,12 +194,20 @@ func (m *ProcessManager) ExecuteTask(ctx context.Context, agentID string, taskID tmuxSession := slot.TmuxSession slot.mu.Unlock() - // Ensure we release the slot when done + // Ensure we release the slot when done and notify about availability defer func() { slot.mu.Lock() slot.Status = AgentStatusIdle slot.CurrentTask = "" slot.mu.Unlock() + + // Notify that an agent is available (for pending task processing) + m.mu.RLock() + callback := m.onAgentAvailable + m.mu.RUnlock() + if callback != nil { + go callback() + } }() log.Printf("agent %s executing task %s via tmux", agentID, taskID) diff --git a/internal/daemon/server.go b/internal/daemon/server.go index 8ecb798..64a5866 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -74,6 +74,9 @@ func NewServer(cfg *Config) (*Server, error) { tasks := NewTaskRouter(store, processes, eventCh) names := NewNameGenerator() + // Wire up callback to process pending tasks when agents become available + processes.SetOnAgentAvailable(tasks.ProcessPendingTasks) + s := &Server{ store: store, tasks: tasks, diff --git a/internal/daemon/task.go b/internal/daemon/task.go index 8cc25b1..dd7b903 100644 --- a/internal/daemon/task.go +++ b/internal/daemon/task.go @@ -80,6 +80,38 @@ func (r *TaskRouter) routeTask(task *mapv1.Task) { // No agents available, task remains pending } +// ProcessPendingTasks assigns pending tasks to available agents. +// Called when an agent becomes available (spawned or finished a task). +func (r *TaskRouter) ProcessPendingTasks() { + r.mu.Lock() + defer r.mu.Unlock() + + // Get pending tasks ordered by creation time (oldest first) + pendingTasks, err := r.store.ListTasks("pending", "", 0) + if err != nil { + return + } + + // Reverse to process oldest first (ListTasks returns DESC order) + for i := len(pendingTasks) - 1; i >= 0; i-- { + task := pendingTasks[i] + + // Find an available agent + if r.spawned == nil { + return + } + slot := r.spawned.FindAvailableAgent() + if slot == nil { + // No more available agents + return + } + + // Convert to proto and assign + protoTask := taskRecordToProto(task) + r.executeOnSpawnedAgent(protoTask, slot) + } +} + // executeOnSpawnedAgent runs a task on a spawned Claude agent slot func (r *TaskRouter) executeOnSpawnedAgent(task *mapv1.Task, slot *AgentSlot) { // Update task status to in_progress