Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
id: FEATURES-20260305-171348-HVS
status: pending
status: completed
title: Implement Phase 3 Subagent Orchestration Orchestrator
priority: medium
created: 2026-03-05 17:13:48
Expand Down
126 changes: 126 additions & 0 deletions scripts/orchestrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#!/usr/bin/env python3
import os
import sys
import json
import time
import argparse

# Setup paths
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
REPO_ROOT = os.getenv("TASKS_REPO_ROOT", os.path.dirname(SCRIPT_DIR))
sys.path.append(REPO_ROOT)

from scripts import tasks
from scripts import comm

class Orchestrator:
def __init__(self, agent_id="orchestrator"):
self.agent_id = agent_id
self.comm_bus = comm.Comm(agent_id=self.agent_id, role="orchestrator")
self.comm_bus.register()

def get_pending_tasks(self):
"""Retrieve tasks with 'pending' status."""
# Using tasks module directly. list_tasks can return list of dicts.
pending = tasks.list_tasks(status="pending", output_format="json")
if isinstance(pending, str):
try:
pending = json.loads(pending)
except json.JSONDecodeError:
pending = []
return pending

def assign_tasks(self):
"""Assign pending tasks to available agents."""
pending_tasks = self.get_pending_tasks()
if not pending_tasks:
return 0

agents = self.comm_bus.list_agents()
available_agents = [a for a in agents if a.get("id") != self.agent_id]
if not available_agents:
return 0

assigned_count = 0
for task in pending_tasks:
# Simple round-robin or greedy assignment
agent = available_agents[assigned_count % len(available_agents)]
agent_id = agent.get("id")

# Send message to agent
msg_content = json.dumps({"task_id": task["id"], "action": "execute"})
self.comm_bus.send(recipient_id=agent_id, content=msg_content, type="task_assignment", request_receipt=True)

# Update task status to in_progress
tasks.update_task_status(task["id"], "in_progress", output_format="json")
assigned_count += 1

return assigned_count

def monitor(self):
"""Monitor messages for task completions and update statuses."""
messages = self.comm_bus.read()
processed_count = 0
for msg in messages:
msg_type = msg.get("type", "")
content = msg.get("content", "")

if msg_type == "task_completion":
try:
data = json.loads(content)
task_id = data.get("task_id")
if task_id:
tasks.update_task_status(task_id, "completed", output_format="json")
processed_count += 1
except json.JSONDecodeError:
pass
return processed_count

def run(self, interval=5):
"""Unified loop to repeatedly assign tasks and monitor."""
print(f"Orchestrator {self.agent_id} starting run loop. Press Ctrl+C to stop.")
try:
while True:
assigned = self.assign_tasks()
if assigned > 0:
print(f"Assigned {assigned} tasks.")

processed = self.monitor()
if processed > 0:
print(f"Processed {processed} completion messages.")

time.sleep(interval)
except KeyboardInterrupt:
print("Orchestrator stopped.")

def main():
parser = argparse.ArgumentParser(description="Subagent Orchestrator")
subparsers = parser.add_subparsers(dest="command")

# Assign
subparsers.add_parser("assign", help="Assign pending tasks to available agents once")

# Monitor
subparsers.add_parser("monitor", help="Check for completion messages once")

# Run
run_parser = subparsers.add_parser("run", help="Run the orchestrator loop")
run_parser.add_argument("--interval", type=int, default=5, help="Polling interval in seconds")

args = parser.parse_args()

orchestrator = Orchestrator()

if args.command == "assign":
count = orchestrator.assign_tasks()
print(f"Assigned {count} tasks.")
elif args.command == "monitor":
count = orchestrator.monitor()
print(f"Processed {count} messages.")
elif args.command == "run":
orchestrator.run(interval=args.interval)
else:
parser.print_help()

if __name__ == "__main__":
main()
72 changes: 72 additions & 0 deletions tests/test_orchestrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import unittest
from unittest.mock import patch, MagicMock
import json
import os
import sys

# Setup paths
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
REPO_ROOT = os.path.dirname(SCRIPT_DIR)
sys.path.append(REPO_ROOT)

from scripts import orchestrator
from scripts import tasks
from scripts import comm

class TestOrchestrator(unittest.TestCase):
def setUp(self):
with patch('scripts.comm.Comm.register'):
self.orch = orchestrator.Orchestrator()

@patch('scripts.tasks.list_tasks')
def test_get_pending_tasks(self, mock_list_tasks):
mock_tasks = [{"id": "TASK-1", "status": "pending"}]
mock_list_tasks.return_value = mock_tasks

pending = self.orch.get_pending_tasks()

mock_list_tasks.assert_called_once_with(status="pending", output_format="json")
self.assertEqual(pending, mock_tasks)

@patch('scripts.tasks.list_tasks')
def test_get_pending_tasks_string_response(self, mock_list_tasks):
mock_tasks = [{"id": "TASK-1", "status": "pending"}]
mock_list_tasks.return_value = json.dumps(mock_tasks)

pending = self.orch.get_pending_tasks()

mock_list_tasks.assert_called_once_with(status="pending", output_format="json")
self.assertEqual(pending, mock_tasks)

@patch.object(orchestrator.Orchestrator, 'get_pending_tasks')
@patch('scripts.comm.Comm.list_agents')
@patch('scripts.comm.Comm.send')
@patch('scripts.tasks.update_task_status')
def test_assign_tasks(self, mock_update, mock_send, mock_list_agents, mock_get_pending):
mock_get_pending.return_value = [{"id": "TASK-1", "status": "pending"}]
mock_list_agents.return_value = [{"id": "agent-1", "role": "worker"}]

count = self.orch.assign_tasks()

self.assertEqual(count, 1)
mock_send.assert_called_once()
args, kwargs = mock_send.call_args
self.assertEqual(kwargs['recipient_id'], "agent-1")
self.assertEqual(kwargs['type'], "task_assignment")
mock_update.assert_called_once_with("TASK-1", "in_progress", output_format="json")

@patch('scripts.comm.Comm.read')
@patch('scripts.tasks.update_task_status')
def test_monitor(self, mock_update, mock_read):
mock_read.return_value = [
{"type": "task_completion", "content": json.dumps({"task_id": "TASK-1"})},
{"type": "other", "content": "hello"}
]

count = self.orch.monitor()

self.assertEqual(count, 1)
mock_update.assert_called_once_with("TASK-1", "completed", output_format="json")

if __name__ == '__main__':
unittest.main()