diff --git a/src/agent_session_analytics/cli.py b/src/agent_session_analytics/cli.py index af082dc..2eb762d 100644 --- a/src/agent_session_analytics/cli.py +++ b/src/agent_session_analytics/cli.py @@ -1473,6 +1473,7 @@ def cmd_benchmark(args): # - ingest_logs, ingest_git_history, ingest_git_history_all_projects # - correlate_git_with_sessions, ingest_bus_events # - find_related_sessions (requires valid session_id) + # - upload_entries, get_sync_status (remote sync tools - modify DB or require client context) benchmarks = [] for tool_name, tool_func in tool_functions.items(): @@ -1498,6 +1499,186 @@ def cmd_benchmark(args): print(format_output(output, args.json)) +def cmd_push(args): + """Push local session data to a remote server with incremental sync.""" + import os + import urllib.request + from datetime import datetime + + from agent_session_analytics.ingest import find_log_files + + # Get remote URL from env var or args + remote_url = getattr(args, "url", None) or os.environ.get("AGENT_SESSION_ANALYTICS_URL") + if not remote_url: + print("Error: No remote URL specified.") + print("Set AGENT_SESSION_ANALYTICS_URL or use --url") + return + + # Normalize URL to point to MCP endpoint + if not remote_url.endswith("/mcp"): + remote_url = remote_url.rstrip("/") + "/mcp" + + def mcp_call(method_name: str, arguments: dict) -> dict | None: + """Make an MCP call and return parsed result.""" + mcp_request = { + "jsonrpc": "2.0", + "method": "tools/call", + "params": {"name": method_name, "arguments": arguments}, + "id": 1, + } + try: + req = urllib.request.Request( + remote_url, + data=json.dumps(mcp_request).encode("utf-8"), + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=60) as resp: + result = json.loads(resp.read().decode("utf-8")) + if "result" in result: + content = result["result"].get("content", []) + if content and content[0].get("type") == "text": + return json.loads(content[0]["text"]) + elif "error" in result: + print(f"Error: {result['error']}") + except urllib.error.URLError as e: + print(f"Error connecting to {remote_url}: {e}") + except Exception as e: + print(f"Error in MCP call: {e}") + return None + + # Find local JSONL files + files = find_log_files(days=args.days, project_filter=args.project) + if not files: + print(f"No log files found in the last {args.days} days") + return + + if not args.json: + print(f"Found {len(files)} log files") + + # Read all entries and group by session_id + all_entries_by_session: dict[ + str, list[tuple[str, dict]] + ] = {} # session_id -> [(project_path, entry), ...] + local_parse_errors = 0 + for file_path in files: + project_path = file_path.parent.name + try: + with open(file_path, encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + session_id = entry.get("sessionId") + if session_id: + if session_id not in all_entries_by_session: + all_entries_by_session[session_id] = [] + all_entries_by_session[session_id].append((project_path, entry)) + except json.JSONDecodeError: + local_parse_errors += 1 + except Exception as e: + if not args.json: + print(f"Warning: Failed to read {file_path}: {e}") + + if not all_entries_by_session: + print("No entries found in log files") + return + + total_local_entries = sum(len(v) for v in all_entries_by_session.values()) + if not args.json: + print(f"Found {total_local_entries} entries across {len(all_entries_by_session)} sessions") + + # Get sync status from server (what it already has) + if not args.json: + print("Checking server sync status...") + + sync_status = mcp_call("get_sync_status", {"session_ids": list(all_entries_by_session.keys())}) + if sync_status is None: + return + + server_sessions = sync_status.get("sessions", {}) + + # Filter to only entries newer than server's latest per session + entries_to_send: list[tuple[str, dict]] = [] # [(project_path, entry), ...] + for session_id, entries in all_entries_by_session.items(): + server_latest = server_sessions.get(session_id) + if server_latest: + # Parse server timestamp and filter + server_ts = datetime.fromisoformat(server_latest.replace("Z", "+00:00")) + for project_path, entry in entries: + entry_ts_str = entry.get("timestamp") + if entry_ts_str: + try: + entry_ts = datetime.fromisoformat(entry_ts_str.replace("Z", "+00:00")) + if entry_ts > server_ts: + entries_to_send.append((project_path, entry)) + except ValueError: + entries_to_send.append((project_path, entry)) # Can't parse, send anyway + else: + # Server doesn't have this session, send all entries + entries_to_send.extend(entries) + + if not entries_to_send: + output = { + "status": "ok", + "message": "Already in sync", + "files_checked": len(files), + "entries_checked": total_local_entries, + "entries_sent": 0, + "events_added": 0, + "remote_url": remote_url, + } + print(format_output(output, args.json)) + return + + if not args.json: + print( + f"Sending {len(entries_to_send)} new entries (skipping {total_local_entries - len(entries_to_send)} already synced)" + ) + + # Group entries to send by project_path for batching + by_project: dict[str, list[dict]] = {} + for project_path, entry in entries_to_send: + if project_path not in by_project: + by_project[project_path] = [] + by_project[project_path].append(entry) + + total_added = 0 + total_skipped = 0 + total_errors = 0 + + # Upload in batches per project + batch_size = args.batch_size + for project_path, entries in by_project.items(): + for i in range(0, len(entries), batch_size): + batch = entries[i : i + batch_size] + result = mcp_call("upload_entries", {"entries": batch, "project_path": project_path}) + if result is None: + return + total_added += result.get("events_added", 0) + total_skipped += result.get("events_skipped", 0) + total_errors += result.get("parse_errors", 0) + + if not args.json: + print(f" {project_path}: {len(entries)} entries") + + output = { + "status": "ok", + "files_checked": len(files), + "entries_checked": total_local_entries, + "entries_sent": len(entries_to_send), + "events_added": total_added, + "events_skipped": total_skipped, + "local_parse_errors": local_parse_errors, + "remote_parse_errors": total_errors, + "remote_url": remote_url, + } + + print(format_output(output, args.json)) + + def main(): """CLI entry point.""" epilog = """ @@ -1834,6 +2015,19 @@ def main(): ) sub.set_defaults(func=cmd_benchmark) + # push (Issue #93 - remote ingestion) + sub = subparsers.add_parser("push", help="Push local session data to remote server") + sub.add_argument("--days", type=int, default=365, help="Days to look back (default: 365)") + sub.add_argument("--project", help="Project path filter") + sub.add_argument("--url", help="Remote server URL (or set AGENT_SESSION_ANALYTICS_URL)") + sub.add_argument( + "--batch-size", + type=int, + default=500, + help="Entries per batch (default: 500)", + ) + sub.set_defaults(func=cmd_push) + args = parser.parse_args() args.func(args) diff --git a/src/agent_session_analytics/guide.md b/src/agent_session_analytics/guide.md index f9e3b60..abe6bde 100644 --- a/src/agent_session_analytics/guide.md +++ b/src/agent_session_analytics/guide.md @@ -18,6 +18,26 @@ identify permission gaps. | `get_status()` | Database stats, last ingestion time | | `ingest_logs(days?, project?, force?)` | Refresh data from JSONL files | +### Remote Sync (Multi-Machine) + +For setups where the database lives on a central server (e.g., via Tailscale): + +| Tool | Purpose | +|------|---------| +| `get_sync_status(session_ids?)` | Get latest timestamp per session for incremental sync | +| `upload_entries(entries, project_path)` | Upload raw JSONL entries from remote clients | + +**CLI usage:** +```bash +# Set remote server URL +export AGENT_SESSION_ANALYTICS_URL=https://server.tailnet.ts.net/mcp + +# Push local session data (incremental - only sends new entries) +agent-session-analytics-cli push --days 365 +``` + +The `push` command queries `get_sync_status()` first to determine what the server already has, then only uploads entries newer than the server's latest per session. + ### Core Queries | Tool | Purpose | diff --git a/src/agent_session_analytics/server.py b/src/agent_session_analytics/server.py index d0ce76f..004c5bf 100644 --- a/src/agent_session_analytics/server.py +++ b/src/agent_session_analytics/server.py @@ -77,6 +77,74 @@ def ingest_logs(days: int = 7, project: str | None = None, force: bool = False) } +@mcp.tool() +def get_sync_status(session_ids: list[str] | None = None) -> dict: + """Get latest event timestamp per session for incremental sync. + + Args: + session_ids: Optional list of session IDs to check (all if not specified) + """ + query = """ + SELECT session_id, MAX(timestamp) as latest_timestamp + FROM events + """ + params = [] + + if session_ids: + placeholders = ",".join("?" * len(session_ids)) + query += f" WHERE session_id IN ({placeholders})" + params = session_ids + + query += " GROUP BY session_id" + + rows = storage.execute_query(query, params) + + return { + "status": "ok", + "sessions": {row["session_id"]: row["latest_timestamp"] for row in rows}, + } + + +@mcp.tool() +def upload_entries(entries: list[dict], project_path: str) -> dict: + """Upload raw JSONL entries from a remote client. + + For multi-machine setups where session JSONL files live on client machines. + Entries are parsed server-side so future parser improvements apply. + + Args: + entries: List of raw JSONL entry dicts (as read from session files) + project_path: Project path identifier (typically the directory name) + """ + # Parse entries server-side using the same logic as local ingestion + all_events = [] + errors = 0 + + for raw in entries: + try: + parsed = ingest.parse_entry(raw, project_path) + all_events.extend(parsed) + except Exception as e: + logger.debug(f"Error parsing uploaded entry: {e}") + errors += 1 + + # Insert with deduplication (INSERT OR IGNORE on uuid) + events_added = storage.add_events_batch(all_events) if all_events else 0 + + # Update session statistics + sessions_updated = ingest.update_session_stats(storage) + + return { + "status": "ok", + "entries_received": len(entries), + "events_parsed": len(all_events), + "events_added": events_added, + "events_skipped": len(all_events) - events_added, + "sessions_updated": sessions_updated, + "parse_errors": errors, + } + + @mcp.tool() def get_tool_frequency(days: int = 7, project: str | None = None, expand: bool = True) -> dict: """Get tool usage frequency counts. diff --git a/tests/test_server.py b/tests/test_server.py index c9522fd..04b60de 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -24,6 +24,7 @@ get_session_messages, get_session_signals, get_status, + get_sync_status, get_token_usage, get_tool_frequency, get_tool_sequences, @@ -32,6 +33,7 @@ list_sessions, sample_sequences, search_messages, + upload_entries, ) @@ -401,6 +403,55 @@ def test_get_large_tool_results(): assert isinstance(result["large_results"], list) +# --- Remote Ingestion Tests (Issue #93) --- + + +def test_get_sync_status(): + """Test that get_sync_status returns latest timestamps per session.""" + result = get_sync_status.fn(session_ids=None) + assert result["status"] == "ok" + assert "sessions" in result + assert isinstance(result["sessions"], dict) + + +def test_get_sync_status_with_filter(): + """Test that get_sync_status filters by session_ids.""" + result = get_sync_status.fn(session_ids=["nonexistent-session"]) + assert result["status"] == "ok" + assert "sessions" in result + # Nonexistent session should return empty + assert "nonexistent-session" not in result["sessions"] + + +def test_upload_entries(): + """Test that upload_entries accepts and parses raw JSONL entries.""" + # Create a minimal valid entry + test_entries = [ + { + "type": "user", + "sessionId": "test-upload-session", + "timestamp": "2026-01-25T10:00:00Z", + "uuid": "test-upload-uuid-001", + "message": {"content": "test message"}, + } + ] + result = upload_entries.fn(entries=test_entries, project_path="test-project") + assert result["status"] == "ok" + assert "entries_received" in result + assert result["entries_received"] == 1 + assert "events_parsed" in result + assert "events_added" in result + assert "sessions_updated" in result + + +def test_upload_entries_empty(): + """Test that upload_entries handles empty list.""" + result = upload_entries.fn(entries=[], project_path="test-project") + assert result["status"] == "ok" + assert result["entries_received"] == 0 + assert result["events_parsed"] == 0 + + # --- Tailscale Auth Middleware Tests ---