From df70d56da48677a65d734ed6b6adaea05a86748f Mon Sep 17 00:00:00 2001 From: Evan Senter Date: Sun, 25 Jan 2026 10:12:21 +0000 Subject: [PATCH 1/3] feat: Add upload_entries MCP tool and push CLI command For multi-machine setups where session JSONL files live on client machines: - upload_entries: MCP tool that accepts raw JSONL entries and parses server-side - push: CLI command that reads local files and uploads to remote server Raw entries are sent (not pre-parsed) so future parser improvements apply to historical data. Deduplication via existing UUID logic. Usage: export AGENT_SESSION_ANALYTICS_URL=https://server.tailnet.ts.net/mcp agent-session-analytics-cli push --days 7 Part of #93 Co-Authored-By: Claude Opus 4.5 --- src/agent_session_analytics/cli.py | 131 ++++++++++++++++++++++++++ src/agent_session_analytics/server.py | 40 ++++++++ 2 files changed, 171 insertions(+) diff --git a/src/agent_session_analytics/cli.py b/src/agent_session_analytics/cli.py index af082dc..a66d7ed 100644 --- a/src/agent_session_analytics/cli.py +++ b/src/agent_session_analytics/cli.py @@ -1498,6 +1498,124 @@ def cmd_benchmark(args): print(format_output(output, args.json)) +def cmd_push(args): + """Push local session data to a remote server.""" + import os + import urllib.request + + from agent_session_analytics.ingest import find_log_files + + # Get remote URL from env var or args + remote_url = getattr(args, "url", None) or os.environ.get("AGENT_SESSION_ANALYTICS_URL") + if not remote_url: + print("Error: No remote URL specified.") + print("Set AGENT_SESSION_ANALYTICS_URL or use --url") + return + + # Normalize URL to point to MCP endpoint + if not remote_url.endswith("/mcp"): + remote_url = remote_url.rstrip("/") + "/mcp" + + # Find local JSONL files + files = find_log_files(days=args.days, project_filter=args.project) + if not files: + print(f"No log files found in the last {args.days} days") + return + + if not args.json: + print(f"Found {len(files)} log files to push") + + total_entries = 0 + total_added = 0 + total_skipped = 0 + total_errors = 0 + files_processed = 0 + + # Process each file separately (preserves project_path association) + for file_path in files: + project_path = file_path.parent.name + + # Read raw JSONL entries + entries = [] + try: + with open(file_path, encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + entries.append(json.loads(line)) + except json.JSONDecodeError: + pass # Skip malformed lines + except Exception as e: + print(f"Warning: Failed to read {file_path}: {e}") + continue + + if not entries: + continue + + # Upload in batches + batch_size = args.batch_size + for i in range(0, len(entries), batch_size): + batch = entries[i : i + batch_size] + + # Build MCP request - send raw entries for server-side parsing + mcp_request = { + "jsonrpc": "2.0", + "method": "tools/call", + "params": { + "name": "upload_entries", + "arguments": {"entries": batch, "project_path": project_path}, + }, + "id": f"{project_path}_{i}", + } + + try: + req = urllib.request.Request( + remote_url, + data=json.dumps(mcp_request).encode("utf-8"), + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=60) as resp: + result = json.loads(resp.read().decode("utf-8")) + if "result" in result: + content = result["result"].get("content", []) + if content and content[0].get("type") == "text": + data = json.loads(content[0]["text"]) + total_added += data.get("events_added", 0) + total_skipped += data.get("events_skipped", 0) + total_errors += data.get("parse_errors", 0) + elif "error" in result: + print(f"Error: {result['error']}") + return + + except urllib.error.URLError as e: + print(f"Error connecting to {remote_url}: {e}") + return + except Exception as e: + print(f"Error uploading batch: {e}") + return + + total_entries += len(entries) + files_processed += 1 + + if not args.json: + print(f" {project_path}: {len(entries)} entries") + + output = { + "status": "ok", + "files_processed": files_processed, + "entries_sent": total_entries, + "events_added": total_added, + "events_skipped": total_skipped, + "parse_errors": total_errors, + "remote_url": remote_url, + } + + print(format_output(output, args.json)) + + def main(): """CLI entry point.""" epilog = """ @@ -1834,6 +1952,19 @@ def main(): ) sub.set_defaults(func=cmd_benchmark) + # push (Issue #93 - remote ingestion) + sub = subparsers.add_parser("push", help="Push local session data to remote server") + sub.add_argument("--days", type=int, default=7, help="Days to look back (default: 7)") + sub.add_argument("--project", help="Project path filter") + sub.add_argument("--url", help="Remote server URL (or set AGENT_SESSION_ANALYTICS_URL)") + sub.add_argument( + "--batch-size", + type=int, + default=500, + help="Entries per batch (default: 500)", + ) + sub.set_defaults(func=cmd_push) + args = parser.parse_args() args.func(args) diff --git a/src/agent_session_analytics/server.py b/src/agent_session_analytics/server.py index d0ce76f..a8a0cf3 100644 --- a/src/agent_session_analytics/server.py +++ b/src/agent_session_analytics/server.py @@ -77,6 +77,46 @@ def ingest_logs(days: int = 7, project: str | None = None, force: bool = False) } +@mcp.tool() +def upload_entries(entries: list[dict], project_path: str) -> dict: + """Upload raw JSONL entries from a remote client. + + For multi-machine setups where session JSONL files live on client machines. + Entries are parsed server-side so future parser improvements apply. + + Args: + entries: List of raw JSONL entry dicts (as read from session files) + project_path: Project path identifier (typically the directory name) + """ + # Parse entries server-side using the same logic as local ingestion + all_events = [] + errors = 0 + + for raw in entries: + try: + parsed = ingest.parse_entry(raw, project_path) + all_events.extend(parsed) + except Exception as e: + logger.debug(f"Error parsing uploaded entry: {e}") + errors += 1 + + # Insert with deduplication (INSERT OR IGNORE on uuid) + events_added = storage.add_events_batch(all_events) if all_events else 0 + + # Update session statistics + sessions_updated = ingest.update_session_stats(storage) + + return { + "status": "ok", + "entries_received": len(entries), + "events_parsed": len(all_events), + "events_added": events_added, + "events_skipped": len(all_events) - events_added, + "sessions_updated": sessions_updated, + "parse_errors": errors, + } + + @mcp.tool() def get_tool_frequency(days: int = 7, project: str | None = None, expand: bool = True) -> dict: """Get tool usage frequency counts. From 518325913548e59033cd8d45d7749508ba9cfc59 Mon Sep 17 00:00:00 2001 From: Evan Senter Date: Sun, 25 Jan 2026 10:20:54 +0000 Subject: [PATCH 2/3] feat: Add incremental sync with get_sync_status - Add get_sync_status MCP tool to return latest timestamp per session - Client queries server before sending to find what's new - Only uploads entries newer than server's latest per session - Change default --days to 365 (incremental sync makes this cheap) Flow: 1. Client reads local files, groups by session_id 2. Calls get_sync_status(session_ids) to get server's latest 3. Filters to only entries after server's timestamp 4. Uploads only the delta Co-Authored-By: Claude Opus 4.5 --- src/agent_session_analytics/cli.py | 186 +++++++++++++++++--------- src/agent_session_analytics/server.py | 28 ++++ 2 files changed, 151 insertions(+), 63 deletions(-) diff --git a/src/agent_session_analytics/cli.py b/src/agent_session_analytics/cli.py index a66d7ed..f03d020 100644 --- a/src/agent_session_analytics/cli.py +++ b/src/agent_session_analytics/cli.py @@ -1499,9 +1499,10 @@ def cmd_benchmark(args): def cmd_push(args): - """Push local session data to a remote server.""" + """Push local session data to a remote server with incremental sync.""" import os import urllib.request + from datetime import datetime from agent_session_analytics.ingest import find_log_files @@ -1516,6 +1517,35 @@ def cmd_push(args): if not remote_url.endswith("/mcp"): remote_url = remote_url.rstrip("/") + "/mcp" + def mcp_call(method_name: str, arguments: dict) -> dict | None: + """Make an MCP call and return parsed result.""" + mcp_request = { + "jsonrpc": "2.0", + "method": "tools/call", + "params": {"name": method_name, "arguments": arguments}, + "id": 1, + } + try: + req = urllib.request.Request( + remote_url, + data=json.dumps(mcp_request).encode("utf-8"), + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=60) as resp: + result = json.loads(resp.read().decode("utf-8")) + if "result" in result: + content = result["result"].get("content", []) + if content and content[0].get("type") == "text": + return json.loads(content[0]["text"]) + elif "error" in result: + print(f"Error: {result['error']}") + except urllib.error.URLError as e: + print(f"Error connecting to {remote_url}: {e}") + except Exception as e: + print(f"Error in MCP call: {e}") + return None + # Find local JSONL files files = find_log_files(days=args.days, project_filter=args.project) if not files: @@ -1523,20 +1553,14 @@ def cmd_push(args): return if not args.json: - print(f"Found {len(files)} log files to push") + print(f"Found {len(files)} log files") - total_entries = 0 - total_added = 0 - total_skipped = 0 - total_errors = 0 - files_processed = 0 - - # Process each file separately (preserves project_path association) + # Read all entries and group by session_id + all_entries_by_session: dict[ + str, list[tuple[str, dict]] + ] = {} # session_id -> [(project_path, entry), ...] for file_path in files: project_path = file_path.parent.name - - # Read raw JSONL entries - entries = [] try: with open(file_path, encoding="utf-8") as f: for line in f: @@ -1544,69 +1568,105 @@ def cmd_push(args): if not line: continue try: - entries.append(json.loads(line)) + entry = json.loads(line) + session_id = entry.get("sessionId") + if session_id: + if session_id not in all_entries_by_session: + all_entries_by_session[session_id] = [] + all_entries_by_session[session_id].append((project_path, entry)) except json.JSONDecodeError: - pass # Skip malformed lines + pass except Exception as e: - print(f"Warning: Failed to read {file_path}: {e}") - continue + if not args.json: + print(f"Warning: Failed to read {file_path}: {e}") - if not entries: - continue + if not all_entries_by_session: + print("No entries found in log files") + return - # Upload in batches - batch_size = args.batch_size - for i in range(0, len(entries), batch_size): - batch = entries[i : i + batch_size] + total_local_entries = sum(len(v) for v in all_entries_by_session.values()) + if not args.json: + print(f"Found {total_local_entries} entries across {len(all_entries_by_session)} sessions") - # Build MCP request - send raw entries for server-side parsing - mcp_request = { - "jsonrpc": "2.0", - "method": "tools/call", - "params": { - "name": "upload_entries", - "arguments": {"entries": batch, "project_path": project_path}, - }, - "id": f"{project_path}_{i}", - } + # Get sync status from server (what it already has) + if not args.json: + print("Checking server sync status...") - try: - req = urllib.request.Request( - remote_url, - data=json.dumps(mcp_request).encode("utf-8"), - headers={"Content-Type": "application/json"}, - method="POST", - ) - with urllib.request.urlopen(req, timeout=60) as resp: - result = json.loads(resp.read().decode("utf-8")) - if "result" in result: - content = result["result"].get("content", []) - if content and content[0].get("type") == "text": - data = json.loads(content[0]["text"]) - total_added += data.get("events_added", 0) - total_skipped += data.get("events_skipped", 0) - total_errors += data.get("parse_errors", 0) - elif "error" in result: - print(f"Error: {result['error']}") - return - - except urllib.error.URLError as e: - print(f"Error connecting to {remote_url}: {e}") - return - except Exception as e: - print(f"Error uploading batch: {e}") - return + sync_status = mcp_call("get_sync_status", {"session_ids": list(all_entries_by_session.keys())}) + if sync_status is None: + return - total_entries += len(entries) - files_processed += 1 + server_sessions = sync_status.get("sessions", {}) + + # Filter to only entries newer than server's latest per session + entries_to_send: list[tuple[str, dict]] = [] # [(project_path, entry), ...] + for session_id, entries in all_entries_by_session.items(): + server_latest = server_sessions.get(session_id) + if server_latest: + # Parse server timestamp and filter + server_ts = datetime.fromisoformat(server_latest.replace("Z", "+00:00")) + for project_path, entry in entries: + entry_ts_str = entry.get("timestamp") + if entry_ts_str: + try: + entry_ts = datetime.fromisoformat(entry_ts_str.replace("Z", "+00:00")) + if entry_ts > server_ts: + entries_to_send.append((project_path, entry)) + except ValueError: + entries_to_send.append((project_path, entry)) # Can't parse, send anyway + else: + # Server doesn't have this session, send all entries + entries_to_send.extend(entries) + + if not entries_to_send: + output = { + "status": "ok", + "message": "Already in sync", + "files_checked": len(files), + "entries_checked": total_local_entries, + "entries_sent": 0, + "events_added": 0, + "remote_url": remote_url, + } + print(format_output(output, args.json)) + return + + if not args.json: + print( + f"Sending {len(entries_to_send)} new entries (skipping {total_local_entries - len(entries_to_send)} already synced)" + ) + + # Group entries to send by project_path for batching + by_project: dict[str, list[dict]] = {} + for project_path, entry in entries_to_send: + if project_path not in by_project: + by_project[project_path] = [] + by_project[project_path].append(entry) + + total_added = 0 + total_skipped = 0 + total_errors = 0 + + # Upload in batches per project + batch_size = args.batch_size + for project_path, entries in by_project.items(): + for i in range(0, len(entries), batch_size): + batch = entries[i : i + batch_size] + result = mcp_call("upload_entries", {"entries": batch, "project_path": project_path}) + if result is None: + return + total_added += result.get("events_added", 0) + total_skipped += result.get("events_skipped", 0) + total_errors += result.get("parse_errors", 0) if not args.json: print(f" {project_path}: {len(entries)} entries") output = { "status": "ok", - "files_processed": files_processed, - "entries_sent": total_entries, + "files_checked": len(files), + "entries_checked": total_local_entries, + "entries_sent": len(entries_to_send), "events_added": total_added, "events_skipped": total_skipped, "parse_errors": total_errors, @@ -1954,7 +2014,7 @@ def main(): # push (Issue #93 - remote ingestion) sub = subparsers.add_parser("push", help="Push local session data to remote server") - sub.add_argument("--days", type=int, default=7, help="Days to look back (default: 7)") + sub.add_argument("--days", type=int, default=365, help="Days to look back (default: 365)") sub.add_argument("--project", help="Project path filter") sub.add_argument("--url", help="Remote server URL (or set AGENT_SESSION_ANALYTICS_URL)") sub.add_argument( diff --git a/src/agent_session_analytics/server.py b/src/agent_session_analytics/server.py index a8a0cf3..004c5bf 100644 --- a/src/agent_session_analytics/server.py +++ b/src/agent_session_analytics/server.py @@ -77,6 +77,34 @@ def ingest_logs(days: int = 7, project: str | None = None, force: bool = False) } +@mcp.tool() +def get_sync_status(session_ids: list[str] | None = None) -> dict: + """Get latest event timestamp per session for incremental sync. + + Args: + session_ids: Optional list of session IDs to check (all if not specified) + """ + query = """ + SELECT session_id, MAX(timestamp) as latest_timestamp + FROM events + """ + params = [] + + if session_ids: + placeholders = ",".join("?" * len(session_ids)) + query += f" WHERE session_id IN ({placeholders})" + params = session_ids + + query += " GROUP BY session_id" + + rows = storage.execute_query(query, params) + + return { + "status": "ok", + "sessions": {row["session_id"]: row["latest_timestamp"] for row in rows}, + } + + @mcp.tool() def upload_entries(entries: list[dict], project_path: str) -> dict: """Upload raw JSONL entries from a remote client. From a030fba7ad2c579dd7cb9f2f12cce1725ced7f66 Mon Sep 17 00:00:00 2001 From: Evan Senter Date: Sun, 25 Jan 2026 10:39:03 +0000 Subject: [PATCH 3/3] test: Add tests and docs for remote sync tools - Add tests for get_sync_status and upload_entries - Document remote sync tools in guide.md - Add local_parse_errors counter to push command - Note sync tools in benchmark skip list Addresses claude-review feedback on PR #96. Co-Authored-By: Claude Opus 4.5 --- src/agent_session_analytics/cli.py | 7 ++-- src/agent_session_analytics/guide.md | 20 +++++++++++ tests/test_server.py | 51 ++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 2 deletions(-) diff --git a/src/agent_session_analytics/cli.py b/src/agent_session_analytics/cli.py index f03d020..2eb762d 100644 --- a/src/agent_session_analytics/cli.py +++ b/src/agent_session_analytics/cli.py @@ -1473,6 +1473,7 @@ def cmd_benchmark(args): # - ingest_logs, ingest_git_history, ingest_git_history_all_projects # - correlate_git_with_sessions, ingest_bus_events # - find_related_sessions (requires valid session_id) + # - upload_entries, get_sync_status (remote sync tools - modify DB or require client context) benchmarks = [] for tool_name, tool_func in tool_functions.items(): @@ -1559,6 +1560,7 @@ def mcp_call(method_name: str, arguments: dict) -> dict | None: all_entries_by_session: dict[ str, list[tuple[str, dict]] ] = {} # session_id -> [(project_path, entry), ...] + local_parse_errors = 0 for file_path in files: project_path = file_path.parent.name try: @@ -1575,7 +1577,7 @@ def mcp_call(method_name: str, arguments: dict) -> dict | None: all_entries_by_session[session_id] = [] all_entries_by_session[session_id].append((project_path, entry)) except json.JSONDecodeError: - pass + local_parse_errors += 1 except Exception as e: if not args.json: print(f"Warning: Failed to read {file_path}: {e}") @@ -1669,7 +1671,8 @@ def mcp_call(method_name: str, arguments: dict) -> dict | None: "entries_sent": len(entries_to_send), "events_added": total_added, "events_skipped": total_skipped, - "parse_errors": total_errors, + "local_parse_errors": local_parse_errors, + "remote_parse_errors": total_errors, "remote_url": remote_url, } diff --git a/src/agent_session_analytics/guide.md b/src/agent_session_analytics/guide.md index f9e3b60..abe6bde 100644 --- a/src/agent_session_analytics/guide.md +++ b/src/agent_session_analytics/guide.md @@ -18,6 +18,26 @@ identify permission gaps. | `get_status()` | Database stats, last ingestion time | | `ingest_logs(days?, project?, force?)` | Refresh data from JSONL files | +### Remote Sync (Multi-Machine) + +For setups where the database lives on a central server (e.g., via Tailscale): + +| Tool | Purpose | +|------|---------| +| `get_sync_status(session_ids?)` | Get latest timestamp per session for incremental sync | +| `upload_entries(entries, project_path)` | Upload raw JSONL entries from remote clients | + +**CLI usage:** +```bash +# Set remote server URL +export AGENT_SESSION_ANALYTICS_URL=https://server.tailnet.ts.net/mcp + +# Push local session data (incremental - only sends new entries) +agent-session-analytics-cli push --days 365 +``` + +The `push` command queries `get_sync_status()` first to determine what the server already has, then only uploads entries newer than the server's latest per session. + ### Core Queries | Tool | Purpose | diff --git a/tests/test_server.py b/tests/test_server.py index c9522fd..04b60de 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -24,6 +24,7 @@ get_session_messages, get_session_signals, get_status, + get_sync_status, get_token_usage, get_tool_frequency, get_tool_sequences, @@ -32,6 +33,7 @@ list_sessions, sample_sequences, search_messages, + upload_entries, ) @@ -401,6 +403,55 @@ def test_get_large_tool_results(): assert isinstance(result["large_results"], list) +# --- Remote Ingestion Tests (Issue #93) --- + + +def test_get_sync_status(): + """Test that get_sync_status returns latest timestamps per session.""" + result = get_sync_status.fn(session_ids=None) + assert result["status"] == "ok" + assert "sessions" in result + assert isinstance(result["sessions"], dict) + + +def test_get_sync_status_with_filter(): + """Test that get_sync_status filters by session_ids.""" + result = get_sync_status.fn(session_ids=["nonexistent-session"]) + assert result["status"] == "ok" + assert "sessions" in result + # Nonexistent session should return empty + assert "nonexistent-session" not in result["sessions"] + + +def test_upload_entries(): + """Test that upload_entries accepts and parses raw JSONL entries.""" + # Create a minimal valid entry + test_entries = [ + { + "type": "user", + "sessionId": "test-upload-session", + "timestamp": "2026-01-25T10:00:00Z", + "uuid": "test-upload-uuid-001", + "message": {"content": "test message"}, + } + ] + result = upload_entries.fn(entries=test_entries, project_path="test-project") + assert result["status"] == "ok" + assert "entries_received" in result + assert result["entries_received"] == 1 + assert "events_parsed" in result + assert "events_added" in result + assert "sessions_updated" in result + + +def test_upload_entries_empty(): + """Test that upload_entries handles empty list.""" + result = upload_entries.fn(entries=[], project_path="test-project") + assert result["status"] == "ok" + assert result["entries_received"] == 0 + assert result["events_parsed"] == 0 + + # --- Tailscale Auth Middleware Tests ---