-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsend_task.py
More file actions
executable file
·73 lines (58 loc) · 2.01 KB
/
send_task.py
File metadata and controls
executable file
·73 lines (58 loc) · 2.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#!/usr/bin/env python3
"""
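Send a task to the Claude wrapper.

Commands are appended as JSON lines to commands.jsonl inside the directory
named by the AGENT_PROTOCOL_DIR environment variable (default: the current
directory).

Usage:
    ./send_task.py "Fix the bug in main.py"
    ./send_task.py --workdir /path/to/project "Add tests"
    echo "Your task" | ./send_task.py --stdin
    ./send_task.py --abort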
"""
import json
import sys
import argparse
import uuid
from pathlib import Path
from datetime import datetime
import os
# Directory that holds the protocol files. It is taken from the
# AGENT_PROTOCOL_DIR environment variable and falls back to the current
# working directory when that variable is not set.
PROTOCOL_DIR = Path(os.getenv("AGENT_PROTOCOL_DIR", "."))

# JSON-lines file that commands are appended to, one JSON object per line.
COMMANDS_FILE = PROTOCOL_DIR.joinpath("commands.jsonl")
def send_task(task: str, workdir: str = None) -> str:
    """Append a task command to the commands file and return its ID.

    The command is written as a single JSON line containing the keys
    ``id``, ``type`` (always ``"task"``), ``task`` and ``timestamp`` (local
    time in ISO format). A ``workdir`` key is added only when a non-empty
    ``workdir`` is supplied.

    Args:
        task: Text of the task to queue.
        workdir: Optional working directory recorded with the command.

    Returns:
        The generated command ID, ``"cmd_"`` followed by eight hex digits.
    """
    short_hex = uuid.uuid4().hex[:8]
    cmd_id = "cmd_" + short_hex

    record = dict(
        id=cmd_id,
        type="task",
        task=task,
        timestamp=datetime.now().isoformat(),
    )
    if workdir:
        record["workdir"] = workdir

    # Make sure the directory holding the commands file exists before appending.
    COMMANDS_FILE.parent.mkdir(parents=True, exist_ok=True)
    with COMMANDS_FILE.open("a") as out:
        out.write(f"{json.dumps(record)}\n")

    return cmd_id
def main():
    """Command-line entry point: queue a task or an abort command.

    Parses the command line and then does one of the following, checked in
    this order:

    * ``--abort``: append ``{"id": ..., "type": "abort"}`` as one JSON line
      to the commands file, print the command ID and return.
    * ``--stdin``: read the task text from standard input (surrounding
      whitespace stripped) and queue it with ``send_task``.
    * positional ``task``: queue that text with ``send_task``.

    If no non-empty task is available (no argument given, or blank stdin),
    the help text is printed and the process exits with status 1.
    """
    parser = argparse.ArgumentParser(description="Send task to Claude wrapper")
    parser.add_argument("task", nargs="?", help="Task to send")
    parser.add_argument("--workdir", "-w", help="Working directory for the task")
    parser.add_argument("--stdin", action="store_true", help="Read task from stdin")
    parser.add_argument("--abort", action="store_true", help="Send abort command")
    args = parser.parse_args()

    if args.abort:
        cmd_id = f"cmd_{uuid.uuid4().hex[:8]}"
        # Create the protocol directory first, as send_task() does, so an abort
        # issued before any task was queued does not raise FileNotFoundError.
        COMMANDS_FILE.parent.mkdir(parents=True, exist_ok=True)
        with open(COMMANDS_FILE, "a") as f:
            f.write(json.dumps({"id": cmd_id, "type": "abort"}) + "\n")
        print(f"Sent abort command: {cmd_id}")
        return

    # --stdin takes precedence over a positional task when both are given.
    task = sys.stdin.read().strip() if args.stdin else args.task

    # Reject an empty task from either source; previously blank stdin input
    # was queued as an empty task while an empty argument was rejected.
    if not task:
        parser.print_help()
        sys.exit(1)

    cmd_id = send_task(task, args.workdir)
    print(f"Task sent: {cmd_id}")
    # Echo at most the first 100 characters so long tasks stay readable.
    print(f"Task: {task[:100]}{'...' if len(task) > 100 else ''}")
    print(f"\nWatch output: tail -f {PROTOCOL_DIR / 'output.jsonl'}")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()