Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 194 additions & 0 deletions src/agent_session_analytics/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1473,6 +1473,7 @@ def cmd_benchmark(args):
# - ingest_logs, ingest_git_history, ingest_git_history_all_projects
# - correlate_git_with_sessions, ingest_bus_events
# - find_related_sessions (requires valid session_id)
# - upload_entries, get_sync_status (remote sync tools - modify DB or require client context)

benchmarks = []
for tool_name, tool_func in tool_functions.items():
Expand All @@ -1498,6 +1499,186 @@ def cmd_benchmark(args):
print(format_output(output, args.json))


def cmd_push(args):
    """Push local session data to a remote server with incremental sync.

    Steps:
      1. Resolve the remote MCP endpoint from ``--url`` or the
         ``AGENT_SESSION_ANALYTICS_URL`` environment variable.
      2. Read the local JSONL log files and group entries by ``sessionId``.
      3. Ask the server (``get_sync_status``) for its latest timestamp per
         session.
      4. Upload (``upload_entries``) only the entries the server is missing,
         in batches of ``--batch-size`` per project.

    Args:
        args: Parsed CLI namespace. Reads ``url`` (optional), ``days``,
            ``project``, ``batch_size`` and ``json``.

    Returns:
        None. Progress, errors and the final summary are printed.
    """
    import os
    import urllib.error
    import urllib.request

    # Get remote URL from env var or args
    remote_url = getattr(args, "url", None) or os.environ.get("AGENT_SESSION_ANALYTICS_URL")
    if not remote_url:
        print("Error: No remote URL specified.")
        print("Set AGENT_SESSION_ANALYTICS_URL or use --url")
        return

    # range() rejects a zero step and a negative step would upload nothing,
    # so refuse such values up front instead of failing after all the reading.
    batch_size = args.batch_size
    if batch_size < 1:
        print(f"Error: --batch-size must be a positive integer (got {batch_size})")
        return

    # Imported only after the cheap argument checks so invalid invocations
    # fail fast without touching the ingestion machinery.
    from agent_session_analytics.ingest import find_log_files

    # Normalize URL to point to MCP endpoint
    if not remote_url.endswith("/mcp"):
        remote_url = remote_url.rstrip("/") + "/mcp"

    def mcp_call(method_name: str, arguments: dict) -> dict | None:
        """Call one MCP tool; return its decoded JSON payload or None on failure.

        Every failure path prints a message, so callers only need to check
        for None.
        """
        mcp_request = {
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {"name": method_name, "arguments": arguments},
            "id": 1,
        }
        try:
            req = urllib.request.Request(
                remote_url,
                data=json.dumps(mcp_request).encode("utf-8"),
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=60) as resp:
                reply = json.loads(resp.read().decode("utf-8"))
            if "result" in reply:
                content = reply["result"].get("content", [])
                if content and content[0].get("type") == "text":
                    return json.loads(content[0]["text"])
                # Previously this case returned None without any message.
                print(f"Error: unexpected response from {method_name}")
            elif "error" in reply:
                print(f"Error: {reply['error']}")
            else:
                print(f"Error: unexpected response from {method_name}")
        except urllib.error.URLError as e:
            print(f"Error connecting to {remote_url}: {e}")
        except Exception as e:
            # Boundary catch-all: malformed JSON, bad URL scheme, odd payloads.
            print(f"Error in MCP call: {e}")
        return None

    # Find local JSONL files
    files = find_log_files(days=args.days, project_filter=args.project)
    if not files:
        print(f"No log files found in the last {args.days} days")
        return

    if not args.json:
        print(f"Found {len(files)} log files")

    all_entries_by_session, local_parse_errors = _read_entries_by_session(
        files, verbose=not args.json
    )
    if not all_entries_by_session:
        print("No entries found in log files")
        return

    total_local_entries = sum(len(v) for v in all_entries_by_session.values())
    if not args.json:
        print(f"Found {total_local_entries} entries across {len(all_entries_by_session)} sessions")

    # Get sync status from server (what it already has)
    if not args.json:
        print("Checking server sync status...")

    sync_status = mcp_call("get_sync_status", {"session_ids": list(all_entries_by_session)})
    if sync_status is None:
        return

    entries_to_send = _select_unsynced_entries(
        all_entries_by_session, sync_status.get("sessions") or {}
    )

    if not entries_to_send:
        output = {
            "status": "ok",
            "message": "Already in sync",
            "files_checked": len(files),
            "entries_checked": total_local_entries,
            "entries_sent": 0,
            "events_added": 0,
            "remote_url": remote_url,
        }
        print(format_output(output, args.json))
        return

    if not args.json:
        print(
            f"Sending {len(entries_to_send)} new entries (skipping {total_local_entries - len(entries_to_send)} already synced)"
        )

    # Group entries to send by project_path for batching
    by_project: dict[str, list[dict]] = {}
    for project_path, entry in entries_to_send:
        by_project.setdefault(project_path, []).append(entry)

    total_added = 0
    total_skipped = 0
    total_errors = 0
    entries_sent = 0
    upload_failed = False

    # Upload in batches per project
    for project_path, entries in by_project.items():
        for start in range(0, len(entries), batch_size):
            batch = entries[start : start + batch_size]
            result = mcp_call("upload_entries", {"entries": batch, "project_path": project_path})
            if result is None:
                upload_failed = True
                break
            entries_sent += len(batch)
            total_added += result.get("events_added", 0)
            total_skipped += result.get("events_skipped", 0)
            total_errors += result.get("parse_errors", 0)
        if upload_failed:
            # mcp_call already printed the cause; stop and report what was
            # uploaded so far rather than exiting without a summary.
            break

        if not args.json:
            print(f" {project_path}: {len(entries)} entries")

    output = {
        "status": "error" if upload_failed else "ok",
        "files_checked": len(files),
        "entries_checked": total_local_entries,
        "entries_sent": entries_sent,
        "events_added": total_added,
        "events_skipped": total_skipped,
        "local_parse_errors": local_parse_errors,
        "remote_parse_errors": total_errors,
        "remote_url": remote_url,
    }

    print(format_output(output, args.json))


