Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions examples/CodingAgent/backends/claude_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ def build_command(
"--verbose",
"--permission-mode", "bypassPermissions",
]
if session_id:
cmd += ["--session-id", session_id]
# Note: --session-id is intentionally omitted. Claude Code's -p (print)
# mode does not release the session lock on exit, causing "Session ID
# already in use" errors on follow-up requests. Each request runs as a
# fresh session instead; multi-turn context is supplied by the agent,
# which prepends the conversation history to the prompt.
# cwd is handled by subprocess cwd= parameter, not a CLI flag
return cmd

Expand Down
177 changes: 167 additions & 10 deletions examples/CodingAgent/coding_genesis_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@

import argparse
import asyncio
import json
import logging
import os
import subprocess
import sys
import uuid

# Ensure genesis_lib is importable
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))

from genesis_lib.monitored_agent import MonitoredAgent
from genesis_lib.graph_monitoring import COMPONENT_TYPE, STATE, EDGE_TYPE
from genesis_lib.stream_publisher import StreamPublisher

# Local backend imports
sys.path.insert(0, os.path.dirname(__file__))
Expand Down Expand Up @@ -75,6 +78,9 @@ def __init__(
**kwargs,
)

# --- Stream publisher for real-time event delivery ---
self._stream_pub = StreamPublisher(self.app.participant)

# --- Ensure working directory is a git repo (Codex requires it) ---
if self._working_dir and backend == "codex":
git_dir = os.path.join(self._working_dir, ".git")
Expand Down Expand Up @@ -111,6 +117,57 @@ def __init__(
},
)

# ------------------------------------------------------------------
# Conversation history (Genesis memory architecture)
# ------------------------------------------------------------------

def _build_prompt_with_history(self, message, conv_id):
    """Build the prompt for a request, prefixed with prior conversation turns.

    If the backend is Codex and ``self._sessions`` already holds a session
    for ``conv_id``, the raw message is returned unchanged, because the
    session carries the context. Otherwise up to 50 entries are retrieved
    from ``self.memory`` and rendered as a ``[User]`` / ``[Assistant]``
    transcript placed ahead of the current request.

    ``self.memory`` is, per the original author's note, the memory adapter
    inherited from GenesisAgent (SimpleMemoryAdapter by default); other
    adapters exposing ``retrieve(k=...)`` can be swapped in.

    Args:
        message: Text of the current user request.
        conv_id: Conversation identifier, or a falsy value when the request
            is not tied to a conversation.

    Returns:
        The prompt string to hand to the backend: either ``message`` itself
        or ``message`` preceded by a conversation-history section.
    """
    # Codex with an active session already has context; return raw message.
    session_id = self._sessions.get(conv_id) if conv_id else None
    if session_id and self._backend.name == "codex":
        return message

    memory_items = self.memory.retrieve(k=50)
    if not memory_items:
        return message

    role_labels = {"user": "[User]", "assistant": "[Assistant]"}
    max_assistant_chars = 2000

    history_lines = []
    for entry in memory_items:
        meta = entry.get("metadata") or {}

        # Turns are stored with a "conversation_id" tag. Skip entries that
        # are explicitly tagged for a *different* conversation so context
        # does not leak between conversations. Untagged entries are kept.
        entry_conv = meta.get("conversation_id")
        if conv_id and entry_conv and entry_conv != conv_id:
            continue

        role = meta.get("role", "user")
        label = role_labels.get(role)
        if label is None:
            # Roles other than user/assistant are not part of the transcript.
            continue

        item = entry.get("item") or ""
        if not isinstance(item, str):
            item = str(item)

        # Truncate long assistant responses to keep the prompt manageable.
        if role == "assistant" and len(item) > max_assistant_chars:
            item = item[:max_assistant_chars] + "... (truncated)"

        history_lines.append(f"{label}: {item}")

    # Nothing usable survived filtering: do not emit an empty history block.
    if not history_lines:
        return message

    history_block = "\n\n".join(history_lines)

    return (
        f"## Conversation History\n"
        f"Below is our conversation so far. Use this context to understand "
        f"follow-up requests.\n\n"
        f"{history_block}\n\n"
        f"## Current Request\n"
        f"{message}"
    )

# ------------------------------------------------------------------
# LLM abstract method stubs (not used — we delegate to subprocess)
# ------------------------------------------------------------------
Expand Down Expand Up @@ -232,37 +289,126 @@ async def process_request(self, request):
return result

async def _do_coding_request(self, request):
"""Execute a coding task via subprocess."""
"""Execute a coding task via subprocess with real-time stream publishing."""
message = request.get("message", "")
conv_id = request.get("conversation_id")
# Use request_id from caller (web interface) or generate one
request_id = request.get("request_id") or str(uuid.uuid4())
session_id = self._sessions.get(conv_id) if conv_id else None

