Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions internal/daemon/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (

// ProcessManager manages spawned Claude Code agent slots using tmux sessions
type ProcessManager struct {
mu sync.RWMutex
agents map[string]*AgentSlot
eventCh chan *mapv1.Event
logsDir string
lastAssigned string // ID of last agent assigned a task (for round-robin)
mu sync.RWMutex
agents map[string]*AgentSlot
eventCh chan *mapv1.Event
logsDir string
lastAssigned string // ID of last agent assigned a task (for round-robin)
onAgentAvailable func() // callback when an agent becomes available
}

// AgentSlot represents an agent running in a tmux session
Expand Down Expand Up @@ -61,6 +62,14 @@ func NewProcessManager(logsDir string, eventCh chan *mapv1.Event) *ProcessManage
}
}

// SetOnAgentAvailable sets a callback that is invoked when an agent becomes available.
// This is used to trigger processing of pending tasks.
func (m *ProcessManager) SetOnAgentAvailable(callback func()) {
m.mu.Lock()
defer m.mu.Unlock()
m.onAgentAvailable = callback
}

// CreateSlot creates a new agent with a tmux session running claude or codex
// agentType should be "claude" (default) or "codex"
// If skipPermissions is true, the agent is started with permission-bypassing flags
Expand Down Expand Up @@ -148,11 +157,19 @@ func (m *ProcessManager) CreateSlot(agentID, workdir, agentType string, skipPerm

m.agents[agentID] = slot

// Capture callback before unlocking
callback := m.onAgentAvailable

// Emit connected event
m.emitAgentEvent(slot, true)

log.Printf("created %s agent %s with tmux session %s (workdir: %s)", cliBinary, agentID, tmuxSession, workdir)

// Notify that an agent is available (for pending task processing)
if callback != nil {
go callback()
}

return slot, nil
}

Expand All @@ -177,12 +194,20 @@ func (m *ProcessManager) ExecuteTask(ctx context.Context, agentID string, taskID
tmuxSession := slot.TmuxSession
slot.mu.Unlock()

// Ensure we release the slot when done
// Ensure we release the slot when done and notify about availability
defer func() {
slot.mu.Lock()
slot.Status = AgentStatusIdle
slot.CurrentTask = ""
slot.mu.Unlock()

// Notify that an agent is available (for pending task processing)
m.mu.RLock()
callback := m.onAgentAvailable
m.mu.RUnlock()
if callback != nil {
go callback()
}
}()

log.Printf("agent %s executing task %s via tmux", agentID, taskID)
Expand Down
3 changes: 3 additions & 0 deletions internal/daemon/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func NewServer(cfg *Config) (*Server, error) {
tasks := NewTaskRouter(store, processes, eventCh)
names := NewNameGenerator()

// Wire up callback to process pending tasks when agents become available
processes.SetOnAgentAvailable(tasks.ProcessPendingTasks)

s := &Server{
store: store,
tasks: tasks,
Expand Down
32 changes: 32 additions & 0 deletions internal/daemon/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,38 @@ func (r *TaskRouter) routeTask(task *mapv1.Task) {
// No agents available, task remains pending
}

// ProcessPendingTasks assigns pending tasks to available agents.
// Called when an agent becomes available (spawned or finished a task).
func (r *TaskRouter) ProcessPendingTasks() {
r.mu.Lock()
defer r.mu.Unlock()

// Get pending tasks ordered by creation time (oldest first)
pendingTasks, err := r.store.ListTasks("pending", "", 0)
if err != nil {
return
}

// Reverse to process oldest first (ListTasks returns DESC order)
for i := len(pendingTasks) - 1; i >= 0; i-- {
task := pendingTasks[i]

// Find an available agent
if r.spawned == nil {
return
}
slot := r.spawned.FindAvailableAgent()
if slot == nil {
// No more available agents
return
}

// Convert to proto and assign
protoTask := taskRecordToProto(task)
r.executeOnSpawnedAgent(protoTask, slot)
}
}

// executeOnSpawnedAgent runs a task on a spawned Claude agent slot
func (r *TaskRouter) executeOnSpawnedAgent(task *mapv1.Task, slot *AgentSlot) {
// Update task status to in_progress
Expand Down