def _parse_utc_timestamp(value):
    """Parse an ISO-8601 string into a timezone-aware datetime, or return None."""
    from datetime import datetime, timezone

    if not isinstance(value, str):
        return None
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # NOTE(review): naive timestamps are assumed to be UTC - confirm.
        # Making them aware avoids the TypeError raised when a naive value
        # is compared with an aware one.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _read_entries_by_session(files, verbose):
    """Read JSONL files into {session_id: [(project_path, entry), ...]} plus a parse-error count."""
    entries_by_session: dict[str, list[tuple[str, dict]]] = {}
    parse_errors = 0
    for file_path in files:
        # The containing directory's name is used as the project identifier.
        project_path = file_path.parent.name
        try:
            with open(file_path, encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        entry = json.loads(line)
                    except json.JSONDecodeError:
                        parse_errors += 1
                        continue
                    if not isinstance(entry, dict):
                        # Valid JSON but not an object: count it instead of
                        # letting .get() abort the rest of this file.
                        parse_errors += 1
                        continue
                    session_id = entry.get("sessionId")
                    if session_id:
                        entries_by_session.setdefault(session_id, []).append(
                            (project_path, entry)
                        )
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole push.
            if verbose:
                print(f"Warning: Failed to read {file_path}: {e}")
    return entries_by_session, parse_errors


def _select_unsynced_entries(entries_by_session, server_sessions):
    """Return the (project_path, entry) pairs newer than the server's latest timestamp per session."""
    pending: list[tuple[str, dict]] = []
    for session_id, entries in entries_by_session.items():
        server_latest = server_sessions.get(session_id)
        server_ts = _parse_utc_timestamp(server_latest) if server_latest else None
        if server_ts is None:
            # Unknown session, or a server timestamp that cannot be parsed:
            # send everything for this session.
            pending.extend(entries)
            continue
        for project_path, entry in entries:
            raw_ts = entry.get("timestamp")
            if not raw_ts:
                # Entries without a timestamp are skipped for sessions the
                # server already knows (unchanged from the previous behaviour).
                continue
            entry_ts = _parse_utc_timestamp(raw_ts)
            # Can't parse -> send anyway.
            if entry_ts is None or entry_ts > server_ts:
                pending.append((project_path, entry))
    return pending


def main():
"""CLI entry point."""
epilog = """
Expand Down Expand Up @@ -1834,6 +2015,19 @@ def main():
)
sub.set_defaults(func=cmd_benchmark)

# push (Issue #93 - remote ingestion)
sub = subparsers.add_parser("push", help="Push local session data to remote server")
sub.add_argument("--days", type=int, default=365, help="Days to look back (default: 365)")
sub.add_argument("--project", help="Project path filter")
sub.add_argument("--url", help="Remote server URL (or set AGENT_SESSION_ANALYTICS_URL)")
sub.add_argument(
"--batch-size",
type=int,
default=500,
help="Entries per batch (default: 500)",
)
sub.set_defaults(func=cmd_push)

args = parser.parse_args()
args.func(args)

Expand Down
20 changes: 20 additions & 0 deletions src/agent_session_analytics/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,26 @@ identify permission gaps.
| `get_status()` | Database stats, last ingestion time |
| `ingest_logs(days?, project?, force?)` | Refresh data from JSONL files |

### Remote Sync (Multi-Machine)

For setups where the database lives on a central server (e.g., via Tailscale):

| Tool | Purpose |
|------|---------|
| `get_sync_status(session_ids?)` | Get latest timestamp per session for incremental sync |
| `upload_entries(entries, project_path)` | Upload raw JSONL entries from remote clients |

**CLI usage:**
```bash
# Set remote server URL
export AGENT_SESSION_ANALYTICS_URL=https://server.tailnet.ts.net/mcp

# Push local session data (incremental - only sends new entries)
agent-session-analytics-cli push --days 365
```

The `push` command first calls `get_sync_status()` to learn the latest timestamp the server holds for each local session, then uploads only the entries whose timestamp is newer than that. Sessions the server has never seen are uploaded in full.

### Core Queries

| Tool | Purpose |
Expand Down
68 changes: 68 additions & 0 deletions src/agent_session_analytics/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,74 @@ def ingest_logs(days: int = 7, project: str | None = None, force: bool = False)
}


@mcp.tool()
def get_sync_status(session_ids: list[str] | None = None) -> dict:
    """Get latest event timestamp per session for incremental sync.

    Args:
        session_ids: Optional list of session IDs to check. When omitted or
            empty, every session in the ``events`` table is reported.

    Returns:
        ``{"status": "ok", "sessions": {session_id: latest_timestamp}}``.
        Sessions with no rows in ``events`` are absent from the mapping.
    """
    select = "SELECT session_id, MAX(timestamp) as latest_timestamp FROM events"

    # A push covering many sessions sends one bound parameter per session id.
    # Databases cap the number of parameters per statement (older SQLite
    # builds default to 999), so the ids are queried in fixed-size chunks.
    # NOTE(review): assumes the storage backend is SQLite - confirm.
    max_ids_per_query = 500

    latest: dict = {}
    if session_ids:
        # De-duplicate while keeping the caller's order.
        unique_ids = list(dict.fromkeys(session_ids))
        for start in range(0, len(unique_ids), max_ids_per_query):
            chunk = unique_ids[start : start + max_ids_per_query]
            placeholders = ",".join("?" * len(chunk))
            rows = storage.execute_query(
                f"{select} WHERE session_id IN ({placeholders}) GROUP BY session_id",
                chunk,
            )
            latest.update({row["session_id"]: row["latest_timestamp"] for row in rows})
    else:
        rows = storage.execute_query(f"{select} GROUP BY session_id", [])
        latest.update({row["session_id"]: row["latest_timestamp"] for row in rows})

    return {
        "status": "ok",
        "sessions": latest,
    }


@mcp.tool()
def upload_entries(entries: list[dict], project_path: str) -> dict:
    """Accept raw JSONL entries pushed by a remote client and store them.

    Intended for multi-machine setups where the session JSONL files live on
    client machines. Parsing happens here, on the server, through
    ``ingest.parse_entry``, so parser changes apply to uploaded data as well.

    Args:
        entries: Raw JSONL entry dicts, as read from session files.
        project_path: Project identifier (typically the directory name).

    Returns:
        A summary dict with counts of received entries, parsed events,
        added/skipped events, updated sessions and parse errors.
    """
    parsed_events = []
    failed = 0

    # Best effort: an entry that fails to parse is counted, not fatal.
    for raw_entry in entries:
        try:
            parsed_events.extend(ingest.parse_entry(raw_entry, project_path))
        except Exception as e:
            logger.debug(f"Error parsing uploaded entry: {e}")
            failed += 1

    # Skip the storage call entirely when nothing was parsed.
    # The original comment notes the insert deduplicates (INSERT OR IGNORE on uuid).
    if parsed_events:
        inserted = storage.add_events_batch(parsed_events)
    else:
        inserted = 0

    # Session statistics are refreshed on every upload.
    refreshed = ingest.update_session_stats(storage)

    parsed_count = len(parsed_events)
    return {
        "status": "ok",
        "entries_received": len(entries),
        "events_parsed": parsed_count,
        "events_added": inserted,
        "events_skipped": parsed_count - inserted,
        "sessions_updated": refreshed,
        "parse_errors": failed,
    }


@mcp.tool()
def get_tool_frequency(days: int = 7, project: str | None = None, expand: bool = True) -> dict:
"""Get tool usage frequency counts.
Expand Down
51 changes: 51 additions & 0 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
get_session_messages,
get_session_signals,
get_status,
get_sync_status,
get_token_usage,
get_tool_frequency,
get_tool_sequences,
Expand All @@ -32,6 +33,7 @@
list_sessions,
sample_sequences,
search_messages,
upload_entries,
)