cmd = self._backend.build_command(message, session_id, self._working_dir)
# Build prompt with conversation history from Genesis memory
prompt = self._build_prompt_with_history(message, conv_id)

cmd = self._backend.build_command(prompt, session_id, self._working_dir)
env = self._backend.build_env()

logger.info("Spawning: %s", " ".join(cmd))

# Pipe stdin so we can send the prompt for session resume commands
# (Codex 'exec resume' reads the follow-up prompt from stdin)
proc = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
cwd=self._working_dir,
)
self._active_proc = proc

# Codex 'exec resume' reads the follow-up prompt from stdin;
# Claude always uses -p on the command line, so stdin is not needed.
if session_id and message and self._backend.name == "codex":
proc.stdin.write(message.encode("utf-8"))
proc.stdin.write(b"\n")
await proc.stdin.drain()
proc.stdin.close()

# Read events inline so we can publish each one to the DDS stream
events = []
result_text = ""
new_session_id = None
timed_out = False

async def _stream():
nonlocal result_text, new_session_id
async for raw_line in proc.stdout:
line = raw_line.decode("utf-8", errors="replace").strip()
if not line:
continue
logger.debug("stdout: %s", line[:500])
try:
event = self._backend.parse_line(line)
except Exception:
continue
if event is None:
continue

events.append(event)

# Accumulate text (same logic as read_events)
if event.kind == "init" and event.session_id:
new_session_id = event.session_id
elif event.kind == "text":
result_text += event.text
elif event.kind == "done" and event.text:
result_text = event.text
elif event.kind == "error":
result_text = f"Error: {event.text}"

# Publish to DDS stream in real-time
try:
content = ""
metadata = {}
if event.kind == "init":
content = event.session_id or ""
elif event.kind == "text":
content = event.text
elif event.kind == "tool_start":
content = event.tool_name
metadata = {"tool_input": event.tool_input}
elif event.kind == "tool_result":
content = event.tool_output
metadata = {"tool_name": event.tool_name}
elif event.kind == "done":
content = event.text or ""
elif event.kind == "error":
content = event.text
self._stream_pub.publish(
request_id=request_id,
chunk_type=event.kind,
content=content,
metadata=metadata,
)
except Exception as exc:
logger.debug("Stream publish failed: %s", exc)

try:
events, result_text, new_session_id, timed_out = await read_events(
proc, self._backend, self._timeout,
)
try:
await asyncio.wait_for(_stream(), timeout=self._timeout)
except asyncio.TimeoutError:
timed_out = True
result_text = f"Request timed out after {self._timeout}s"
try:
self._stream_pub.publish(
request_id=request_id,
chunk_type="error",
content=result_text,
)
except Exception:
pass
finally:
if proc.returncode is None:
proc.terminate()
# Let the process exit naturally (release session locks, etc.)
try:
await asyncio.wait_for(proc.wait(), timeout=5.0)
await asyncio.wait_for(proc.wait(), timeout=10.0)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
proc.terminate()
try:
await asyncio.wait_for(proc.wait(), timeout=5.0)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
self._active_proc = None

# Log subprocess result for debugging
Expand Down Expand Up @@ -315,11 +461,21 @@ async def _do_coding_request(self, request):
timed_out,
)

# DDS InterfaceAgentReply: message, status, conversation_id
# Store conversation turns in Genesis memory for multi-turn context.
# Uses self.memory (SimpleMemoryAdapter) inherited from GenesisAgent.
# Future memory plugins (compaction, summary) swap in via MemoryAdapter.
try:
self.memory.store(message, metadata={"role": "user", "conversation_id": conv_id or ""})
if result_text and not timed_out:
self.memory.store(result_text, metadata={"role": "assistant", "conversation_id": conv_id or ""})
except Exception as exc:
logger.debug("Memory store failed: %s", exc)

return {
"message": result_text,
"status": status,
"conversation_id": conv_id or "",
"request_id": request_id,
}

# ------------------------------------------------------------------
Expand All @@ -343,6 +499,7 @@ async def close(self):
except asyncio.TimeoutError:
self._active_proc.kill()
await self._active_proc.wait()
self._stream_pub.close()
await super().close()


Expand Down
126 changes: 126 additions & 0 deletions examples/CodingAgent/run_web.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#!/bin/bash
# ============================================================================
# Launch CodingAgent + Web Interface
#
# Usage:
# ./run_web.sh [--backend claude|codex] [--port 5080] [--workspace DIR] [--agent-name NAME]
#
# This script:
# 1. Sources the virtual environment and .env
# 2. Starts the CodingGenesisAgent in the background
# 3. Waits for DDS discovery
# 4. Starts the web server in the foreground
# 5. Cleans up on exit (SIGINT/SIGTERM)
# ============================================================================

