From 7fc3a0c5fbafa7b83f2ef388c63cddd12114e40e Mon Sep 17 00:00:00 2001 From: sploithunter Date: Thu, 19 Feb 2026 14:26:02 -0700 Subject: [PATCH] feat: add CodingAgent web interface with DDS streaming and multi-turn context Add a polished web interface for the CodingGenesisAgent with real-time streaming of coding events via DDS pub/sub, workspace file tree, 3D Genesis graph visualization, and multi-turn conversation history using the Genesis memory architecture. Key changes: - StreamChunk DDS type + StreamPublisher/StreamSubscriber helpers in genesis_lib - Flask/SocketIO web server with live event streaming to browser - IDE-style UI: file tree, chat with streaming events, graph panel - Multi-turn conversation context via Genesis MemoryAdapter (SimpleMemoryAdapter) - Claude backend: removed --session-id (lock issues), history built into prompt - Codex backend: native session resume + stdin prompt piping - 150 tests passing (112 backend + 38 mock agent including history tests) Co-Authored-By: Claude Opus 4.6 --- .../CodingAgent/backends/claude_backend.py | 6 +- examples/CodingAgent/coding_genesis_agent.py | 177 ++++++- examples/CodingAgent/run_web.sh | 126 +++++ examples/CodingAgent/static/app.js | 479 ++++++++++++++++++ examples/CodingAgent/static/styles.css | 435 ++++++++++++++++ examples/CodingAgent/templates/index.html | 96 ++++ examples/CodingAgent/tests/test_agent_mock.py | 75 +++ examples/CodingAgent/tests/test_backends.py | 8 +- examples/CodingAgent/web_server.py | 350 +++++++++++++ genesis_lib/__init__.py | 6 +- genesis_lib/config/datamodel.xml | 32 ++ genesis_lib/stream_publisher.py | 77 +++ genesis_lib/stream_subscriber.py | 108 ++++ 13 files changed, 1958 insertions(+), 17 deletions(-) create mode 100755 examples/CodingAgent/run_web.sh create mode 100644 examples/CodingAgent/static/app.js create mode 100644 examples/CodingAgent/static/styles.css create mode 100644 examples/CodingAgent/templates/index.html create mode 100644 examples/CodingAgent/web_server.py create mode 100644 genesis_lib/stream_publisher.py create mode 100644 genesis_lib/stream_subscriber.py diff --git a/examples/CodingAgent/backends/claude_backend.py b/examples/CodingAgent/backends/claude_backend.py index 4425947..f04421b 100644 --- a/examples/CodingAgent/backends/claude_backend.py +++ b/examples/CodingAgent/backends/claude_backend.py @@ -47,8 +47,10 @@ def build_command( "--verbose", "--permission-mode", "bypassPermissions", ] - if session_id: - cmd += ["--session-id", session_id] + # Note: --session-id is intentionally omitted. Claude Code's -p (print) + # mode does not release the session lock on exit, causing "Session ID + # already in use" errors on follow-up requests. Each request runs as a + # fresh session instead. Multi-turn context is handled by Codex backend. # cwd is handled by subprocess cwd= parameter, not a CLI flag return cmd diff --git a/examples/CodingAgent/coding_genesis_agent.py b/examples/CodingAgent/coding_genesis_agent.py index 5fbe04d..9a21a14 100644 --- a/examples/CodingAgent/coding_genesis_agent.py +++ b/examples/CodingAgent/coding_genesis_agent.py @@ -10,16 +10,19 @@ import argparse import asyncio +import json import logging import os import subprocess import sys +import uuid # Ensure genesis_lib is importable sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) from genesis_lib.monitored_agent import MonitoredAgent from genesis_lib.graph_monitoring import COMPONENT_TYPE, STATE, EDGE_TYPE +from genesis_lib.stream_publisher import StreamPublisher # Local backend imports sys.path.insert(0, os.path.dirname(__file__)) @@ -75,6 +78,9 @@ def __init__( **kwargs, ) + # --- Stream publisher for real-time event delivery --- + self._stream_pub = StreamPublisher(self.app.participant) + # --- Ensure working directory is a git repo (Codex requires it) --- if self._working_dir and backend == "codex": git_dir = os.path.join(self._working_dir, ".git") @@ -111,6 +117,57 @@ def __init__( }, ) + # ------------------------------------------------------------------ + # Conversation history (Genesis memory architecture) + # ------------------------------------------------------------------ + + def _build_prompt_with_history(self, message, conv_id): + """Build a prompt that includes conversation history from Genesis memory. + + For backends without native session support (Claude), the conversation + history is prepended to the prompt so the model has context from prior + turns. For backends with native sessions (Codex with active session_id), + history is managed by the session and the raw message is returned. + + Uses self.memory (SimpleMemoryAdapter by default) which is inherited from + GenesisAgent. When context grows too large, Genesis memory plugins + (compaction, summary, summary-plus-last, etc.) can be swapped in via + the MemoryAdapter interface. + """ + # Codex with an active session already has context — return raw message + session_id = self._sessions.get(conv_id) if conv_id else None + if session_id and self._backend.name == "codex": + return message + + # Retrieve conversation history from Genesis memory + memory_items = self.memory.retrieve(k=50) + if not memory_items: + return message + + # Build conversation transcript + history_lines = [] + for entry in memory_items: + item = entry.get("item", "") + meta = entry.get("metadata") or {} + role = meta.get("role", "user") + if role == "user": + history_lines.append(f"[User]: {item}") + elif role == "assistant": + # Truncate long assistant responses to keep prompt manageable + text = item if len(item) <= 2000 else item[:2000] + "... (truncated)" + history_lines.append(f"[Assistant]: {text}") + + history_block = "\n\n".join(history_lines) + + return ( + f"## Conversation History\n" + f"Below is our conversation so far. Use this context to understand " + f"follow-up requests.\n\n" + f"{history_block}\n\n" + f"## Current Request\n" + f"{message}" + ) + # ------------------------------------------------------------------ # LLM abstract method stubs (not used — we delegate to subprocess) # ------------------------------------------------------------------ @@ -232,18 +289,26 @@ async def process_request(self, request): return result async def _do_coding_request(self, request): - """Execute a coding task via subprocess.""" + """Execute a coding task via subprocess with real-time stream publishing.""" message = request.get("message", "") conv_id = request.get("conversation_id") + # Use request_id from caller (web interface) or generate one + request_id = request.get("request_id") or str(uuid.uuid4()) session_id = self._sessions.get(conv_id) if conv_id else None - cmd = self._backend.build_command(message, session_id, self._working_dir) + # Build prompt with conversation history from Genesis memory + prompt = self._build_prompt_with_history(message, conv_id) + + cmd = self._backend.build_command(prompt, session_id, self._working_dir) env = self._backend.build_env() logger.info("Spawning: %s", " ".join(cmd)) + # Pipe stdin so we can send the prompt for session resume commands + # (Codex 'exec resume' reads the follow-up prompt from stdin) proc = await asyncio.create_subprocess_exec( *cmd, + stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env, @@ -251,18 +316,99 @@ async def _do_coding_request(self, request): ) self._active_proc = proc + # Codex 'exec resume' reads the follow-up prompt from stdin; + # Claude always uses -p on the command line, so stdin is not needed. + if session_id and message and self._backend.name == "codex": + proc.stdin.write(message.encode("utf-8")) + proc.stdin.write(b"\n") + await proc.stdin.drain() + proc.stdin.close() + + # Read events inline so we can publish each one to the DDS stream + events = [] + result_text = "" + new_session_id = None + timed_out = False + + async def _stream(): + nonlocal result_text, new_session_id + async for raw_line in proc.stdout: + line = raw_line.decode("utf-8", errors="replace").strip() + if not line: + continue + logger.debug("stdout: %s", line[:500]) + try: + event = self._backend.parse_line(line) + except Exception: + continue + if event is None: + continue + + events.append(event) + + # Accumulate text (same logic as read_events) + if event.kind == "init" and event.session_id: + new_session_id = event.session_id + elif event.kind == "text": + result_text += event.text + elif event.kind == "done" and event.text: + result_text = event.text + elif event.kind == "error": + result_text = f"Error: {event.text}" + + # Publish to DDS stream in real-time + try: + content = "" + metadata = {} + if event.kind == "init": + content = event.session_id or "" + elif event.kind == "text": + content = event.text + elif event.kind == "tool_start": + content = event.tool_name + metadata = {"tool_input": event.tool_input} + elif event.kind == "tool_result": + content = event.tool_output + metadata = {"tool_name": event.tool_name} + elif event.kind == "done": + content = event.text or "" + elif event.kind == "error": + content = event.text + self._stream_pub.publish( + request_id=request_id, + chunk_type=event.kind, + content=content, + metadata=metadata, + ) + except Exception as exc: + logger.debug("Stream publish failed: %s", exc) + try: - events, result_text, new_session_id, timed_out = await read_events( - proc, self._backend, self._timeout, - ) + try: + await asyncio.wait_for(_stream(), timeout=self._timeout) + except asyncio.TimeoutError: + timed_out = True + result_text = f"Request timed out after {self._timeout}s" + try: + self._stream_pub.publish( + request_id=request_id, + chunk_type="error", + content=result_text, + ) + except Exception: + pass finally: if proc.returncode is None: - proc.terminate() + # Let the process exit naturally (release session locks, etc.) try: - await asyncio.wait_for(proc.wait(), timeout=5.0) + await asyncio.wait_for(proc.wait(), timeout=10.0) except asyncio.TimeoutError: - proc.kill() - await proc.wait() + proc.terminate() + try: + await asyncio.wait_for(proc.wait(), timeout=5.0) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() self._active_proc = None # Log subprocess result for debugging @@ -315,11 +461,21 @@ async def _do_coding_request(self, request): timed_out, ) - # DDS InterfaceAgentReply: message, status, conversation_id + # Store conversation turns in Genesis memory for multi-turn context. + # Uses self.memory (SimpleMemoryAdapter) inherited from GenesisAgent. + # Future memory plugins (compaction, summary) swap in via MemoryAdapter. + try: + self.memory.store(message, metadata={"role": "user", "conversation_id": conv_id or ""}) + if result_text and not timed_out: + self.memory.store(result_text, metadata={"role": "assistant", "conversation_id": conv_id or ""}) + except Exception as exc: + logger.debug("Memory store failed: %s", exc) + return { "message": result_text, "status": status, "conversation_id": conv_id or "", + "request_id": request_id, } # ------------------------------------------------------------------ @@ -343,6 +499,7 @@ async def close(self): except asyncio.TimeoutError: self._active_proc.kill() await self._active_proc.wait() + self._stream_pub.close() await super().close() diff --git a/examples/CodingAgent/run_web.sh b/examples/CodingAgent/run_web.sh new file mode 100755 index 0000000..4999ebb --- /dev/null +++ b/examples/CodingAgent/run_web.sh @@ -0,0 +1,126 @@ +#!/bin/bash +# ============================================================================ +# Launch CodingAgent + Web Interface +# +# Usage: +# ./run_web.sh [--backend claude|codex] [--port 5080] [--workspace DIR] +# +# This script: +# 1. Sources the virtual environment and .env +# 2. Starts the CodingGenesisAgent in the background +# 3. Waits for DDS discovery +# 4. Starts the web server in the foreground +# 5. Cleans up on exit (SIGINT/SIGTERM) +# ============================================================================ + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ROOT_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)" + +# Defaults +BACKEND="codex" +PORT=5080 +WORKSPACE="$SCRIPT_DIR/workspace" +AGENT_NAME="" + +# Parse arguments +while [[ $# -gt 0 ]]; do + case $1 in + --backend) + BACKEND="$2" + shift 2 + ;; + --port) + PORT="$2" + shift 2 + ;; + --workspace) + WORKSPACE="$2" + shift 2 + ;; + --agent-name) + AGENT_NAME="$2" + shift 2 + ;; + -h|--help) + echo "Usage: $0 [--backend claude|codex] [--port 5080] [--workspace DIR] [--agent-name NAME]" + exit 0 + ;; + *) + echo "Unknown option: $1" + exit 1 + ;; + esac +done + +# Set default agent name based on backend +if [ -z "$AGENT_NAME" ]; then + AGENT_NAME="CodingAgent-${BACKEND}" +fi + +# Source environment +if [ -f "$ROOT_DIR/venv/bin/activate" ]; then + source "$ROOT_DIR/venv/bin/activate" +fi +if [ -f "$ROOT_DIR/.env" ]; then + source "$ROOT_DIR/.env" +fi + +# Ensure workspace exists +mkdir -p "$WORKSPACE" + +# Trap to clean up background processes +AGENT_PID="" +cleanup() { + echo "" + echo "[run_web] Shutting down..." + if [ -n "$AGENT_PID" ] && kill -0 "$AGENT_PID" 2>/dev/null; then + echo "[run_web] Stopping agent (PID $AGENT_PID)..." + kill "$AGENT_PID" 2>/dev/null + wait "$AGENT_PID" 2>/dev/null || true + fi + echo "[run_web] Done." + exit 0 +} +trap cleanup SIGINT SIGTERM EXIT + +echo "============================================" +echo " Genesis CodingAgent — Web Interface" +echo "============================================" +echo " Backend: $BACKEND" +echo " Port: $PORT" +echo " Workspace: $WORKSPACE" +echo " Agent: $AGENT_NAME" +echo "============================================" +echo "" + +# Start CodingGenesisAgent in background +echo "[run_web] Starting CodingGenesisAgent ($BACKEND)..." +python "$SCRIPT_DIR/coding_genesis_agent.py" \ + --backend "$BACKEND" \ + --working-dir "$WORKSPACE" \ + --agent-name "$AGENT_NAME" & +AGENT_PID=$! + +# Wait for agent to initialize and DDS discovery +echo "[run_web] Waiting for agent to initialize..." +sleep 4 + +# Verify agent is running +if ! kill -0 "$AGENT_PID" 2>/dev/null; then + echo "[run_web] ERROR: Agent process exited unexpectedly." + exit 1 +fi + +echo "[run_web] Agent running (PID $AGENT_PID)." +echo "" + +# Start web server in foreground +echo "[run_web] Starting web server on http://localhost:$PORT" +echo "[run_web] Press Ctrl+C to stop." +echo "" + +python "$SCRIPT_DIR/web_server.py" \ + --port "$PORT" \ + --workspace "$WORKSPACE" diff --git a/examples/CodingAgent/static/app.js b/examples/CodingAgent/static/app.js new file mode 100644 index 0000000..3137fcd --- /dev/null +++ b/examples/CodingAgent/static/app.js @@ -0,0 +1,479 @@ +/* ============================================ + Genesis CodingAgent — Client Application JS + ============================================ */ + +(function () { + "use strict"; + + // ---- Configure marked for markdown rendering ---- + marked.setOptions({ + highlight: function (code, lang) { + if (lang && hljs.getLanguage(lang)) { + return hljs.highlight(code, { language: lang }).value; + } + return hljs.highlightAuto(code).value; + }, + breaks: true, + gfm: true, + }); + + // ---- DOM refs ---- + const socket = io(location.protocol + "//" + location.host, { + transports: ["websocket", "polling"], + }); + + const agentSelect = document.getElementById("agent-select"); + const connectBtn = document.getElementById("connect-btn"); + const refreshBtn = document.getElementById("refresh-btn"); + const statusDot = document.getElementById("status-indicator"); + const statusText = document.getElementById("status-text"); + const chatArea = document.getElementById("chat-area"); + const chatForm = document.getElementById("chat-form"); + const chatInput = document.getElementById("chat-input"); + const sendBtn = document.getElementById("send-btn"); + const fileTree = document.getElementById("file-tree"); + const refreshFilesBtn = document.getElementById("refresh-files-btn"); + const welcomeMsg = document.getElementById("welcome-msg"); + + // ---- State ---- + let connected = false; + let agentConnected = false; + let streaming = false; + let currentRequestId = null; + let streamTextBuffer = ""; + let streamTextEl = null; + let typingEl = null; + let streamedTextShown = false; // Track if streaming text was displayed + + // ---- Helpers ---- + function setStatus(state, text) { + statusDot.className = "status-dot " + state; + statusText.textContent = text; + } + + function scrollToBottom() { + requestAnimationFrame(() => { + chatArea.scrollTop = chatArea.scrollHeight; + }); + } + + function hideWelcome() { + if (welcomeMsg) welcomeMsg.style.display = "none"; + } + + function renderMarkdown(text) { + return marked.parse(text || ""); + } + + // ---- File tree ---- + function getFileIcon(name) { + const ext = name.split(".").pop().toLowerCase(); + const icons = { + py: "\u{1F40D}", + js: "\u2B22", + ts: "\u2B22", + html: "\u25C7", + css: "\u25C8", + md: "\u25A3", + json: "\u25A2", + sh: "\u25B6", + txt: "\u25A1", + yml: "\u25C6", + yaml: "\u25C6", + toml: "\u25C6", + }; + return icons[ext] || "\u25AB"; + } + + function buildFileTree(files) { + if (!files || files.length === 0) { + fileTree.innerHTML = '
No workspace files
'; + return; + } + + // Group by directory + const dirs = {}; + const now = Date.now() / 1000; + + files.forEach((f) => { + const parts = f.path.split("/"); + const dir = parts.length > 1 ? parts.slice(0, -1).join("/") : "."; + if (!dirs[dir]) dirs[dir] = []; + dirs[dir].push(f); + }); + + let html = ""; + const sortedDirs = Object.keys(dirs).sort(); + + sortedDirs.forEach((dir) => { + if (dir !== ".") { + html += `
\u{1F4C1} ${dir}/
`; + } + dirs[dir] + .sort((a, b) => a.name.localeCompare(b.name)) + .forEach((f) => { + const isRecent = now - f.mtime < 30; + const cls = isRecent ? "file-item recent" : "file-item"; + html += `
+ ${getFileIcon(f.name)} + ${f.name} +
`; + }); + }); + + fileTree.innerHTML = html; + } + + // ---- Chat message rendering ---- + function addUserMessage(text) { + hideWelcome(); + const div = document.createElement("div"); + div.className = "chat-msg msg-user"; + div.innerHTML = `
You
${escapeHtml(text)}
`; + chatArea.appendChild(div); + scrollToBottom(); + } + + function addAgentMessage(text, agentName) { + hideWelcome(); + removeTypingIndicator(); + + const div = document.createElement("div"); + div.className = "chat-msg msg-agent"; + div.innerHTML = `
${escapeHtml(agentName || "Agent")}
${renderMarkdown(text)}
`; + chatArea.appendChild(div); + + // Highlight code blocks + div.querySelectorAll("pre code").forEach((block) => { + hljs.highlightElement(block); + }); + + scrollToBottom(); + } + + function addStreamEvent(chunk) { + hideWelcome(); + + const type = chunk.chunk_type; + + if (type === "init") { + // Show typing indicator + showTypingIndicator(); + return; + } + + if (type === "text") { + // Accumulate text into a streaming text element + streamTextBuffer += chunk.content; + streamedTextShown = true; + if (!streamTextEl) { + removeTypingIndicator(); + const div = document.createElement("div"); + div.className = "chat-msg msg-agent"; + div.innerHTML = `
Agent
`; + chatArea.appendChild(div); + streamTextEl = div.querySelector(".streaming-text"); + } + streamTextEl.innerHTML = renderMarkdown(streamTextBuffer); + streamTextEl.querySelectorAll("pre code").forEach((block) => { + hljs.highlightElement(block); + }); + scrollToBottom(); + return; + } + + if (type === "tool_start") { + removeTypingIndicator(); + let detail = ""; + try { + const meta = JSON.parse(chunk.metadata || "{}"); + if (meta.tool_input && Object.keys(meta.tool_input).length > 0) { + detail = ``; + } + } catch (e) { /* ignore */ } + + const div = document.createElement("div"); + div.className = "stream-event tool-start"; + div.innerHTML = ` + \u2699 +
+ ${escapeHtml(chunk.content)} + ${detail} +
`; + chatArea.appendChild(div); + showTypingIndicator(); + scrollToBottom(); + return; + } + + if (type === "tool_result") { + removeTypingIndicator(); + const resultText = chunk.content || ""; + let detail = ""; + if (resultText.length > 0) { + const truncated = resultText.length > 500 ? resultText.substring(0, 500) + "..." : resultText; + detail = ``; + } + + const div = document.createElement("div"); + div.className = "stream-event tool-result"; + div.innerHTML = ` + \u2713 +
+ Result + ${detail} +
`; + chatArea.appendChild(div); + showTypingIndicator(); + scrollToBottom(); + return; + } + + if (type === "error") { + removeTypingIndicator(); + const div = document.createElement("div"); + div.className = "stream-event error"; + div.innerHTML = `\u2717
${escapeHtml(chunk.content)}
`; + chatArea.appendChild(div); + scrollToBottom(); + return; + } + + if (type === "done") { + removeTypingIndicator(); + streaming = false; + // If no streaming text was shown, display the done content as a full message + if (chunk.content && chunk.content.length > 0 && !streamedTextShown) { + addAgentMessage(chunk.content, "Agent"); + } + // Reset streaming state (keep streamedTextShown until agent_response handles it) + streamTextBuffer = ""; + streamTextEl = null; + currentRequestId = null; + enableInput(); + return; + } + } + + function showTypingIndicator() { + if (typingEl) return; + typingEl = document.createElement("div"); + typingEl.className = "typing-indicator"; + typingEl.id = "typing-indicator"; + typingEl.innerHTML = `
Agent is working...`; + chatArea.appendChild(typingEl); + scrollToBottom(); + } + + function removeTypingIndicator() { + if (typingEl) { + typingEl.remove(); + typingEl = null; + } + } + + function addErrorMessage(text) { + hideWelcome(); + const div = document.createElement("div"); + div.className = "stream-event error"; + div.innerHTML = `\u2717
${escapeHtml(text)}
`; + chatArea.appendChild(div); + scrollToBottom(); + } + + function escapeHtml(text) { + const div = document.createElement("div"); + div.textContent = text; + return div.innerHTML; + } + + function enableInput() { + chatInput.disabled = false; + sendBtn.disabled = false; + chatInput.focus(); + } + + function disableInput() { + chatInput.disabled = true; + sendBtn.disabled = true; + } + + // ---- Socket.IO event handlers ---- + socket.on("connect", () => { + connected = true; + setStatus("online", "Connected"); + }); + + socket.on("disconnect", () => { + connected = false; + setStatus("offline", "Disconnected"); + }); + + socket.on("status", (data) => { + if (data.message === "Connected") { + setStatus("online", "Discovering..."); + } + }); + + socket.on("agents", (data) => { + const agents = data.agents || []; + agentSelect.innerHTML = ""; + if (agents.length === 0) { + agentSelect.innerHTML = + ''; + agentSelect.disabled = true; + connectBtn.disabled = true; + return; + } + agents.forEach((a) => { + const opt = document.createElement("option"); + opt.value = a.name; + opt.textContent = a.name; + agentSelect.appendChild(opt); + }); + agentSelect.disabled = false; + connectBtn.disabled = false; + setStatus("online", "Ready"); + }); + + socket.on("agent_connected", (data) => { + agentConnected = true; + setStatus("online", `Connected: ${data.agent_name}`); + enableInput(); + }); + + socket.on("message_sent", (data) => { + currentRequestId = data.request_id; + streaming = true; + streamedTextShown = false; + addUserMessage(data.message); + disableInput(); + showTypingIndicator(); + setStatus("busy", "Working..."); + }); + + socket.on("stream_event", (chunk) => { + if (!streaming && chunk.chunk_type !== "done") { + streaming = true; + } + addStreamEvent(chunk); + }); + + socket.on("agent_response", (data) => { + removeTypingIndicator(); + streaming = false; + // Only show final response if no streamed text was already displayed + if (!streamedTextShown && data.message) { + addAgentMessage(data.message, data.agent_name); + } + // Reset all streaming state + streamTextBuffer = ""; + streamTextEl = null; + streamedTextShown = false; + currentRequestId = null; + enableInput(); + setStatus("online", `Connected: ${data.agent_name || "Agent"}`); + }); + + socket.on("error", (data) => { + removeTypingIndicator(); + streaming = false; + streamTextBuffer = ""; + streamTextEl = null; + streamedTextShown = false; + currentRequestId = null; + addErrorMessage(data.message || "Unknown error"); + enableInput(); + if (agentConnected) { + setStatus("online", "Ready"); + } + }); + + socket.on("file_tree", (data) => { + buildFileTree(data.files); + }); + + // ---- UI event handlers ---- + connectBtn.addEventListener("click", () => { + const name = agentSelect.value; + if (name) { + setStatus("online", `Connecting to ${name}...`); + socket.emit("connect_to_agent", { agent_name: name }); + } + }); + + refreshBtn.addEventListener("click", () => { + socket.emit("refresh_agents"); + }); + + refreshFilesBtn.addEventListener("click", () => { + socket.emit("refresh_files"); + }); + + chatForm.addEventListener("submit", (ev) => { + ev.preventDefault(); + const msg = (chatInput.value || "").trim(); + if (!msg || !agentConnected) return; + socket.emit("send_message", { message: msg }); + chatInput.value = ""; + }); + + // ---- Draggable resize handles ---- + (function initResizeHandles() { + const sidebar = document.getElementById("sidebar"); + const graphPanel = document.getElementById("graph-panel"); + const handleLeft = document.getElementById("resize-left"); + const handleRight = document.getElementById("resize-right"); + + function startDrag(handle, getTarget, setWidth) { + let startX, startW; + + function onMouseDown(e) { + e.preventDefault(); + const target = getTarget(); + startX = e.clientX; + startW = target.getBoundingClientRect().width; + handle.classList.add("active"); + document.body.classList.add("resizing"); + document.addEventListener("mousemove", onMouseMove); + document.addEventListener("mouseup", onMouseUp); + } + + function onMouseMove(e) { + const dx = e.clientX - startX; + setWidth(startW, dx); + } + + function onMouseUp() { + handle.classList.remove("active"); + document.body.classList.remove("resizing"); + document.removeEventListener("mousemove", onMouseMove); + document.removeEventListener("mouseup", onMouseUp); + // Trigger resize so the 3D graph canvas updates to new panel size + window.dispatchEvent(new Event("resize")); + } + + handle.addEventListener("mousedown", onMouseDown); + } + + // Left handle: dragging right makes sidebar wider + startDrag( + handleLeft, + () => sidebar, + (startW, dx) => { + const w = Math.max(120, Math.min(500, startW + dx)); + sidebar.style.width = w + "px"; + } + ); + + // Right handle: dragging left makes graph panel wider + startDrag( + handleRight, + () => graphPanel, + (startW, dx) => { + const w = Math.max(120, Math.min(600, startW - dx)); + graphPanel.style.width = w + "px"; + } + ); + })(); +})(); diff --git a/examples/CodingAgent/static/styles.css b/examples/CodingAgent/static/styles.css new file mode 100644 index 0000000..75ffad9 --- /dev/null +++ b/examples/CodingAgent/static/styles.css @@ -0,0 +1,435 @@ +/* ============================================ + Genesis CodingAgent — Web Interface Styles + ============================================ */ + +:root { + --bg-primary: #0b0f16; + --bg-secondary: #0f1520; + --bg-tertiary: #141b28; + --bg-input: #0b1420; + --border: rgba(255, 255, 255, 0.08); + --border-focus: rgba(100, 180, 255, 0.3); + --text-primary: #e8eaed; + --text-secondary: #9aa0a8; + --text-muted: #6b7280; + --accent-blue: #4da6ff; + --accent-cyan: #22d3ee; + --accent-green: #34d399; + --accent-amber: #fbbf24; + --accent-red: #ef4444; + --accent-purple: #a78bfa; + --radius: 8px; + --radius-sm: 6px; + --font-mono: 'SF Mono', 'Fira Code', 'Cascadia Code', monospace; + --font-ui: system-ui, -apple-system, sans-serif; +} + +* { box-sizing: border-box; margin: 0; padding: 0; } + +html, body { + height: 100%; + background: var(--bg-primary); + color: var(--text-primary); + font: 13px/1.5 var(--font-ui); + overflow: hidden; +} + +/* ---- Header ---- */ +.header { + display: flex; + align-items: center; + justify-content: space-between; + height: 48px; + padding: 0 16px; + background: var(--bg-secondary); + border-bottom: 1px solid var(--border); + z-index: 10; +} + +.header-left { display: flex; align-items: center; gap: 8px; } +.logo { font-weight: 700; font-size: 15px; color: var(--accent-cyan); letter-spacing: 0.5px; } +.logo-sub { font-size: 13px; color: var(--text-secondary); } + +.header-center { display: flex; align-items: center; gap: 8px; } + +.agent-select { + padding: 4px 8px; + background: var(--bg-input); + color: var(--text-primary); + border: 1px solid var(--border); + border-radius: var(--radius-sm); + font-size: 12px; + min-width: 180px; + cursor: pointer; +} +.agent-select:focus { border-color: var(--border-focus); outline: none; } + +.header-right { display: flex; align-items: center; gap: 8px; } + +.status-dot { + width: 8px; + height: 8px; + border-radius: 50%; + display: inline-block; +} +.status-dot.online { background: var(--accent-green); box-shadow: 0 0 6px var(--accent-green); } +.status-dot.offline { background: var(--text-muted); } +.status-dot.busy { background: var(--accent-amber); box-shadow: 0 0 6px var(--accent-amber); animation: pulse 1.5s infinite; } +.status-text { font-size: 12px; color: var(--text-secondary); } + +@keyframes pulse { + 0%, 100% { opacity: 1; } + 50% { opacity: 0.5; } +} + +/* ---- Buttons ---- */ +.btn { + padding: 6px 12px; + font-size: 12px; + border-radius: var(--radius-sm); + border: 1px solid var(--border); + background: var(--bg-tertiary); + color: var(--text-primary); + cursor: pointer; + transition: background 0.15s; +} +.btn:hover:not(:disabled) { background: rgba(255,255,255,0.06); } +.btn:disabled { opacity: 0.4; cursor: default; } +.btn-sm { padding: 3px 10px; } +.btn-xs { padding: 2px 6px; font-size: 11px; } +.btn-ghost { background: transparent; border-color: transparent; } +.btn-ghost:hover:not(:disabled) { background: rgba(255,255,255,0.04); } +.btn-primary { + background: var(--accent-blue); + border-color: var(--accent-blue); + color: #000; + font-weight: 600; +} +.btn-primary:hover:not(:disabled) { background: #5cb3ff; } + +/* ---- Main layout ---- */ +.main { + display: flex; + height: calc(100vh - 48px); +} + +/* ---- Resize handles ---- */ +.resize-handle { + width: 5px; + cursor: col-resize; + background: transparent; + position: relative; + flex-shrink: 0; + z-index: 5; + transition: background 0.15s; +} +.resize-handle::after { + content: ""; + position: absolute; + top: 0; + bottom: 0; + left: 2px; + width: 1px; + background: var(--border); +} +.resize-handle:hover, +.resize-handle.active { + background: rgba(77, 166, 255, 0.15); +} +.resize-handle:hover::after, +.resize-handle.active::after { + background: var(--accent-blue); +} +body.resizing { cursor: col-resize; user-select: none; } +body.resizing iframe, +body.resizing canvas { pointer-events: none; } + +/* ---- Sidebar (file tree) ---- */ +.sidebar { + width: 220px; + min-width: 120px; + max-width: 500px; + background: var(--bg-secondary); + display: flex; + flex-direction: column; + overflow: hidden; + flex-shrink: 0; +} + +.sidebar-header { + display: flex; + align-items: center; + justify-content: space-between; + padding: 10px 12px 8px; +} + +.sidebar-title { + font-size: 10px; + font-weight: 700; + letter-spacing: 1.2px; + color: var(--text-muted); + text-transform: uppercase; +} + +.file-tree { + flex: 1; + overflow-y: auto; + padding: 0 8px 12px; + font-family: var(--font-mono); + font-size: 12px; +} + +.file-tree::-webkit-scrollbar { width: 4px; } +.file-tree::-webkit-scrollbar-thumb { background: rgba(255,255,255,0.1); border-radius: 2px; } + +.file-item { + display: flex; + align-items: center; + gap: 6px; + padding: 3px 6px; + border-radius: 4px; + color: var(--text-secondary); + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; + cursor: default; +} +.file-item:hover { background: rgba(255,255,255,0.04); } +.file-item.recent { color: var(--accent-green); } +.file-icon { font-size: 11px; width: 16px; text-align: center; flex-shrink: 0; } +.file-name { overflow: hidden; text-overflow: ellipsis; } + +.dir-item { + display: flex; + align-items: center; + gap: 6px; + padding: 3px 6px; + color: var(--text-muted); + font-size: 11px; + margin-top: 6px; +} +.dir-item:first-child { margin-top: 0; } + +.empty-state { + padding: 20px; + text-align: center; + color: var(--text-muted); + font-size: 12px; +} + +/* ---- Conversation ---- */ +.conversation { + flex: 1; + display: flex; + flex-direction: column; + min-width: 200px; +} + +.chat-area { + flex: 1; + overflow-y: auto; + padding: 20px 24px; +} + +.chat-area::-webkit-scrollbar { width: 6px; } +.chat-area::-webkit-scrollbar-thumb { background: rgba(255,255,255,0.1); border-radius: 3px; } + +/* Welcome message */ +.welcome-msg { + display: flex; + flex-direction: column; + align-items: center; + justify-content: center; + height: 100%; + text-align: center; + color: var(--text-muted); +} +.welcome-icon { font-size: 36px; color: var(--accent-cyan); margin-bottom: 12px; } +.welcome-title { font-size: 18px; font-weight: 600; color: var(--text-primary); margin-bottom: 6px; } +.welcome-sub { font-size: 13px; } + +/* Chat messages */ +.chat-msg { + margin-bottom: 16px; + animation: fadeIn 0.25s ease; +} +@keyframes fadeIn { from { opacity: 0; transform: translateY(4px); } to { opacity: 1; transform: translateY(0); } } + +.msg-user { + background: rgba(77, 166, 255, 0.08); + border: 1px solid rgba(77, 166, 255, 0.15); + border-radius: var(--radius); + padding: 10px 14px; + color: var(--text-primary); +} +.msg-user .msg-label { color: var(--accent-blue); font-weight: 600; font-size: 11px; margin-bottom: 4px; } + +.msg-agent { + padding: 10px 0; + color: var(--text-primary); +} +.msg-agent .msg-label { color: var(--accent-cyan); font-weight: 600; font-size: 11px; margin-bottom: 4px; } + +/* Markdown content in agent messages */ +.msg-content { line-height: 1.6; } +.msg-content p { margin: 0 0 8px; } +.msg-content p:last-child { margin-bottom: 0; } +.msg-content pre { + background: var(--bg-tertiary); + border: 1px solid var(--border); + border-radius: var(--radius-sm); + padding: 10px 12px; + overflow-x: auto; + margin: 8px 0; + font-family: var(--font-mono); + font-size: 12px; + line-height: 1.5; +} +.msg-content code { + font-family: var(--font-mono); + font-size: 12px; +} +.msg-content :not(pre) > code { + background: rgba(255,255,255,0.06); + padding: 1px 5px; + border-radius: 3px; +} +.msg-content ul, .msg-content ol { margin: 4px 0 8px 20px; } + +/* Streaming events */ +.stream-event { + display: flex; + align-items: flex-start; + gap: 8px; + padding: 6px 10px; + margin: 4px 0; + border-radius: var(--radius-sm); + font-size: 12px; + animation: fadeIn 0.2s ease; +} + +.stream-event.tool-start { + background: rgba(251, 191, 36, 0.06); + border-left: 2px solid var(--accent-amber); + color: var(--accent-amber); +} + +.stream-event.tool-result { + background: rgba(52, 211, 153, 0.06); + border-left: 2px solid var(--accent-green); + color: var(--accent-green); + font-family: var(--font-mono); +} + +.stream-event.error { + background: rgba(239, 68, 68, 0.08); + border-left: 2px solid var(--accent-red); + color: var(--accent-red); +} + +.stream-event .event-icon { + flex-shrink: 0; + width: 18px; + text-align: center; + font-size: 13px; +} + +.stream-event .event-content { + flex: 1; + min-width: 0; + overflow-wrap: break-word; +} + +.stream-event .event-detail { + margin-top: 4px; + padding: 4px 8px; + background: rgba(0, 0, 0, 0.2); + border-radius: 4px; + font-family: var(--font-mono); + font-size: 11px; + color: var(--text-secondary); + max-height: 120px; + overflow-y: auto; + white-space: pre-wrap; + cursor: pointer; +} +.event-detail.collapsed { max-height: 24px; overflow: hidden; } + +/* Typing indicator */ +.typing-indicator { + display: flex; + align-items: center; + gap: 6px; + padding: 8px 0; + color: var(--text-muted); + font-size: 12px; +} +.typing-dots { display: flex; gap: 3px; } +.typing-dots span { + width: 5px; height: 5px; border-radius: 50%; + background: var(--accent-cyan); + animation: typingPulse 1.4s infinite ease-in-out; +} +.typing-dots span:nth-child(2) { animation-delay: 0.2s; } +.typing-dots span:nth-child(3) { animation-delay: 0.4s; } + +@keyframes typingPulse { + 0%, 80%, 100% { opacity: 0.2; transform: scale(0.8); } + 40% { opacity: 1; transform: scale(1); } +} + +/* Input bar */ +.input-bar { + padding: 12px 24px 16px; + background: var(--bg-secondary); + border-top: 1px solid var(--border); +} + +.input-bar form { + display: flex; + gap: 8px; +} + +.input-bar input { + flex: 1; + padding: 10px 14px; + background: var(--bg-input); + color: var(--text-primary); + border: 1px solid var(--border); + border-radius: var(--radius); + font-size: 13px; + font-family: var(--font-ui); + outline: none; + transition: border-color 0.15s; +} +.input-bar input:focus { border-color: var(--border-focus); } +.input-bar input:disabled { opacity: 0.5; } +.input-bar input::placeholder { color: var(--text-muted); } + +/* ---- Graph panel ---- */ +.graph-panel { + width: 280px; + min-width: 120px; + max-width: 600px; + background: var(--bg-secondary); + display: flex; + flex-direction: column; + overflow: hidden; + flex-shrink: 0; +} + +.graph-header { + padding: 10px 12px 8px; +} + +.graph-container { + flex: 1; + position: relative; + background: var(--bg-primary); +} + +/* ---- Responsive ---- */ +@media (max-width: 900px) { + .sidebar { display: none; } + .graph-panel { display: none; } +} diff --git a/examples/CodingAgent/templates/index.html b/examples/CodingAgent/templates/index.html new file mode 100644 index 0000000..1cf2ae5 --- /dev/null +++ b/examples/CodingAgent/templates/index.html @@ -0,0 +1,96 @@ + + + + + + Genesis CodingAgent + + + + + + + + + + + +
+
+ + CodingAgent +
+
+ + + +
+
+ + Connecting... +
+
+ + +
+ + + + +
+ + +
+
+
+
+
Genesis CodingAgent
+
Connect to an agent and describe a coding task.
+
+
+
+
+ + +
+
+
+ + +
+ + + +
+ + + + + diff --git a/examples/CodingAgent/tests/test_agent_mock.py b/examples/CodingAgent/tests/test_agent_mock.py index 39daec4..b5c287e 100644 --- a/examples/CodingAgent/tests/test_agent_mock.py +++ b/examples/CodingAgent/tests/test_agent_mock.py @@ -206,6 +206,81 @@ async def run_tests(): check("empty: not timed out", to is False) +# ------------------------------------------------------------------ +# Conversation history tests (no DDS required) +# ------------------------------------------------------------------ + +def run_history_tests(): + """Test _build_prompt_with_history logic using a lightweight mock.""" + global passed + + print("\n--- Conversation history (Genesis memory) ---") + + from genesis_lib.memory import SimpleMemoryAdapter + + # Create a minimal object that mimics the relevant CodingGenesisAgent attrs + class FakeAgent: + def __init__(self, backend_name): + self.memory = SimpleMemoryAdapter() + self._sessions = {} + self._backend = type("B", (), {"name": backend_name})() + + # Import the method we want to test + sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + from coding_genesis_agent import CodingGenesisAgent + + # --- Test 1: No history returns raw message --- + agent = FakeAgent("claude") + result = CodingGenesisAgent._build_prompt_with_history(agent, "hello", None) + check("history: no history returns raw message", result == "hello") + + # --- Test 2: History is prepended for Claude --- + agent = FakeAgent("claude") + agent.memory.store("Write a snake game", metadata={"role": "user"}) + agent.memory.store("I created snake.py", metadata={"role": "assistant"}) + result = CodingGenesisAgent._build_prompt_with_history(agent, "Add obstacles", "conv1") + check("history: contains prior user msg", "[User]: Write a snake game" in result) + check("history: contains prior assistant msg", "[Assistant]: I created snake.py" in result) + check("history: contains current request", "Add obstacles" in result) + check("history: current request at end", result.endswith("Add obstacles")) + + # --- Test 3: Codex with active session skips history --- + agent = FakeAgent("codex") + agent._sessions["conv1"] = "sess-123" + agent.memory.store("Prior message", metadata={"role": "user"}) + result = CodingGenesisAgent._build_prompt_with_history(agent, "Follow up", "conv1") + check("history: codex with session returns raw", result == "Follow up") + + # --- Test 4: Codex without session includes history --- + agent = FakeAgent("codex") + agent.memory.store("First request", metadata={"role": "user"}) + agent.memory.store("First response", metadata={"role": "assistant"}) + result = CodingGenesisAgent._build_prompt_with_history(agent, "Second request", "conv2") + check("history: codex no session includes history", "[User]: First request" in result) + check("history: codex no session has current", "Second request" in result) + + # --- Test 5: Long assistant responses are truncated --- + agent = FakeAgent("claude") + long_text = "x" * 5000 + agent.memory.store("question", metadata={"role": "user"}) + agent.memory.store(long_text, metadata={"role": "assistant"}) + result = CodingGenesisAgent._build_prompt_with_history(agent, "next", "c1") + check("history: long response truncated", "... (truncated)" in result) + check("history: truncated to ~2000 chars", len(result) < 3000) + + # --- Test 6: Multiple turns build up --- + agent = FakeAgent("claude") + for i in range(5): + agent.memory.store(f"Question {i}", metadata={"role": "user"}) + agent.memory.store(f"Answer {i}", metadata={"role": "assistant"}) + result = CodingGenesisAgent._build_prompt_with_history(agent, "Final question", "c1") + for i in range(5): + check(f"history: multi-turn has question {i}", f"Question {i}" in result) + check(f"history: multi-turn has answer {i}", f"Answer {i}" in result) + + +run_history_tests() + # ------------------------------------------------------------------ # Run # ------------------------------------------------------------------ diff --git a/examples/CodingAgent/tests/test_backends.py b/examples/CodingAgent/tests/test_backends.py index 5952d07..7664dca 100644 --- a/examples/CodingAgent/tests/test_backends.py +++ b/examples/CodingAgent/tests/test_backends.py @@ -248,10 +248,11 @@ def check(name, condition, detail=""): check("claude-cmd-basic: --permission-mode", "--permission-mode" in cmd) check("claude-cmd-basic: bypassPermissions", "bypassPermissions" in cmd) +# Claude intentionally omits --session-id (print mode doesn't release session locks) cmd = cb.build_command("Continue the refactor", session_id="sess-abc-123") -check("claude-cmd-session: --session-id", "--session-id" in cmd) -check("claude-cmd-session: value", "sess-abc-123" in cmd) +check("claude-cmd-session: no --session-id", "--session-id" not in cmd) check("claude-cmd-session: -p", "-p" in cmd) +check("claude-cmd-session: prompt present", "Continue the refactor" in cmd) check("claude-cmd-session: --output-format", "--output-format" in cmd) check("claude-cmd-session: --verbose", "--verbose" in cmd) @@ -260,8 +261,7 @@ def check(name, condition, detail=""): check("claude-cmd-cwd: no --cwd flag", "--cwd" not in cmd) cmd = cb.build_command("Continue", session_id="sess-xyz", cwd="/tmp/work") -sid_idx = cmd.index("--session-id") -check("claude-cmd-both: session follows flag", cmd[sid_idx + 1] == "sess-xyz") +check("claude-cmd-both: no --session-id", "--session-id" not in cmd) check("claude-cmd-both: no --cwd flag", "--cwd" not in cmd) of_idx = cmd.index("--output-format") check("claude-cmd-both: format follows flag", cmd[of_idx + 1] == "stream-json") diff --git a/examples/CodingAgent/web_server.py b/examples/CodingAgent/web_server.py new file mode 100644 index 0000000..c07eaa3 --- /dev/null +++ b/examples/CodingAgent/web_server.py @@ -0,0 +1,350 @@ +#!/usr/bin/env python3 +""" +CodingAgent Web Server — Flask/SocketIO interface with DDS streaming. + +Provides a polished web interface for the CodingGenesisAgent with: +- Real-time streaming of coding events (tool usage, text output) +- Workspace file tree (auto-refreshes after task completion) +- 3D Genesis network graph visualization +- Backend selector and connection status +""" + +import argparse +import asyncio +import logging +import os +import sys +import uuid +from datetime import datetime + +from flask import Flask, render_template, jsonify +from flask_socketio import SocketIO, emit + +# Ensure genesis_lib is importable +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) + +from genesis_lib.graph_state import GraphService +from genesis_lib.web.graph_viewer import register_graph_viewer +from genesis_lib.monitored_interface import MonitoredInterface +from genesis_lib.stream_subscriber import StreamSubscriber + +import rti.connextdds as dds + +logger = logging.getLogger(__name__) + + +def scan_file_tree(root_dir): + """Scan a directory and return a JSON-serializable file tree.""" + if not root_dir or not os.path.isdir(root_dir): + return [] + + tree = [] + for dirpath, dirnames, filenames in os.walk(root_dir): + # Skip hidden directories + dirnames[:] = [d for d in dirnames if not d.startswith(".")] + rel = os.path.relpath(dirpath, root_dir) + if rel == ".": + rel = "" + for fname in sorted(filenames): + if fname.startswith("."): + continue + path = os.path.join(rel, fname) if rel else fname + try: + stat = os.stat(os.path.join(dirpath, fname)) + mtime = stat.st_mtime + size = stat.st_size + except OSError: + mtime = 0 + size = 0 + tree.append({ + "path": path, + "name": fname, + "size": size, + "mtime": mtime, + }) + return tree + + +def create_app(workspace_dir=None): + template_dir = os.path.join(os.path.dirname(__file__), "templates") + static_dir = os.path.join(os.path.dirname(__file__), "static") + + app = Flask(__name__, template_folder=template_dir, static_folder=static_dir) + app.config["SECRET_KEY"] = "coding_web_interface_secret" + socketio = SocketIO( + app, + async_mode="threading", + cors_allowed_origins="*", + logger=False, + engineio_logger=False, + ) + + domain_id = int(os.getenv("GENESIS_DOMAIN", "0")) + + # DDS graph service for 3D visualization + graph = GraphService(domain_id=domain_id) + graph.start() + register_graph_viewer(app, socketio, graph, url_prefix="/genesis-graph") + + # DDS participant for stream subscription + participant = dds.DomainParticipant(domain_id) + + # State + state = { + "interface": None, + "connected_agent": None, + "interface_lock": False, + "workspace_dir": workspace_dir, + "active_subscribers": {}, # request_id -> StreamSubscriber + "conversation_id": None, + } + + # ------- Routes ------- + + @app.route("/") + def index(): + return render_template("index.html") + + @app.route("/api/health") + def health(): + return jsonify({"status": "ok", "time": datetime.utcnow().isoformat()}) + + @app.route("/api/files") + def api_files(): + return jsonify({"files": scan_file_tree(state["workspace_dir"])}) + + # ------- SocketIO events ------- + + @socketio.on("connect") + def on_connect(): + emit("status", {"message": "Connected", "workspace": state["workspace_dir"] or ""}) + socketio.start_background_task(_initialize_interface_once_bg) + # Send initial file tree + if state["workspace_dir"]: + emit("file_tree", {"files": scan_file_tree(state["workspace_dir"])}) + + def _initialize_interface_once_bg(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(_initialize_interface_once()) + finally: + loop.close() + + async def _initialize_interface_once(): + if state["interface_lock"] or state["interface"] is not None: + await _refresh_agents() + return + state["interface_lock"] = True + try: + if state["interface"] is None: + state["interface"] = MonitoredInterface( + "CodingWebInterface", "CodingWebInterface" + ) + logger.info("Created MonitoredInterface for web server") + finally: + state["interface_lock"] = False + await _refresh_agents() + + def _refresh_agents_bg(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(_refresh_agents()) + finally: + loop.close() + + async def _refresh_agents(): + if state["interface"] is None: + return + await asyncio.sleep(2) + agents = [] + for agent_id, info in state["interface"].available_agents.items(): + agents.append({ + "id": agent_id, + "name": info.get("prefered_name", "Unknown"), + "type": info.get("agent_type", "Unknown"), + "service_name": info.get("service_name", ""), + "description": info.get("description", ""), + }) + socketio.emit("agents", {"agents": agents}) + + @socketio.on("refresh_agents") + def refresh_agents(): + socketio.start_background_task(_refresh_agents_bg) + + @socketio.on("connect_to_agent") + def connect_to_agent(data): + name = (data or {}).get("agent_name") + if not name: + emit("error", {"message": "agent_name required"}) + return + socketio.start_background_task(_connect_agent_bg, name) + + def _connect_agent_bg(agent_name): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(_connect_agent(agent_name)) + finally: + loop.close() + + async def _connect_agent(agent_name): + if state["interface"] is None: + return + target = None + for _id, info in state["interface"].available_agents.items(): + if info.get("prefered_name") == agent_name: + target = info + break + if not target: + socketio.emit("error", {"message": f"Agent {agent_name} not found"}) + return + ok = await state["interface"].connect_to_agent(target.get("service_name")) + if ok: + state["connected_agent"] = agent_name + # Generate a new conversation_id for this connection + state["conversation_id"] = str(uuid.uuid4())[:8] + socketio.emit( + "agent_connected", + {"agent_name": agent_name, "agent_info": target}, + ) + else: + socketio.emit("error", {"message": f"Failed to connect to {agent_name}"}) + + @socketio.on("send_message") + def send_message(data): + msg = (data or {}).get("message") + if not msg: + emit("error", {"message": "message required"}) + return + socketio.start_background_task(_send_message_bg, msg) + + def _send_message_bg(message): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(_send_message(message)) + except Exception as exc: + logger.error("_send_message_bg failed: %s", exc, exc_info=True) + try: + socketio.emit("error", {"message": f"Request failed: {exc}"}) + except Exception: + pass + finally: + loop.close() + + async def _send_message(message): + if state["interface"] is None or state["connected_agent"] is None: + socketio.emit("error", {"message": "No agent connected"}) + return + + request_id = str(uuid.uuid4()) + socketio.emit( + "message_sent", + { + "message": message, + "request_id": request_id, + "timestamp": datetime.utcnow().isoformat(), + }, + ) + + # Start stream subscriber BEFORE sending request to not miss early events + def on_stream_chunk(chunk): + socketio.emit("stream_event", chunk) + + sub = StreamSubscriber( + participant, on_stream_chunk, request_id_filter=request_id + ) + state["active_subscribers"][request_id] = sub + + try: + # Send RPC request with request_id so agent uses it for stream + resp = await state["interface"].send_request( + { + "message": message, + "conversation_id": state.get("conversation_id", ""), + "request_id": request_id, + }, + timeout_seconds=300.0, + ) + + if resp and resp.get("status") == 0: + socketio.emit( + "agent_response", + { + "message": resp.get("message", ""), + "request_id": request_id, + "timestamp": datetime.utcnow().isoformat(), + "agent_name": state["connected_agent"], + }, + ) + else: + socketio.emit( + "error", + { + "message": resp.get("message") if resp else "No response from agent", + "request_id": request_id, + }, + ) + except Exception as exc: + logger.error("Error in _send_message: %s", exc, exc_info=True) + socketio.emit( + "error", + { + "message": f"Request failed: {exc}", + "request_id": request_id, + }, + ) + finally: + # Clean up stream subscriber + sub.close() + state["active_subscribers"].pop(request_id, None) + + # Refresh file tree after task completion + if state["workspace_dir"]: + socketio.emit( + "file_tree", + {"files": scan_file_tree(state["workspace_dir"])}, + ) + + @socketio.on("refresh_files") + def refresh_files(): + if state["workspace_dir"]: + emit("file_tree", {"files": scan_file_tree(state["workspace_dir"])}) + + return app, socketio, graph + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="CodingAgent Web Server") + parser.add_argument( + "-p", "--port", type=int, default=5080, help="Port (default: 5080)" + ) + parser.add_argument( + "--host", default="0.0.0.0", help="Host (default: 0.0.0.0)" + ) + parser.add_argument( + "--workspace", + default=os.path.join(os.path.dirname(__file__), "workspace"), + help="Workspace directory to monitor", + ) + args = parser.parse_args() + + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + + host = args.host + port = int(os.getenv("PORT", args.port)) + workspace = os.path.abspath(args.workspace) + + print(f"[CodingAgent Web] Starting on {host}:{port}") + print(f"[CodingAgent Web] Workspace: {workspace}") + + app, socketio, graph = create_app(workspace_dir=workspace) + try: + socketio.run(app, host=host, port=port, allow_unsafe_werkzeug=True) + finally: + graph.stop() diff --git a/genesis_lib/__init__.py b/genesis_lib/__init__.py index 4a57750..3766ede 100644 --- a/genesis_lib/__init__.py +++ b/genesis_lib/__init__.py @@ -51,6 +51,8 @@ from .utils.openai_utils import convert_functions_to_openai_schema, generate_response_with_functions from .utils.function_utils import call_function_thread_safe, find_function_by_name, filter_functions_by_relevance from .utils import get_datamodel_path, load_datamodel +from .stream_publisher import StreamPublisher +from .stream_subscriber import StreamSubscriber __all__ = [ 'GenesisApp', @@ -72,7 +74,9 @@ 'find_function_by_name', 'filter_functions_by_relevance', 'get_datamodel_path', - 'load_datamodel' + 'load_datamodel', + 'StreamPublisher', + 'StreamSubscriber', ] # Add LocalGenesisAgent to __all__ only if ollama dependency is available diff --git a/genesis_lib/config/datamodel.xml b/genesis_lib/config/datamodel.xml index 470d7dd..57857a5 100644 --- a/genesis_lib/config/datamodel.xml +++ b/genesis_lib/config/datamodel.xml @@ -843,6 +843,38 @@ + + + + + + + + + + diff --git a/genesis_lib/stream_publisher.py b/genesis_lib/stream_publisher.py new file mode 100644 index 0000000..696dedf --- /dev/null +++ b/genesis_lib/stream_publisher.py @@ -0,0 +1,77 @@ +""" +StreamPublisher — Publishes StreamChunk events to a DDS topic. + +Used by agents to broadcast real-time streaming events (tool usage, text output, +errors) alongside the existing RPC request/reply model. +""" + +import json +import logging +import time +import threading + +import rti.connextdds as dds +from genesis_lib.utils import get_datamodel_path + +logger = logging.getLogger(__name__) + +TOPIC_NAME = "rti/connext/genesis/streaming/StreamChunk" + +# Reuse topic instances per participant to avoid duplicates +_TOPIC_REGISTRY = {} + + +class StreamPublisher: + """Publishes StreamChunk events to a DDS topic.""" + + def __init__(self, participant): + config_path = get_datamodel_path() + provider = dds.QosProvider(config_path) + self._type = provider.type("genesis_lib", "StreamChunk") + + pid = id(participant) + key = (pid, TOPIC_NAME) + if key in _TOPIC_REGISTRY: + topic = _TOPIC_REGISTRY[key] + else: + topic = dds.DynamicData.Topic(participant, TOPIC_NAME, self._type) + _TOPIC_REGISTRY[key] = topic + + qos = dds.QosProvider.default.datawriter_qos + qos.durability.kind = dds.DurabilityKind.VOLATILE + qos.reliability.kind = dds.ReliabilityKind.RELIABLE + qos.history.kind = dds.HistoryKind.KEEP_ALL + + publisher = dds.Publisher(participant) + self._writer = dds.DynamicData.DataWriter( + pub=publisher, topic=topic, qos=qos, + ) + self._sequences = {} # request_id -> next sequence number + self._lock = threading.Lock() + + def publish(self, request_id, chunk_type, content="", metadata=""): + """Write a StreamChunk sample with auto-incrementing sequence.""" + with self._lock: + seq = self._sequences.get(request_id, 0) + self._sequences[request_id] = seq + 1 + + sample = dds.DynamicData(self._type) + sample["request_id"] = request_id + sample["sequence"] = seq + sample["chunk_type"] = chunk_type + sample["content"] = content + if isinstance(metadata, dict): + metadata = json.dumps(metadata) + sample["metadata"] = metadata or "" + sample["timestamp"] = int(time.time() * 1000) + + self._writer.write(sample) + logger.debug( + "StreamPublisher: published chunk request_id=%s seq=%d type=%s", + request_id, seq, chunk_type, + ) + + def close(self): + """Clean up sequence tracking.""" + with self._lock: + self._sequences.clear() diff --git a/genesis_lib/stream_subscriber.py b/genesis_lib/stream_subscriber.py new file mode 100644 index 0000000..9a4c4f7 --- /dev/null +++ b/genesis_lib/stream_subscriber.py @@ -0,0 +1,108 @@ +""" +StreamSubscriber — Subscribes to StreamChunk events from a DDS topic. + +Used by web servers and monitoring tools to receive real-time streaming events +published by agents during task execution. +""" + +import json +import logging +import threading +import time + +import rti.connextdds as dds +from genesis_lib.utils import get_datamodel_path +from genesis_lib.stream_publisher import TOPIC_NAME, _TOPIC_REGISTRY + +logger = logging.getLogger(__name__) + + +class StreamSubscriber: + """Subscribes to StreamChunk events, optionally filtered by request_id.""" + + def __init__(self, participant, callback, request_id_filter=None): + """ + Args: + participant: DDS DomainParticipant. + callback: Called with a dict for each received chunk: + {request_id, sequence, chunk_type, content, metadata, timestamp} + request_id_filter: If set, only receive chunks for this request_id. + """ + config_path = get_datamodel_path() + provider = dds.QosProvider(config_path) + self._type = provider.type("genesis_lib", "StreamChunk") + self._callback = callback + self._running = True + + pid = id(participant) + key = (pid, TOPIC_NAME) + if key in _TOPIC_REGISTRY: + topic = _TOPIC_REGISTRY[key] + else: + topic = dds.DynamicData.Topic(participant, TOPIC_NAME, self._type) + _TOPIC_REGISTRY[key] = topic + + qos = dds.QosProvider.default.datareader_qos + qos.durability.kind = dds.DurabilityKind.VOLATILE + qos.reliability.kind = dds.ReliabilityKind.RELIABLE + qos.history.kind = dds.HistoryKind.KEEP_ALL + + subscriber = dds.Subscriber(participant) + + if request_id_filter: + cft = dds.DynamicData.ContentFilteredTopic( + topic, + f"StreamChunkFilter_{request_id_filter[:8]}_{int(time.time()*1000)%100000}", + dds.Filter(f"request_id = '{request_id_filter}'"), + ) + self._reader = dds.DynamicData.DataReader(subscriber, cft, qos) + else: + self._reader = dds.DynamicData.DataReader(subscriber, topic, qos) + + # Poll for data in a background thread + self._thread = threading.Thread(target=self._poll_loop, daemon=True) + self._thread.start() + + def _poll_loop(self): + """Background polling loop for incoming samples.""" + waitset = dds.WaitSet() + status_cond = dds.StatusCondition(self._reader) + status_cond.enabled_statuses = dds.StatusMask.DATA_AVAILABLE + waitset += status_cond + + while self._running: + try: + waitset.wait(dds.Duration.from_milliseconds(200)) + except dds.TimeoutError: + continue + except Exception: + if not self._running: + break + continue + + samples = self._reader.take() + for sample in samples: + if not sample.info.valid: + continue + chunk = { + "request_id": sample.data["request_id"], + "sequence": sample.data["sequence"], + "chunk_type": sample.data["chunk_type"], + "content": sample.data["content"], + "metadata": sample.data["metadata"], + "timestamp": sample.data["timestamp"], + } + try: + self._callback(chunk) + except Exception as exc: + logger.error("StreamSubscriber callback error: %s", exc) + + def close(self): + """Stop the polling thread and release DDS resources.""" + self._running = False + if self._thread.is_alive(): + self._thread.join(timeout=2.0) + try: + self._reader.close() + except Exception: + pass