From 3040a06158b8676dff925acf3f535bad49acad42 Mon Sep 17 00:00:00 2001 From: Evan Senter Date: Mon, 26 Jan 2026 23:26:36 -0800 Subject: [PATCH 1/3] feat: Store raw JSONL entries for future re-parsing - Add raw_entries table to store unparsed JSONL entries - upload_entries now stores both raw and parsed data - Add --force flag to push command for re-sending all data - Add docs/TAILSCALE_SETUP.md for deployment guide Raw entries enable re-parsing historical data when the parser improves, without losing the original source material. Co-Authored-By: Claude Opus 4.5 --- docs/TAILSCALE_SETUP.md | 196 +++++++++++++++++++++++++ src/agent_session_analytics/cli.py | 90 +++++++----- src/agent_session_analytics/guide.md | 5 + src/agent_session_analytics/server.py | 14 ++ src/agent_session_analytics/storage.py | 72 +++++++++ 5 files changed, 342 insertions(+), 35 deletions(-) create mode 100644 docs/TAILSCALE_SETUP.md diff --git a/docs/TAILSCALE_SETUP.md b/docs/TAILSCALE_SETUP.md new file mode 100644 index 0000000..4915a34 --- /dev/null +++ b/docs/TAILSCALE_SETUP.md @@ -0,0 +1,196 @@ +# Tailscale Setup for Agent Session Analytics + +Deploy agent-session-analytics across multiple machines using Tailscale for secure, authenticated access. + +## Architecture + +``` +[Client Machine] [Server (speck-vm)] +~/.claude/projects/*.jsonl agent-session-analytics MCP + | | + CLI `push` command ----HTTPS----> tailscale serve (TLS + auth) + | | + Reads local JSONL Writes to SQLite + Incremental sync Dedupes by UUID +``` + +- Server runs on `localhost:8081` (unexposed) +- `tailscale serve` proxies HTTPS requests with TLS and identity headers +- Localhost connections are trusted; remote requires Tailscale auth + +## Server Setup + +### 1. Install the server + +```bash +cd ~/Documents/projects/agent-session-analytics +make install-server +``` + +This installs: +- Python dependencies via uv +- systemd user service (`agent-session-analytics.service`) +- MCP config pointing to localhost + +### 2. Configure Tailscale serve + +```bash +# Path-based routing (recommended for multiple services) +tailscale serve --bg --https=443 /agent-session-analytics/mcp localhost:8081 + +# Verify +tailscale serve status +``` + +### 3. Verify the server + +```bash +# Check service status +systemctl --user status agent-session-analytics + +# Test MCP endpoint (from server) +curl -s localhost:8081/mcp -H 'Content-Type: application/json' \ + -H 'Accept: application/json, text/event-stream' \ + -d '{"jsonrpc":"2.0","method":"tools/list","params":{},"id":1}' +``` + +## Client Setup + +### 1. Install the client + +```bash +cd ~/Documents/projects/agent-session-analytics +make install-client REMOTE_URL=https://speck-vm.tailac7b3c.ts.net/agent-session-analytics/mcp +``` + +This configures Claude Code's MCP settings to point to the remote server. + +### 2. Configure push command + +Add to your shell profile (`.zshrc` or `.bashrc`): + +```bash +export AGENT_SESSION_ANALYTICS_URL=https://speck-vm.tailac7b3c.ts.net/agent-session-analytics/mcp +``` + +### 3. Push local data + +```bash +# Push last 7 days +agent-session-analytics-cli push --days 7 + +# Push all historical data (incremental, safe to re-run) +agent-session-analytics-cli push --days 365 +``` + +## Incremental Sync + +The push command uses incremental sync: + +1. Client queries `get_sync_status` to get latest timestamp per session +2. Only entries newer than server's latest are sent +3. Server deduplicates by UUID (`INSERT OR IGNORE`) + +This makes `push` safe and efficient to run repeatedly. + +## Automatic Sync + +### Option 1: Hook after compaction + +Add to `~/.claude/settings.json`: + +```json +{ + "hooks": { + "SessionStart:compact": [ + { + "type": "command", + "command": "agent-session-analytics-cli push --days 1" + } + ] + } +} +``` + +### Option 2: Periodic sync via launchd (macOS) + +Create `~/Library/LaunchAgents/com.agent-session-analytics.push.plist`: + +```xml + + + + + Label + com.agent-session-analytics.push + ProgramArguments + + /usr/bin/env + agent-session-analytics-cli + push + --days + 1 + + EnvironmentVariables + + AGENT_SESSION_ANALYTICS_URL + https://speck-vm.tailac7b3c.ts.net/agent-session-analytics/mcp + + StartInterval + 3600 + RunAtLoad + + + +``` + +Load with: `launchctl load ~/Library/LaunchAgents/com.agent-session-analytics.push.plist` + +## Troubleshooting + +### 401 Unauthorized + +Remote requests must go through `tailscale serve`. Direct access to the port is blocked. + +```bash +# Wrong (direct access) +curl https://speck-vm:8081/mcp + +# Correct (through Tailscale) +curl https://speck-vm.tailac7b3c.ts.net/agent-session-analytics/mcp +``` + +### 406 Not Acceptable + +MCP requires specific Accept header: + +```bash +curl -H 'Accept: application/json, text/event-stream' ... +``` + +### Connection timeout during push + +Try smaller batch sizes: + +```bash +agent-session-analytics-cli push --days 7 --batch-size 50 +``` + +### Check server logs + +```bash +journalctl --user -u agent-session-analytics -f +``` + +## MCP Tools for Remote Sync + +| Tool | Purpose | +|------|---------| +| `get_sync_status(session_ids?)` | Get latest timestamp per session | +| `upload_entries(entries, project_path)` | Upload raw JSONL entries | +| `finalize_sync()` | Update session stats after upload | + +## Reference + +- [agent-event-bus Tailscale setup](https://github.com/evansenter/agent-event-bus/blob/main/docs/TAILSCALE_SETUP.md) +- [Tailscale serve documentation](https://tailscale.com/kb/1242/tailscale-serve) diff --git a/src/agent_session_analytics/cli.py b/src/agent_session_analytics/cli.py index baa6282..bf233ae 100644 --- a/src/agent_session_analytics/cli.py +++ b/src/agent_session_analytics/cli.py @@ -1602,42 +1602,54 @@ def mcp_call(method_name: str, arguments: dict) -> dict | None: if not args.json: print(f"Found {total_local_entries} entries across {len(all_entries_by_session)} sessions") - # Get sync status from server (what it already has) - if not args.json: - print("Checking server sync status...") - - sync_status = mcp_call("get_sync_status", {"session_ids": list(all_entries_by_session.keys())}) - if sync_status is None: - return - - server_sessions = sync_status.get("sessions", {}) - - # Filter to only entries newer than server's latest per session - entries_to_send: list[tuple[str, dict]] = [] # [(project_path, entry), ...] - for session_id, entries in all_entries_by_session.items(): - server_latest = server_sessions.get(session_id) - if server_latest: - # Parse server timestamp and filter - from datetime import timezone - - server_ts = datetime.fromisoformat(server_latest.replace("Z", "+00:00")) - if server_ts.tzinfo is None: - server_ts = server_ts.replace(tzinfo=timezone.utc) - for project_path, entry in entries: - entry_ts_str = entry.get("timestamp") - if entry_ts_str: - try: - entry_ts = datetime.fromisoformat(entry_ts_str.replace("Z", "+00:00")) - # Ensure both are timezone-aware for comparison - if entry_ts.tzinfo is None: - entry_ts = entry_ts.replace(tzinfo=timezone.utc) - if entry_ts > server_ts: - entries_to_send.append((project_path, entry)) - except ValueError: - entries_to_send.append((project_path, entry)) # Can't parse, send anyway - else: - # Server doesn't have this session, send all entries + # Get sync status from server (what it already has) - skip if --force + if args.force: + if not args.json: + print("Force mode: sending all entries (skipping incremental sync)") + # Send all entries + entries_to_send: list[tuple[str, dict]] = [] + for entries in all_entries_by_session.values(): entries_to_send.extend(entries) + else: + if not args.json: + print("Checking server sync status...") + + sync_status = mcp_call( + "get_sync_status", {"session_ids": list(all_entries_by_session.keys())} + ) + if sync_status is None: + return + + server_sessions = sync_status.get("sessions", {}) + + # Filter to only entries newer than server's latest per session + entries_to_send = [] + for session_id, entries in all_entries_by_session.items(): + server_latest = server_sessions.get(session_id) + if server_latest: + # Parse server timestamp and filter + from datetime import timezone + + server_ts = datetime.fromisoformat(server_latest.replace("Z", "+00:00")) + if server_ts.tzinfo is None: + server_ts = server_ts.replace(tzinfo=timezone.utc) + for project_path, entry in entries: + entry_ts_str = entry.get("timestamp") + if entry_ts_str: + try: + entry_ts = datetime.fromisoformat(entry_ts_str.replace("Z", "+00:00")) + # Ensure both are timezone-aware for comparison + if entry_ts.tzinfo is None: + entry_ts = entry_ts.replace(tzinfo=timezone.utc) + if entry_ts > server_ts: + entries_to_send.append((project_path, entry)) + except ValueError: + entries_to_send.append( + (project_path, entry) + ) # Can't parse, send anyway + else: + # Server doesn't have this session, send all entries + entries_to_send.extend(entries) if not entries_to_send: output = { @@ -1667,6 +1679,7 @@ def mcp_call(method_name: str, arguments: dict) -> dict | None: total_added = 0 total_skipped = 0 total_errors = 0 + total_raw_added = 0 entries_uploaded = 0 total_entries = len(entries_to_send) start_time = datetime.now() @@ -1684,6 +1697,7 @@ def mcp_call(method_name: str, arguments: dict) -> dict | None: total_added += result.get("events_added", 0) total_skipped += result.get("events_skipped", 0) total_errors += result.get("parse_errors", 0) + total_raw_added += result.get("raw_entries_added", 0) entries_uploaded += len(batch) # Progress update with time estimate @@ -1710,6 +1724,7 @@ def mcp_call(method_name: str, arguments: dict) -> dict | None: "files_checked": len(files), "entries_checked": total_local_entries, "entries_sent": len(entries_to_send), + "raw_entries_added": total_raw_added, "events_added": total_added, "events_skipped": total_skipped, "sessions_updated": sessions_updated, @@ -2068,6 +2083,11 @@ def main(): default=500, help="Entries per batch (default: 500)", ) + sub.add_argument( + "--force", + action="store_true", + help="Force re-send all entries (skip incremental sync, re-populate raw_entries)", + ) sub.set_defaults(func=cmd_push) args = parser.parse_args() diff --git a/src/agent_session_analytics/guide.md b/src/agent_session_analytics/guide.md index 6a2a831..9d988b7 100644 --- a/src/agent_session_analytics/guide.md +++ b/src/agent_session_analytics/guide.md @@ -35,10 +35,15 @@ export AGENT_SESSION_ANALYTICS_URL=https://server.tailnet.ts.net/mcp # Push local session data (incremental - only sends new entries) agent-session-analytics-cli push --days 365 + +# Force re-send all entries (re-populates raw_entries table) +agent-session-analytics-cli push --days 365 --force ``` The `push` command queries `get_sync_status()` first to determine what the server already has, then only uploads entries newer than the server's latest per session. +**Raw entry storage:** All uploaded entries are stored in both parsed form (events table) and raw form (raw_entries table). This allows re-parsing historical data when the parser improves. + ### Core Queries | Tool | Purpose | diff --git a/src/agent_session_analytics/server.py b/src/agent_session_analytics/server.py index 5896d51..4156b3f 100644 --- a/src/agent_session_analytics/server.py +++ b/src/agent_session_analytics/server.py @@ -111,12 +111,25 @@ def upload_entries(entries: list[dict], project_path: str, update_stats: bool = For multi-machine setups where session JSONL files live on client machines. Entries are parsed server-side so future parser improvements apply. + Raw entries are also stored for future re-parsing. Args: entries: List of raw JSONL entry dicts (as read from session files) project_path: Project path identifier (typically the directory name) update_stats: Update session stats after insert (default: False, call finalize_sync at end) """ + import json + + # Store raw entries for future re-parsing (always, even if parsed event exists) + raw_tuples = [] + for raw in entries: + session_id = raw.get("sessionId") + timestamp = raw.get("timestamp") + if session_id and timestamp: + raw_tuples.append((session_id, project_path, timestamp, json.dumps(raw))) + + raw_entries_added = storage.add_raw_entries_batch(raw_tuples) if raw_tuples else 0 + # Parse entries server-side using the same logic as local ingestion all_events = [] errors = 0 @@ -140,6 +153,7 @@ def upload_entries(entries: list[dict], project_path: str, update_stats: bool = return { "status": "ok", "entries_received": len(entries), + "raw_entries_added": raw_entries_added, "events_parsed": len(all_events), "events_added": events_added, "events_skipped": len(all_events) - events_added, diff --git a/src/agent_session_analytics/storage.py b/src/agent_session_analytics/storage.py index c43fdc4..5623e03 100644 --- a/src/agent_session_analytics/storage.py +++ b/src/agent_session_analytics/storage.py @@ -591,6 +591,32 @@ def migrate_v12(conn): logger.info(f"Fixed {warmup_count} warmup events from is_error=1 to is_error=0") +@migration(13, "add_raw_entries_table") +def migrate_v13(conn): + """Add raw_entries table for storing unparsed JSONL entries. + + This enables re-parsing historical data when the parser improves. + Raw entries are stored separately from parsed events to keep concerns separate + and allow full re-ingestion without data loss. + """ + conn.execute( + """ + CREATE TABLE IF NOT EXISTS raw_entries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + project_path TEXT, + timestamp TEXT NOT NULL, + entry_json TEXT NOT NULL, + ingested_at TEXT NOT NULL DEFAULT (datetime('now')), + UNIQUE(session_id, timestamp, entry_json) + ) + """ + ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_raw_entries_session ON raw_entries(session_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_raw_entries_timestamp ON raw_entries(timestamp)") + logger.info("Created raw_entries table for storing unparsed JSONL") + + class SQLiteStorage: """SQLite-backed storage for session analytics.""" @@ -883,6 +909,27 @@ def _init_db(self): ) conn.execute("CREATE INDEX IF NOT EXISTS idx_bus_events_repo ON bus_events(repo)") + # Raw entries table for storing unparsed JSONL (v13) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS raw_entries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + project_path TEXT, + timestamp TEXT NOT NULL, + entry_json TEXT NOT NULL, + ingested_at TEXT NOT NULL DEFAULT (datetime('now')), + UNIQUE(session_id, timestamp, entry_json) + ) + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_raw_entries_session ON raw_entries(session_id)" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_raw_entries_timestamp ON raw_entries(timestamp)" + ) + # Run migrations AFTER all tables are created # Only existing databases need migrations - fresh databases have full schema current_version = self._get_schema_version(conn) @@ -1048,6 +1095,31 @@ def add_events_batch(self, events: list[Event]) -> int: ) return cursor.rowcount + def add_raw_entries_batch(self, entries: list[tuple[str, str, str, str]]) -> int: + """Add raw JSONL entries for future re-parsing. + + Args: + entries: List of (session_id, project_path, timestamp, entry_json) tuples + + Returns: + Number of entries added (duplicates are ignored) + """ + with self._connect() as conn: + cursor = conn.executemany( + """ + INSERT OR IGNORE INTO raw_entries (session_id, project_path, timestamp, entry_json) + VALUES (?, ?, ?, ?) + """, + entries, + ) + return cursor.rowcount + + def get_raw_entry_count(self) -> int: + """Get total number of raw entries.""" + with self._connect() as conn: + row = conn.execute("SELECT COUNT(*) as count FROM raw_entries").fetchone() + return row["count"] + def get_event_count(self) -> int: """Get total number of events.""" with self._connect() as conn: From cab10ba7dfd685277a1011646ac3d0c4d30182bc Mon Sep 17 00:00:00 2001 From: Evan Senter Date: Mon, 26 Jan 2026 23:34:11 -0800 Subject: [PATCH 2/3] fix: Address PR review feedback - Update SCHEMA_VERSION from 12 to 13 (critical: migration wouldn't run) - Add raw_entries table documentation to SCHEMA.md - Add tests for add_raw_entries_batch and get_raw_entry_count Co-Authored-By: Claude Opus 4.5 --- docs/SCHEMA.md | 22 +++++++++++++++ src/agent_session_analytics/storage.py | 2 +- tests/test_storage.py | 38 ++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 1 deletion(-) diff --git a/docs/SCHEMA.md b/docs/SCHEMA.md index f55fab3..b5bfea9 100644 --- a/docs/SCHEMA.md +++ b/docs/SCHEMA.md @@ -28,6 +28,7 @@ This document describes the SQLite database schema for agent-session-analytics. | `session_commits` | Junction table linking sessions to commits | ~3K | | `bus_events` | Cross-session events from event-bus | ~2K | | `events_fts` | FTS5 virtual table for user message search | N/A | +| `raw_entries` | Unparsed JSONL entries for future re-parsing | 100K+ | --- @@ -192,6 +193,24 @@ CREATE TABLE patterns ( ) ``` +### raw_entries + +Unparsed JSONL entries for future re-parsing. Stored separately from `events` to preserve original source material. + +```sql +CREATE TABLE raw_entries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + project_path TEXT, + timestamp TEXT NOT NULL, + entry_json TEXT NOT NULL, -- Full original JSONL entry + ingested_at TEXT NOT NULL DEFAULT (datetime('now')), + UNIQUE(session_id, timestamp, entry_json) +) +``` + +**Design note**: The UNIQUE constraint on `entry_json` ensures exact deduplication. While this means large JSON values are compared, SQLite handles this efficiently and it avoids hash collision edge cases. + --- ## Indexes @@ -224,6 +243,8 @@ Performance-critical indexes on the `events` table: | `bus_events` | `idx_bus_events_type` | `event_type` | | `bus_events` | `idx_bus_events_session` | `session_id` | | `bus_events` | `idx_bus_events_repo` | `repo` | +| `raw_entries` | `idx_raw_entries_session` | `session_id` | +| `raw_entries` | `idx_raw_entries_timestamp` | `timestamp` | --- @@ -262,6 +283,7 @@ Sync triggers maintain index consistency: | 10 | backfill_compaction_and_result_size | Backfill compaction detection and result_size_bytes for existing data | | 11 | fix_compaction_detection_user_entries | Fix compaction detection to look at user entries (not just summary) | | 12 | fix_warmup_not_errors | Fix warmup events incorrectly marked as errors (Issue #75) | +| 13 | add_raw_entries_table | Raw JSONL storage for future re-parsing (Issue #93) | --- diff --git a/src/agent_session_analytics/storage.py b/src/agent_session_analytics/storage.py index 5623e03..55bd5cf 100644 --- a/src/agent_session_analytics/storage.py +++ b/src/agent_session_analytics/storage.py @@ -177,7 +177,7 @@ class BusEvent: OLD_DB_PATH = Path.home() / ".claude" / "contrib" / "analytics" / "data.db" # Schema version for migrations -SCHEMA_VERSION = 12 +SCHEMA_VERSION = 13 # Migration functions: dict of version -> (migration_name, migration_func) # Each migration upgrades FROM version-1 TO version diff --git a/tests/test_storage.py b/tests/test_storage.py index ac59b53..8716810 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -1121,3 +1121,41 @@ def test_result_size_bytes_column_exists(self, storage): rows = storage.execute_query("PRAGMA table_info(events)") columns = {row[1] for row in rows} assert "result_size_bytes" in columns + + +class TestRawEntries: + """Tests for raw_entries storage (Issue #93).""" + + def test_add_raw_entries_batch(self, storage): + """Test adding raw JSONL entries.""" + entries = [ + ("session-1", "test-project", "2025-01-01T12:00:00Z", '{"type":"user"}'), + ("session-1", "test-project", "2025-01-01T12:01:00Z", '{"type":"assistant"}'), + ("session-2", "test-project", "2025-01-01T12:00:00Z", '{"type":"user"}'), + ] + count = storage.add_raw_entries_batch(entries) + assert count == 3 + assert storage.get_raw_entry_count() == 3 + + def test_add_raw_entries_batch_empty(self, storage): + """Test batch add with empty list.""" + count = storage.add_raw_entries_batch([]) + assert count == 0 + assert storage.get_raw_entry_count() == 0 + + def test_add_raw_entries_dedup(self, storage): + """Test that duplicate raw entries are ignored.""" + entry = ("session-1", "test-project", "2025-01-01T12:00:00Z", '{"type":"user"}') + storage.add_raw_entries_batch([entry]) + storage.add_raw_entries_batch([entry]) # Same entry again + assert storage.get_raw_entry_count() == 1 + + def test_raw_entries_table_exists(self, storage): + """Verify that raw_entries table exists with correct schema.""" + rows = storage.execute_query("PRAGMA table_info(raw_entries)") + columns = {row[1] for row in rows} + assert "session_id" in columns + assert "project_path" in columns + assert "timestamp" in columns + assert "entry_json" in columns + assert "ingested_at" in columns From 5b421cebda15d642f96ce4ea3438e6dc3406fb91 Mon Sep 17 00:00:00 2001 From: Evan Senter Date: Mon, 26 Jan 2026 23:38:57 -0800 Subject: [PATCH 3/3] test: Add raw_entries_added assertion to test_upload_entries Use unique identifiers to avoid dedup across test runs. Addresses claude-review suggestion on PR #98. Co-Authored-By: Claude Opus 4.5 --- tests/test_server.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tests/test_server.py b/tests/test_server.py index cbd11dd..cd391f4 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -426,13 +426,16 @@ def test_get_sync_status_with_filter(): def test_upload_entries(): """Test that upload_entries accepts and parses raw JSONL entries.""" - # Create a minimal valid entry + import uuid as uuid_mod + + # Use unique identifiers to avoid dedup across test runs + unique_id = uuid_mod.uuid4().hex[:8] test_entries = [ { "type": "user", - "sessionId": "test-upload-session", + "sessionId": f"test-upload-session-{unique_id}", "timestamp": "2026-01-25T10:00:00Z", - "uuid": "test-upload-uuid-001", + "uuid": f"test-upload-uuid-{unique_id}", "message": {"content": "test message"}, } ] @@ -443,6 +446,8 @@ def test_upload_entries(): assert "events_parsed" in result assert "events_added" in result assert "sessions_updated" in result + assert "raw_entries_added" in result + assert result["raw_entries_added"] == 1 def test_upload_entries_empty():