set -e

# Resolve the directory holding this script and the repository root two
# levels above it, independent of the caller's working directory.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
ROOT_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)"

# Defaults
BACKEND="codex"
PORT=5080
WORKSPACE="$SCRIPT_DIR/workspace"
AGENT_NAME=""

USAGE="Usage: $0 [--backend claude|codex] [--port 5080] [--workspace DIR] [--agent-name NAME]"

# Parse arguments
while [[ $# -gt 0 ]]; do
    case "$1" in
        --backend|--port|--workspace|--agent-name)
            # Without this check a trailing option with no value makes
            # `shift 2` fail and `set -e` aborts with no message at all.
            if [[ $# -lt 2 ]]; then
                echo "Missing value for option: $1" >&2
                echo "$USAGE" >&2
                exit 1
            fi
            case "$1" in
                --backend)    BACKEND="$2" ;;
                --port)       PORT="$2" ;;
                --workspace)  WORKSPACE="$2" ;;
                --agent-name) AGENT_NAME="$2" ;;
            esac
            shift 2
            ;;
        -h|--help)
            echo "$USAGE"
            exit 0
            ;;
        *)
            echo "Unknown option: $1" >&2
            exit 1
            ;;
    esac
done

# Set default agent name based on backend
if [ -z "$AGENT_NAME" ]; then
    AGENT_NAME="CodingAgent-${BACKEND}"
fi

# Source environment: activate the virtualenv if the repository has one.
if [ -f "$ROOT_DIR/venv/bin/activate" ]; then
    source "$ROOT_DIR/venv/bin/activate"
fi
if [ -f "$ROOT_DIR/.env" ]; then
    # Enable allexport while sourcing so plain KEY=value lines in .env are
    # exported and therefore visible to the agent and web server processes.
    # Lines that already use `export` behave exactly as before.
    set -a
    source "$ROOT_DIR/.env"
    set +a
fi

# Ensure workspace exists
mkdir -p "$WORKSPACE"

# Trap to clean up background processes.
#
# cleanup runs once, from the EXIT trap, and re-exits with the status the
# script was already exiting with, so failures are not reported as success.
# SIGINT/SIGTERM are turned into a normal `exit 0` (Ctrl+C is the documented
# way to stop), which in turn fires the EXIT trap.
AGENT_PID=""
cleanup() {
    local status=$?
    # Disarm the traps so the `exit` below cannot re-enter cleanup.
    trap - SIGINT SIGTERM EXIT
    echo ""
    echo "[run_web] Shutting down..."
    if [ -n "$AGENT_PID" ] && kill -0 "$AGENT_PID" 2>/dev/null; then
        echo "[run_web] Stopping agent (PID $AGENT_PID)..."
        # `|| true`: the agent may exit between the check and the kill;
        # that must not abort cleanup under `set -e`.
        kill "$AGENT_PID" 2>/dev/null || true
        wait "$AGENT_PID" 2>/dev/null || true
    fi
    echo "[run_web] Done."
    exit "$status"
}
trap cleanup EXIT
trap 'exit 0' SIGINT SIGTERM

# Startup banner summarising the effective settings.
cat <<EOF
============================================
 Genesis CodingAgent — Web Interface
============================================
 Backend: $BACKEND
 Port: $PORT
 Workspace: $WORKSPACE
 Agent: $AGENT_NAME
============================================

EOF

# Launch the CodingGenesisAgent as a background job and remember its PID
# so the cleanup trap can stop it later.
echo "[run_web] Starting CodingGenesisAgent ($BACKEND)..."
agent_cmd=(
    python "$SCRIPT_DIR/coding_genesis_agent.py"
    --backend "$BACKEND"
    --working-dir "$WORKSPACE"
    --agent-name "$AGENT_NAME"
)
"${agent_cmd[@]}" &
AGENT_PID=$!

# Give the agent a fixed grace period to initialize and for DDS discovery.
echo "[run_web] Waiting for agent to initialize..."
sleep 4

# Bail out if the agent died during startup.
kill -0 "$AGENT_PID" 2>/dev/null || {
    echo "[run_web] ERROR: Agent process exited unexpectedly."
    exit 1
}

echo "[run_web] Agent running (PID $AGENT_PID)."
echo ""

# Run the web server in the foreground; the script ends when it does.
echo "[run_web] Starting web server on http://localhost:$PORT"
echo "[run_web] Press Ctrl+C to stop."
echo ""

web_cmd=(
    python "$SCRIPT_DIR/web_server.py"
    --port "$PORT"
    --workspace "$WORKSPACE"
)
"${web_cmd[@]}"
Loading