-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
317 lines (260 loc) · 12.9 KB
/
main.py
File metadata and controls
317 lines (260 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
import os
import uvicorn
import asyncio
import logging
from dotenv import load_dotenv
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
from fastapi.staticfiles import StaticFiles
from k_agents.governor_messenger import GovernorMessenger
from rag.database import DatabaseManager
# ── Real-time log queue (feeds the /log-stream SSE endpoint) ──────────────────
# Shared buffer between the logging handler below and the SSE endpoint that
# drains it. It stays None until application startup assigns a real queue.
log_queue: "asyncio.Queue | None" = None  # initialised after the event loop is up


class _UILogHandler(logging.Handler):
    """Bridges Python logging records to the SSE log queue.

    Each record becomes a ``{"level": <levelname>, "msg": <formatted text>}``
    dict pushed onto the module-level ``log_queue``. Delivery is best-effort:
    records are dropped while the queue is not yet created or when it is full.
    """

    def emit(self, record: logging.LogRecord) -> None:
        """Format *record* and enqueue it without blocking.

        Never raises: formatting failures are routed through
        ``logging.Handler.handleError`` so a malformed log call cannot crash
        the code that issued it.
        """
        queue = log_queue
        if queue is None:
            # Nothing is listening yet (startup has not created the queue).
            return

        try:
            entry = {"level": record.levelname, "msg": self.format(record)}
        except Exception:
            # e.g. mismatched %-style arguments in the original logging call.
            self.handleError(record)
            return

        # NOTE(review): put_nowait is called directly from whichever thread
        # logged the record; asyncio queues are not thread-safe, so logging
        # from worker threads may need loop.call_soon_threadsafe — confirm.
        try:
            queue.put_nowait(entry)
        except asyncio.QueueFull:
            # Deliberate: dropping a log line is preferable to blocking.
            pass
# Logging configuration: the root logger reports INFO and above, and the same
# "LEVEL: message" layout is used by the UI bridge handler attached to it.
_LOG_FORMAT = '%(levelname)s: %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)

# Attach the UI bridge handler to the root logger with the shared layout.
_ui_handler = _UILogHandler()
_ui_handler.setFormatter(logging.Formatter(_LOG_FORMAT))
logging.getLogger().addHandler(_ui_handler)
# Load Secrets securely into runtime environment before anything else uses it
# (python-dotenv's load_dotenv is called with no arguments here).
load_dotenv()
# NOTE(review): this import is deliberately placed after load_dotenv();
# presumably client_bridge reads environment variables at import time — confirm
# before hoisting it to the top of the file.
from mcp_integration.client_bridge import MCPSessionManager
from contextlib import asynccontextmanager
# Module-level MCP session manager shared by the lifespan hook, the governor
# and the headless CLI path; it is connected/closed elsewhere, not here.
mcp_client = MCPSessionManager()
@asynccontextmanager
async def lifespan(_app: FastAPI):
    """Handles application startup and shutdown logic.

    Startup: creates the bounded log queue consumed by ``/log-stream``,
    connects the MCP client and launches the governor's poll loop as a
    background task.

    Shutdown: cancels the poll loop and closes the MCP client. Cleanup runs in
    a ``finally`` so it still happens when the application body raises.

    Args:
        _app: The FastAPI application (unused; required by the lifespan
            protocol).
    """
    global log_queue
    log_queue = asyncio.Queue(maxsize=500)

    logging.info("Lifespan Startup: Booting synchronous MCP subprocess...")
    await mcp_client.connect()

    # Start the background poll loop (uses BACKGROUND_POLL_INTERVAL from .env).
    # The reference is kept on purpose: the event loop only holds weak
    # references to tasks, so an unreferenced task may be garbage-collected
    # before it finishes.
    poll_task = asyncio.create_task(gov.start_poll_loop())
    try:
        yield
    finally:
        poll_task.cancel()
        # Bounded wait so a poll loop that is slow to unwind cannot hang
        # shutdown; asyncio.wait never re-raises the task's own exception.
        await asyncio.wait([poll_task], timeout=5)
        if poll_task.done() and not poll_task.cancelled():
            poll_error = poll_task.exception()
            if poll_error is not None:
                logging.error(f"Lifespan Shutdown: poll loop ended with error: {poll_error}")

        logging.info("Lifespan Shutdown: Terminating MCP subprocess...")
        await mcp_client.close()
# The ASGI application; startup/shutdown work is delegated to `lifespan`.
app = FastAPI(
    title="RIS Ouroboros Squad",
    lifespan=lifespan,
)

# Singleton Governor for MVP Event Streaming — every route and CLI command in
# this module goes through this one instance.
gov = GovernorMessenger(
    db_path="uroboros.db",
    mcp_client=mcp_client,
)
@app.get("/stream")
async def message_stream():
    """Open the Server-Sent Events stream of real-time agent message cards.

    The events themselves come from ``gov.get_event_stream()``; this handler
    only wraps that source in an ``EventSourceResponse``.
    """
    card_events = gov.get_event_stream()
    return EventSourceResponse(card_events)
@app.get("/log-stream")
async def log_stream():
    """Stream backend log entries to the UI terminal over Server-Sent Events.

    The first event is a fixed "connected" banner; afterwards every entry
    taken from the module-level ``log_queue`` is forwarded as JSON.
    """
    import json

    banner = {"level": "SYSTEM", "msg": "INFO: RIS log terminal connected."}

    async def _entries():
        # Greet the client right away so the terminal shows activity.
        yield {"event": "message", "data": json.dumps(banner)}
        # NOTE(review): every connected client drains the same queue, so each
        # entry reaches only one of them — confirm that is acceptable.
        while True:
            record = await log_queue.get()
            yield {"event": "message", "data": json.dumps(record)}

    return EventSourceResponse(_entries())
@app.post("/analyze")
async def trigger_analysis(repo_path: str, background_tasks: BackgroundTasks):
    """Schedule a repository analysis to run after the response is sent.

    Args:
        repo_path: Repository location handed unchanged to
            ``gov.start_analysis``.
        background_tasks: FastAPI background-task collector for this request.

    Returns:
        A dict acknowledging the queued analysis and echoing ``repo_path``.
    """
    background_tasks.add_task(gov.start_analysis, repo_path)
    acknowledgement = {"status": "Analysis queued", "repo": repo_path}
    return acknowledgement
@app.get("/config")
async def get_config():
    """Expose server-side configuration metadata to the front end.

    Returns:
        A dict whose ``auto_repo_path`` is the value stored on ``app.state``,
        or ``None`` when that attribute was never set.
    """
    preset_repo = getattr(app.state, "auto_repo_path", None)
    return {"auto_repo_path": preset_repo}
class InstructRequest(BaseModel):
    # Request body shared by the /instruct and /instruct-swe endpoints.

    # Identifier of the task whose analyst should receive the instruction.
    task_id: int
    # Free-form instruction text forwarded to the analyst.
    instruction: str
class ConfirmRequest(BaseModel):
    # Request body for the /action endpoint.

    # Identifier of the task that owns the staged action.
    task_id: int
    # True to approve the staged action, False to reject it.
    accept: bool
@app.post("/instruct")
async def handle_instruction(req: InstructRequest, background_tasks: BackgroundTasks):
    """Queue a human instruction for the analyst attached to a task.

    The instruction is delivered by ``gov.instruct_analyst`` as a background
    task, so the response only confirms that it was queued.
    """
    background_tasks.add_task(
        gov.instruct_analyst,
        req.task_id,
        req.instruction,
    )
    receipt = {"status": "queued"}
    return receipt
@app.post("/action")
async def handle_action(req: ConfirmRequest, background_tasks: BackgroundTasks):
    """Queue the approval or rejection of a staged filesystem modification.

    The decision is applied by ``gov.confirm_staged_action`` as a background
    task, so the response only confirms that it was queued.
    """
    background_tasks.add_task(
        gov.confirm_staged_action,
        req.task_id,
        req.accept,
    )
    receipt = {"status": "queued"}
    return receipt
@app.post("/tasks/{task_id}/toggle-poll")
async def toggle_task_poll(task_id: int):
    """Flip the background-polling state of one task.

    Returns:
        Whatever ``gov.toggle_poll_state`` reports for ``task_id``.
    """
    return await gov.toggle_poll_state(task_id)
@app.delete("/tasks/{task_id}")
async def delete_task(task_id: int):
    """Permanently remove a task together with its associated data.

    Returns:
        Whatever ``gov.delete_task`` reports for ``task_id``.
    """
    return await gov.delete_task(task_id)
class DevAuthRequest(BaseModel):
    # Request body for the /verify-dev endpoint.

    # Passphrase supplied by the caller; checked against SECRET_PHRASE.
    passphrase: str
@app.post("/verify-dev")
async def verify_dev_mode(req: DevAuthRequest):
    """Authenticates the user for Advanced Developer Mode access.

    The submitted passphrase is checked against the ``SECRET_PHRASE``
    environment variable. Both outcomes emit an alert via ``push``.

    Args:
        req: Request body carrying the candidate passphrase.

    Returns:
        ``{"status": "success", "token": "dev_session_authenticated"}`` when
        the passphrase matches.

    Raises:
        HTTPException: 401 when the passphrase is empty, wrong, or when
            ``SECRET_PHRASE`` is not configured.
    """
    import hmac

    expected = os.getenv("SECRET_PHRASE")
    from utils.alerts import push

    supplied = req.passphrase
    # Constant-time comparison: a plain `==` on a secret leaks, through
    # response timing, how many leading characters were correct. Encoding to
    # bytes lets compare_digest accept non-ASCII passphrases; surrogatepass
    # keeps odd client input from raising instead of simply failing to match.
    # An unset or empty SECRET_PHRASE must never authenticate anyone.
    authenticated = bool(supplied) and bool(expected) and hmac.compare_digest(
        supplied.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )

    if authenticated:
        push("RIS: Dev Mode Unlocked — Human successfully authenticated for Advanced Developer Mode console.")
        return {"status": "success", "token": "dev_session_authenticated"}

    push("RIS: Dev Mode Attack — Incorrect passphrase entered trying to unlock SWE capabilities remotely.")
    raise HTTPException(status_code=401, detail="Unauthorized HIL Access Attempt.")
@app.post("/instruct-swe")
async def handle_dev_instruction(req: InstructRequest, background_tasks: BackgroundTasks):
    """Queue a developer-mode instruction for the analyst attached to a task.

    Uses the same ``gov.instruct_analyst`` call as ``/instruct`` but passes
    ``is_dev_mode=True``.
    """
    background_tasks.add_task(
        gov.instruct_analyst,
        req.task_id,
        req.instruction,
        is_dev_mode=True,
    )
    receipt = {"status": "dev command queued"}
    return receipt
@app.get("/instruct-swe-stream")
async def handle_dev_stream(task_id: int, instruction: str):
    """Stream the analyst's reply to a developer instruction as SSE.

    Every frame is sent as a ``message`` event whose JSON payload has a
    ``type`` field: ``token``, ``tool_start``, ``tool_output``, ``error`` or
    the final ``done``. If the task has no entry in ``gov.active_analysts``
    a single ``error`` frame is sent and the stream ends.

    Args:
        task_id: Task whose active analyst session should be used.
        instruction: Developer instruction appended to the session history.
    """
    import json

    def _frame(payload):
        # All frames share the generic "message" event name.
        return {"event": "message", "data": json.dumps(payload)}

    async def _events():
        session = gov.active_analysts.get(task_id)
        if not session:
            yield _frame({"type": "error", "token": "No active session for task."})
            return

        repo_url = session["repo_url"]
        transcript = session["dev_history"]
        # The stored history list is mutated in place.
        transcript.append({"role": "user", "content": instruction})

        from k_agents.analyst_agent import AnalystAgent
        analyst = AnalystAgent(gov.db, mcp_client=gov.mcp_client)

        try:
            # The analyst's own streaming loop does the work; this generator
            # only reshapes its chunks for the UI.
            async for chunk in analyst.chat_stream(repo_url, transcript, is_dev_mode=True):
                kind = chunk["type"]
                if kind == "token":
                    payload = {"type": "token", "token": chunk["content"]}
                elif kind == "tool_start":
                    payload = {"type": "tool_start", "name": chunk["name"]}
                elif kind == "tool_output":
                    payload = {"type": "tool_output", "name": chunk["name"], "output": chunk["content"]}
                elif kind == "error":
                    payload = {"type": "error", "token": chunk["content"]}
                else:
                    # Chunk types the UI does not know about are not forwarded.
                    continue
                yield _frame(payload)
            yield _frame({"type": "done"})
        except Exception as e:
            logging.error(f"Dev Stream Error: {e}")
            yield _frame({"type": "error", "token": str(e)})

    return EventSourceResponse(_events())
# Serve the "static" directory at the site root; html=True makes index.html
# the default document. This mount comes after all route declarations above.
app.mount(
    "/",
    StaticFiles(directory="static", html=True),
    name="static",
)
if __name__ == "__main__":
    # Command-line entry point. Each nested function below is registered as a
    # Typer sub-command; their docstrings are kept verbatim because Typer
    # shows them as help text.
    import typer

    cli_app = typer.Typer(help="RIS - Database CLI & Server Start")

    @cli_app.command()
    def serve(host: str = "0.0.0.0", port: int = 8000):
        """Starts the local uvicorn FastAPI server offering SSE and Boots the MCP."""
        uvicorn.run(app, host=host, port=port)

    @cli_app.command()
    def run_local(repo_path: str = "."):
        """Runs the whole loop synchronously via local CLI instead of background UI process."""
        from rich.console import Console
        from guardrails.repo_validator import validate_repo_target

        console = Console()
        console.print(f"\n[bold blue] Validating repository rules for:[/bold blue] {repo_path}")

        # 1. Provide manual CLI users with immediate UX errors preventing run
        validation = validate_repo_target(repo_path)
        if not validation["is_valid"]:
            console.print(f"[bold red] Governor Rejected Analysis:[/bold red] {validation['error']}")
            raise typer.Exit(code=1)
        console.print("[bold green] Constraints & File Limits Passed.[/bold green]\n")

        # 2. Prompt for Interface Selection
        use_ui = typer.confirm("Would you like to start the Web Dashboard to monitor and command the swarm visually?")
        if use_ui:
            console.print("\n[bold magenta] Booting up the RIS Director Console...[/bold magenta]")
            console.print("[cyan]Opening http://localhost:8000 in your browser...[/cyan]")
            console.print(f"Auto-deploying swarm for: [bold]{repo_path}[/bold]\n")

            # Store the validated path so /config can serve it to the SPA
            app.state.auto_repo_path = repo_path

            import webbrowser
            from threading import Timer

            # Open the browser shortly after uvicorn.run() below starts
            # blocking this thread.
            Timer(1.5, lambda: webbrowser.open("http://localhost:8000")).start()
            uvicorn.run(app, host="0.0.0.0", port=8000)
            return

        # 3. Interactive Headless confirmation
        confirm = typer.confirm(f"Proceed with headless CLI terminal execution for '{repo_path}'?")
        if not confirm:
            console.print("[yellow] Analysis aborted by user.[/yellow]")
            raise typer.Exit()

        console.print("\n[bold magenta] Launching Swarm Architecture...[/bold magenta]")

        # 4. Synchronous execution
        async def execute_headless():
            print("[INFO] Spawning Subprocess for MCP Server (Local)...")
            await mcp_client.connect()
            try:
                await gov.start_analysis(repo_path)
            finally:
                # Always release the MCP client, even if the analysis fails.
                await mcp_client.close()

        asyncio.run(execute_headless())
        console.print("\n[bold green] Agent loop completed successfully.[/bold green]")
        console.print("Run [cyan]uv run python main.py view-cards[/cyan] to read the finalized structural and security reports.")

    @cli_app.command()
    def list_tasks():
        """Displays all active and past tasks in a formatted table."""
        from rich.table import Table
        from rich.console import Console

        console = Console()
        db = DatabaseManager(db_path="uroboros.db")

        with db._get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT id, repo_url, status, poll_paused, created_at FROM tasks ORDER BY id DESC")
            rows = cursor.fetchall()

        if not rows:
            console.print("[yellow]No tasks found in database.[/yellow]")
            return

        table = Table(title="RIS Agent Swarm - Task Registry")
        table.add_column("ID", style="cyan", no_wrap=True)
        table.add_column("Repository", style="magenta")
        table.add_column("Status", style="green")
        table.add_column("Poll State", style="yellow")
        table.add_column("Started At", style="dim")

        for tid, repo, status, paused, ts in rows:
            poll_str = "[bold red]PAUSED[/bold red]" if paused else "[bold green]ACTIVE[/bold green]"
            table.add_row(str(tid), repo, status, poll_str, ts)

        console.print(table)

    @cli_app.command()
    def toggle_poll(task_id: int):
        """Toggle background polling for a specific task via CLI."""
        async def _run():
            res = await gov.toggle_poll_state(task_id)
            print(f"Result: {res}")

        asyncio.run(_run())

    @cli_app.command()
    def delete_task_records(task_id: int):
        """Permanently purge a task and its artifacts via CLI."""
        confirm = typer.confirm(f"Are you sure you want to PERMANENTLY delete Task #{task_id} and all related records?")
        if not confirm:
            return

        async def _run():
            res = await gov.delete_task(task_id)
            print(f"Purge Result: {res}")

        asyncio.run(_run())

    @cli_app.command()
    def view_cards(task_id: int = None):
        """Dump UI messages/cards. Optionally filter by Task ID."""
        db = DatabaseManager(db_path="uroboros.db")

        query = "SELECT agent_name, message_type, content, created_at FROM messages_cards"
        params = []
        # Compare against None rather than truthiness so an explicit
        # `--task-id 0` filters on task 0 instead of dumping every task.
        has_filter = task_id is not None
        if has_filter:
            query += " WHERE task_id = ?"
            params.append(task_id)
        query += " ORDER BY id ASC"

        with db._get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            rows = cursor.fetchall()

        print(f"\n--- UI MESSAGE CARDS DUMP (Task: {task_id if has_filter else 'ALL'}) ---")
        for row in rows:
            # Column order follows the SELECT: agent, type, content, created_at.
            print(f"[{row[3]}] {row[0]} ({row[1]}):\n{row[2]}\n")

    cli_app()