feat: Task Queue system with state machine, retries, and orphan recovery #3970

@OneStepAt4time

Task Queue System — Implementation Specification

Priority: P1 (foundational — all other features depend on this)
Reference: multica/server/internal/service/task.go, multica/server/internal/handler/task_lifecycle.go, multica/server/pkg/protocol/events.go


Overview

Implement a task queue to replace Aegis's current ad-hoc session management. Multica's task queue is the backbone of its platform: every agent action (assignment, @mention, chat, autopilot) produces a task.

State Machine

queued → dispatched → running → completed
         dispatched → failed              (never started, or orphaned)
                      running → failed
failed → queued (retry, max 2 attempts in total)
any    → cancelled

States

| State      | Meaning                       | Entry trigger                        |
|------------|-------------------------------|--------------------------------------|
| queued     | Waiting for daemon to pick up | Task created, or retry               |
| dispatched | Daemon claimed, starting tool | Daemon claims via HTTP               |
| running    | AI coding tool is executing   | Tool started successfully            |
| completed  | Finished successfully         | Tool reported success                |
| failed     | Aborted (error/timeout)       | Tool error, timeout, or daemon crash |
| cancelled  | User cancelled                | Manual cancel at any point           |
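
A minimal sketch of how the transitions could be enforced in code. Only the state names come from this spec; `TRANSITIONS` and `canTransition` are hypothetical names. Two assumptions are baked in: `dispatched → failed` is allowed (the timeout rules imply it), and terminal states cannot be cancelled, which the spec leaves open.

```typescript
// State names are taken from the spec; everything else is illustrative.
type TaskStatus =
  | "queued"
  | "dispatched"
  | "running"
  | "completed"
  | "failed"
  | "cancelled";

// Allowed next states per current state.
// Assumption: completed/cancelled are terminal and cannot be cancelled again.
const TRANSITIONS: Record<TaskStatus, readonly TaskStatus[]> = {
  queued: ["dispatched", "cancelled"],
  dispatched: ["running", "failed", "cancelled"],
  running: ["completed", "failed", "cancelled"],
  failed: ["queued"], // retry path
  completed: [],
  cancelled: [],
};

// Returns true when moving from `from` to `to` is a legal transition.
function canTransition(from: TaskStatus, to: TaskStatus): boolean {
  return TRANSITIONS[from].includes(to);
}
```

A handler would call `canTransition(task.status, next)` before writing the new status and reject the request otherwise.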

Database Schema

CREATE TABLE agent_task_queue (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workspace_id UUID NOT NULL REFERENCES workspace(id),
    agent_id UUID NOT NULL REFERENCES agent(id),
    issue_id UUID REFERENCES issue(id),  -- nullable for chat/autopilot tasks
    
    status TEXT NOT NULL DEFAULT 'queued' 
        CHECK (status IN ('queued', 'dispatched', 'running', 'completed', 'failed', 'cancelled')),
    
    -- Trigger context
    trigger_type TEXT,  -- 'assignment', 'mention', 'chat', 'autopilot', 'manual_rerun'
    trigger_comment_id UUID REFERENCES comment(id),
    trigger_summary TEXT,  -- truncated trigger content for list views
    
    -- Execution state
    session_id TEXT,  -- CC session ID for resumption
    work_dir TEXT,    -- agent working directory
    output TEXT,      -- final output from agent
    
    -- Retry tracking
    attempt INTEGER NOT NULL DEFAULT 1,
    max_attempts INTEGER NOT NULL DEFAULT 2,
    failure_reason TEXT,  -- 'runtime_offline', 'timeout', 'agent_error', 'cancelled'
    parent_task_id UUID REFERENCES agent_task_queue(id),  -- for retries
    
    -- Timing
    claimed_at TIMESTAMPTZ,
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_task_queue_status ON agent_task_queue(status) 
    WHERE status IN ('queued', 'dispatched', 'running');
CREATE INDEX idx_task_queue_agent ON agent_task_queue(agent_id, status);
CREATE INDEX idx_task_queue_issue ON agent_task_queue(issue_id);

API Endpoints

Daemon-facing (internal)

POST   /api/tasks/claim                    — daemon claims next queued task for runtime
POST   /api/tasks/:id/start                 — daemon reports tool started
POST   /api/tasks/:id/complete              — daemon reports success
POST   /api/tasks/:id/fail                  — daemon reports failure
POST   /api/tasks/:id/progress              — daemon reports progress (WS alternative)
POST   /api/tasks/:id/session               — daemon pins session_id + work_dir
POST   /api/tasks/:id/cancel                — cancel a task
POST   /api/runtime/:runtimeId/orphans      — daemon reports orphan recovery on startup
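
To illustrate the claim step, here is an in-memory sketch, not the Multica implementation. It assumes the caller knows which agents a runtime serves (`agentIds`) and that "next" means oldest by creation time; neither is stated in this spec. `QueuedTask` and `claimNext` are hypothetical names.

```typescript
interface QueuedTask {
  id: string;
  agentId: string;
  status: "queued" | "dispatched" | "running" | "completed" | "failed" | "cancelled";
  createdAt: number; // epoch milliseconds
  claimedAt?: number;
}

// Picks the oldest queued task belonging to one of the given agents,
// marks it dispatched, and returns it. Returns undefined if nothing is queued.
function claimNext(
  tasks: QueuedTask[],
  agentIds: Set<string>,
  nowMs: number,
): QueuedTask | undefined {
  let next: QueuedTask | undefined;
  for (const t of tasks) {
    if (t.status !== "queued" || !agentIds.has(t.agentId)) continue;
    if (next === undefined || t.createdAt < next.createdAt) next = t;
  }
  if (next !== undefined) {
    next.status = "dispatched";
    next.claimedAt = nowMs;
  }
  return next;
}
```

Against PostgreSQL the select-and-update would need to be a single atomic statement so two daemons cannot claim the same row; `SELECT ... FOR UPDATE SKIP LOCKED` inside the `UPDATE` is one common way to do that.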

User-facing

GET    /api/tasks                           — list tasks (filterable by agent, issue, status)
GET    /api/tasks/:id                       — get task details
POST   /api/issues/:id/rerun                — manually re-enqueue a task (optionally targeting a past task_id)

Timeout Rules

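| Situation                    | Timeout       | Failure reason  | Retryable |
|------------------------------|---------------|-----------------|-----------|
| Dispatched but never started | 5 minutes     | timeout         | Yes       |
| Running too long             | 2.5 hours     | timeout         | Yes       |
| Agent tool error             | N/A           | agent_error     | No        |
| Runtime offline              | Sweeper (75s) | runtime_offline | Yes       |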
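
The two time-based rules can be sketched as a pure check that a sweeper would run periodically. The durations come from the table above; `SweepTask`, `isTimedOut`, and the choice to measure from `claimedAt` and `startedAt` are assumptions.

```typescript
interface SweepTask {
  status: "queued" | "dispatched" | "running" | "completed" | "failed" | "cancelled";
  claimedAt?: number; // epoch milliseconds, set when the daemon claims
  startedAt?: number; // epoch milliseconds, set when the tool starts
}

const DISPATCH_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes
const RUNNING_TIMEOUT_MS = 2.5 * 60 * 60 * 1000; // 2.5 hours

// Returns true when the task has exceeded the timeout for its current state.
// Tasks in any other state never time out in this sketch.
function isTimedOut(task: SweepTask, nowMs: number): boolean {
  if (task.status === "dispatched" && task.claimedAt !== undefined) {
    return nowMs - task.claimedAt > DISPATCH_TIMEOUT_MS;
  }
  if (task.status === "running" && task.startedAt !== undefined) {
    return nowMs - task.startedAt > RUNNING_TIMEOUT_MS;
  }
  return false;
}
```

A task for which this returns true would be moved to failed with failure_reason 'timeout' and then handed to the retry logic.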

Retry Rules

  • Max 2 attempts (1 original + 1 retry)
  • Only retryable reasons: runtime_offline, timeout
  • Non-retryable: agent_error (API errors, quota exceeded)
  • Autopilot-triggered tasks do NOT auto-retry
  • Manual rerun always starts a fresh session (force_fresh_session=true)
  • Auto-retry inherits the failed task's session for context continuity
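
The retry rules above can be sketched as a decision function plus a builder for the retry task. Field names mirror the schema columns in camelCase; `FailedTask`, `RetryTask`, `shouldRetry`, and `buildRetry` are hypothetical names, and modelling the retry as a new object linked by `parentTaskId` is an assumption based on the parent_task_id column.

```typescript
interface FailedTask {
  id: string;
  attempt: number;
  maxAttempts: number;
  failureReason: "runtime_offline" | "timeout" | "agent_error" | "cancelled";
  triggerType: string; // e.g. 'assignment', 'mention', 'chat', 'autopilot'
  sessionId?: string;
}

interface RetryTask {
  parentTaskId: string;
  status: "queued";
  attempt: number;
  maxAttempts: number;
  triggerType: string;
  sessionId?: string;
}

const RETRYABLE_REASONS = new Set<string>(["runtime_offline", "timeout"]);

// Applies the rules: autopilot never retries, only retryable reasons
// qualify, and the attempt budget must not be exhausted.
function shouldRetry(task: FailedTask): boolean {
  if (task.triggerType === "autopilot") return false;
  if (!RETRYABLE_REASONS.has(task.failureReason)) return false;
  return task.attempt < task.maxAttempts;
}

// Builds the follow-up task for an automatic retry. The session is
// inherited so the agent can resume with its previous context.
function buildRetry(task: FailedTask): RetryTask {
  return {
    parentTaskId: task.id,
    status: "queued",
    attempt: task.attempt + 1,
    maxAttempts: task.maxAttempts,
    triggerType: task.triggerType,
    sessionId: task.sessionId,
  };
}
```

A manual rerun would follow the same shape but leave `sessionId` unset, since it always starts a fresh session.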

Orphan Recovery

When daemon restarts:

  1. Daemon calls POST /api/runtime/:id/orphans
  2. Server atomically fails all dispatched/running tasks for that runtime
  3. Failed tasks go through normal retry pipeline
  4. This is faster than waiting for the runtime-offline sweeper (75s) or the running timeout (2.5 hours) to catch the stale tasks
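
Step 2 can be sketched in memory as below. The schema above has no runtime column, so the `runtimeId` field is hypothetical, as are `RuntimeTask` and `failOrphans`. Using 'runtime_offline' as the failure reason is also an assumption, chosen because it is retryable and so lets the tasks enter the normal retry pipeline.

```typescript
interface RuntimeTask {
  id: string;
  runtimeId: string; // hypothetical: the spec does not say how tasks map to runtimes
  status: "queued" | "dispatched" | "running" | "completed" | "failed" | "cancelled";
  failureReason?: string;
}

// Marks every dispatched or running task of the given runtime as failed
// and returns the IDs that were changed. Other tasks are left untouched.
function failOrphans(tasks: RuntimeTask[], runtimeId: string): string[] {
  const failed: string[] = [];
  for (const t of tasks) {
    if (t.runtimeId !== runtimeId) continue;
    if (t.status !== "dispatched" && t.status !== "running") continue;
    t.status = "failed";
    t.failureReason = "runtime_offline"; // assumed reason, see lead-in
    failed.push(t.id);
  }
  return failed;
}
```

In a database-backed implementation the same effect would come from one UPDATE statement, which is what makes the operation atomic.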

Session Pinning

  • Daemon calls POST /api/tasks/:id/session with {session_id, work_dir} immediately after agent emits first message
  • Prevents losing resume pointer if daemon crashes mid-run
  • Stored on task row, not separate table
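
A sketch of the daemon-side call, limited to building the request so it runs without a server. The path and the {session_id, work_dir} payload come from this spec; `buildPinRequest`, `PinRequest`, the base URL parameter, and the JSON content type are assumptions.

```typescript
interface SessionPin {
  session_id: string;
  work_dir: string;
}

interface PinRequest {
  url: string;
  method: "POST";
  headers: Record<string, string>;
  body: string;
}

// Builds the POST /api/tasks/:id/session request for a task.
// Throws if either field is empty, since an empty pin cannot be resumed from.
function buildPinRequest(baseUrl: string, taskId: string, pin: SessionPin): PinRequest {
  if (pin.session_id === "" || pin.work_dir === "") {
    throw new Error("session_id and work_dir are required");
  }
  const root = baseUrl.replace(/\/+$/, ""); // drop trailing slashes
  return {
    url: `${root}/api/tasks/${encodeURIComponent(taskId)}/session`,
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ session_id: pin.session_id, work_dir: pin.work_dir }),
  };
}
```

The daemon would send this as soon as the agent emits its first message, for example by passing the method, headers, and body to `fetch` with the built URL.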

WebSocket Events

// Server → Web Client
"task:queued"    // new task created
"task:dispatch"  // task claimed by daemon
"task:progress"  // agent progress update
"task:completed" // task finished successfully
"task:failed"    // task failed
"task:cancelled" // task cancelled
"task:message"   // agent execution message (tool call, text, error)

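One way to keep state changes and events in sync is a lookup from status to event name, sketched below. The event strings are copied from the list above; `STATUS_EVENT` and `eventForStatus` are hypothetical names. The list has no event dedicated to entering the running state, so this sketch leaves it unmapped rather than inventing one.

```typescript
type TaskStatus =
  | "queued"
  | "dispatched"
  | "running"
  | "completed"
  | "failed"
  | "cancelled";

type TaskLifecycleEvent =
  | "task:queued"
  | "task:dispatch"
  | "task:completed"
  | "task:failed"
  | "task:cancelled";

// Status → event emitted when a task enters that status.
// "running" is intentionally absent: the spec lists no matching event.
const STATUS_EVENT: Partial<Record<TaskStatus, TaskLifecycleEvent>> = {
  queued: "task:queued",
  dispatched: "task:dispatch",
  completed: "task:completed",
  failed: "task:failed",
  cancelled: "task:cancelled",
};

// Returns the event to broadcast for a new status, or undefined if none.
function eventForStatus(status: TaskStatus): TaskLifecycleEvent | undefined {
  return STATUS_EVENT[status];
}
```

"task:progress" and "task:message" are not tied to a status change and would be emitted directly by the progress and message handlers.
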
Acceptance Criteria

  1. Task state machine accepts every valid transition and rejects every invalid one
  2. Daemon can claim, start, complete, and fail tasks via HTTP API
  3. Automatic retry fires for retryable failures, max 2 attempts
  4. Manual rerun creates new task with fresh session
  5. Orphan recovery works on daemon restart
  6. Session pinning preserves resume state
  7. WebSocket events fire for all state transitions
  8. Timeout sweeper moves stale dispatched and running tasks to failed
  9. npm run gate passes
  10. New tests cover: state transitions, retry logic, orphan recovery, timeout handling

E2E Test Requirements

  1. Create task → verify queued state
  2. Daemon claims → verify dispatched
  3. Daemon starts → verify running
  4. Daemon completes → verify completed, check events
  5. Daemon fails with retryable reason → verify retry task created
  6. Daemon fails with non-retryable reason → verify no retry
  7. Max attempts exhausted → verify stays failed
  8. Manual rerun → verify new task with fresh session
  9. Orphan recovery → verify stale tasks recovered
  10. Timeout → verify task moved to failed after timeout
  11. Cancel mid-run → verify cancelled state

Multica Source Reference

  • server/internal/service/task.go — TaskService with full lifecycle
  • server/internal/handler/task_lifecycle.go — HTTP handlers
  • server/pkg/protocol/events.go — WS event types
  • server/pkg/protocol/messages.go — WS message payloads
  • server/pkg/db/queries/issue.sql — task-related queries