Expand Down Expand Up @@ -401,6 +403,55 @@ def test_get_large_tool_results():
assert isinstance(result["large_results"], list)


# --- Remote Ingestion Tests (Issue #93) ---


def test_get_sync_status():
    """get_sync_status without a filter reports ok and a dict of sessions."""
    response = get_sync_status.fn(session_ids=None)

    assert response["status"] == "ok"
    assert "sessions" in response
    sessions = response["sessions"]
    assert isinstance(sessions, dict)


def test_get_sync_status_with_filter():
    """get_sync_status omits a requested session ID that has no events."""
    unknown_id = "nonexistent-session"
    response = get_sync_status.fn(session_ids=[unknown_id])

    assert response["status"] == "ok"
    assert "sessions" in response
    # An unknown session must not appear in the returned mapping.
    assert unknown_id not in response["sessions"]


def test_upload_entries():
    """upload_entries accepts one raw JSONL entry and returns the summary fields."""
    # Smallest entry shape this test relies on: a single user message.
    user_entry = {
        "type": "user",
        "sessionId": "test-upload-session",
        "timestamp": "2026-01-25T10:00:00Z",
        "uuid": "test-upload-uuid-001",
        "message": {"content": "test message"},
    }

    summary = upload_entries.fn(entries=[user_entry], project_path="test-project")

    assert summary["status"] == "ok"
    assert "entries_received" in summary
    assert summary["entries_received"] == 1
    assert "events_parsed" in summary
    assert "events_added" in summary
    assert "sessions_updated" in summary


def test_upload_entries_empty():
    """upload_entries with no entries reports ok and zero counts."""
    summary = upload_entries.fn(entries=[], project_path="test-project")

    assert summary["status"] == "ok"
    assert summary["entries_received"] == 0
    assert summary["events_parsed"] == 0


# --- Tailscale Auth Middleware Tests ---


Expand Down