Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ htmlcov/
*.db
.claude/
.parallel-context.md
.worktrees/
4 changes: 4 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ make dev # Run in dev mode with auto-reload
| `query_sequences` | Common tool patterns (n-grams) |
| `query_permission_gaps` | Commands needing settings.json entries |
| `get_insights` | Pre-computed patterns for /improve-workflow |
| `get_user_journey` | User messages across sessions chronologically |
| `search_messages` | Full-text search on user messages (FTS5) |

## CLI Commands

Expand All @@ -83,6 +85,8 @@ session-analytics-cli tokens --by model # Token usage
session-analytics-cli sequences # Tool chains
session-analytics-cli permissions # Permission gaps
session-analytics-cli insights # For /improve-workflow
session-analytics-cli journey # User messages across sessions
session-analytics-cli search <query> # Full-text search on messages
```

## Integration
Expand Down
66 changes: 65 additions & 1 deletion src/session_analytics/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import argparse
import json
import sqlite3

from session_analytics.ingest import (
correlate_git_with_sessions as do_correlate_git,
Expand Down Expand Up @@ -160,7 +161,9 @@ def _format_user_journey(data: dict) -> list[str]:

for event in data.get("journey", [])[:20]:
ts = event.get("timestamp", "")[:16] if event.get("timestamp") else "unknown"
msg = event.get("message", "")[:60]
msg = event.get("message", "") if event.get("message") else ""
if len(msg) > 60:
msg = msg[:57] + "..."
project = event.get("project", "")
if project:
lines.append(f" [{ts}] ({project}) {msg}")
Expand All @@ -171,6 +174,28 @@ def _format_user_journey(data: dict) -> list[str]:
return lines


@_register_formatter(lambda d: "query" in d and "messages" in d and "count" in d)
def _format_search_results(data: dict) -> list[str]:
    """Render a search result payload as human-readable text lines.

    The output starts with the query and the reported result count, followed
    by a blank separator and up to 20 matches. Each match shows the timestamp
    cut to its first 16 characters (or "unknown" when absent), the project in
    parentheses when one is present, and the message text shortened to at
    most 60 characters. A trailing line reports how many matches were omitted.

    Args:
        data: Payload containing "query", "count" and "messages" keys.

    Returns:
        The formatted lines, in display order.
    """
    shown_limit = 20
    text_width = 60

    matches = data.get("messages", [])
    out = [f"Search: {data['query']}", f"Results: {data['count']}", ""]

    for match in matches[:shown_limit]:
        raw_ts = match.get("timestamp")
        when = raw_ts[:16] if raw_ts else "unknown"

        body = match.get("message") or ""
        if len(body) > text_width:
            # Reserve three characters for the ellipsis so the total stays at 60.
            body = body[: text_width - 3] + "..."

        where = match.get("project", "")
        if where:
            out.append(f" [{when}] ({where}) {body}")
        else:
            out.append(f" [{when}] {body}")

    hidden = len(matches) - shown_limit
    if hidden > 0:
        out.append(f" ... and {hidden} more")
    return out


@_register_formatter(lambda d: "parallel_periods" in d and "parallel_period_count" in d)
def _format_parallel_sessions(data: dict) -> list[str]:
lines = [
Expand Down Expand Up @@ -473,6 +498,38 @@ def cmd_journey(args):
print(format_output(result, args.json))


def cmd_search(args):
    """Handle the ``search`` subcommand: full-text search over user messages.

    Reads ``args.query``, ``args.limit``, ``args.json`` and, when present,
    ``args.project``. Prints either the matching messages or an error payload
    when SQLite rejects the query; nothing is returned.
    """
    store = SQLiteStorage()
    project_filter = getattr(args, "project", None)

    try:
        hits = store.search_user_messages(
            args.query, limit=args.limit, project=project_filter
        )
    except sqlite3.OperationalError as exc:
        # Catch FTS5-related errors (syntax, unterminated strings, etc.)
        failure = {
            "status": "error",
            "query": args.query,
            "error": f"Invalid FTS5 query syntax: {exc}",
        }
        print(format_output(failure, args.json))
        return

    messages = []
    for hit in hits:
        stamp = hit.timestamp.isoformat() if hit.timestamp else None
        messages.append(
            {
                "timestamp": stamp,
                "session_id": hit.session_id,
                "project": hit.project_path,
                "message": hit.user_message_text,
            }
        )

    payload = {
        "query": args.query,
        "project": project_filter,
        "count": len(hits),
        "messages": messages,
    }
    print(format_output(payload, args.json))


def cmd_parallel(args):
"""Show parallel session detection."""
storage = SQLiteStorage()
Expand Down Expand Up @@ -664,6 +721,13 @@ def main():
sub.add_argument("--no-projects", action="store_true", help="Exclude project info")
sub.set_defaults(func=cmd_journey)

# search
sub = subparsers.add_parser("search", help="Search user messages (FTS)")
sub.add_argument("query", help="FTS5 query (e.g., 'auth', '\"fix bug\"', 'skip OR defer')")
sub.add_argument("--limit", type=int, default=50, help="Max results (default: 50)")
sub.add_argument("--project", help="Project path filter")
sub.set_defaults(func=cmd_search)

# parallel
sub = subparsers.add_parser("parallel", help="Detect parallel sessions")
sub.add_argument("--hours", type=int, default=24, help="Hours to look back (default: 24)")
Expand Down
51 changes: 51 additions & 0 deletions src/session_analytics/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
- query_tokens: Token usage analysis
- get_insights: Pre-computed patterns for /improve-workflow
- get_status: Ingestion status + DB stats
- get_user_journey: User messages across sessions
- search_messages: Full-text search on user messages
"""

import logging
import os
import sqlite3
from importlib.metadata import version
from pathlib import Path

Expand Down Expand Up @@ -289,6 +292,54 @@ def get_user_journey(hours: int = 24, include_projects: bool = True, limit: int
return {"status": "ok", **result}


@mcp.tool()
def search_messages(query: str, limit: int = 50, project: str | None = None) -> dict:
    """Search user messages using full-text search.

    Uses FTS5 to efficiently search across all user messages. Useful for finding
    discussions about specific topics, decisions, or patterns across sessions.

    Note: Searches user messages only, not assistant responses.

    Args:
        query: FTS5 query string. Supports:
            - Simple terms: "authentication"
            - Phrases: '"fix the bug"'
            - Boolean: "auth AND error", "skip OR defer"
            - Prefix: "implement*"
        limit: Maximum results to return (default: 50)
        project: Optional project path filter

    Returns:
        Matching messages with session context and timestamps
    """
    queries.ensure_fresh_data(storage)

    try:
        hits = storage.search_user_messages(query, limit=limit, project=project)
    except sqlite3.OperationalError as exc:
        # Catch FTS5-related errors (syntax, unterminated strings, etc.)
        return {
            "status": "error",
            "query": query,
            "error": f"Invalid FTS5 query syntax: {exc}",
        }

    # Flatten each matching event into a plain, JSON-friendly dict.
    messages = []
    for hit in hits:
        stamp = hit.timestamp.isoformat() if hit.timestamp else None
        messages.append(
            {
                "timestamp": stamp,
                "session_id": hit.session_id,
                "project": hit.project_path,
                "message": hit.user_message_text,
            }
        )

    return {
        "status": "ok",
        "query": query,
        "project": project,
        "count": len(hits),
        "messages": messages,
    }


@mcp.tool()
def detect_parallel_sessions(hours: int = 24, min_overlap_minutes: int = 5) -> dict:
"""Find sessions that were active simultaneously.
Expand Down
38 changes: 27 additions & 11 deletions src/session_analytics/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -929,28 +929,44 @@ def get_git_commit_count(self) -> int:

# Full-text search operations

def search_user_messages(self, query: str, limit: int = 100) -> list[Event]:
def search_user_messages(
self, query: str, limit: int = 100, project: str | None = None
) -> list[Event]:
"""Search user messages using full-text search.

Args:
query: FTS5 query string (supports AND, OR, NOT, phrases, etc.)
limit: Maximum number of results
project: Optional project path filter (LIKE %project%)

Returns:
List of Event objects matching the search query
"""
with self._connect() as conn:
# Use FTS5 MATCH to search, join back to events for full data
rows = conn.execute(
"""
SELECT events.* FROM events
INNER JOIN events_fts ON events.id = events_fts.rowid
WHERE events_fts MATCH ?
ORDER BY rank
LIMIT ?
""",
(query, limit),
).fetchall()
if project:
rows = conn.execute(
"""
SELECT events.* FROM events
INNER JOIN events_fts ON events.id = events_fts.rowid
WHERE events_fts MATCH ?
AND events.project_path LIKE ?
ORDER BY rank
LIMIT ?
""",
(query, f"%{project}%", limit),
).fetchall()
else:
rows = conn.execute(
"""
SELECT events.* FROM events
INNER JOIN events_fts ON events.id = events_fts.rowid
WHERE events_fts MATCH ?
ORDER BY rank
LIMIT ?
""",
(query, limit),
).fetchall()

return [self._row_to_event(row) for row in rows]

Expand Down
99 changes: 99 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
cmd_frequency,
cmd_insights,
cmd_permissions,
cmd_search,
cmd_sequences,
cmd_sessions,
cmd_status,
Expand Down Expand Up @@ -58,6 +59,24 @@ def populated_storage(storage):
input_tokens=80,
output_tokens=30,
),
Event(
id=None,
uuid="u1",
timestamp=now - timedelta(hours=1, minutes=30),
session_id="s1",
project_path="-test",
entry_type="user",
user_message_text="Fix the authentication bug in the login flow",
),
Event(
id=None,
uuid="u2",
timestamp=now - timedelta(hours=2, minutes=30),
session_id="s1",
project_path="-test",
entry_type="user",
user_message_text="Add unit tests for the API endpoints",
),
]
storage.add_events_batch(events)

Expand Down Expand Up @@ -287,3 +306,83 @@ class Args:

captured = capsys.readouterr()
assert '"total_tool_calls"' in captured.out

def test_cmd_search(self, populated_storage, capsys):
    """A plain-text search prints the query header and a result count."""

    class SearchArgs:
        query = "authentication"
        project = None
        limit = 50
        json = False

    storage_patch = patch(
        "session_analytics.cli.SQLiteStorage", return_value=populated_storage
    )
    with storage_patch:
        cmd_search(SearchArgs())

    out = capsys.readouterr().out
    for expected in ("Search: authentication", "Results:"):
        assert expected in out

def test_cmd_search_no_results(self, populated_storage, capsys):
    """A query that matches nothing reports a zero result count."""

    class SearchArgs:
        query = "nonexistent_query_xyz"
        project = None
        limit = 50
        json = False

    storage_patch = patch(
        "session_analytics.cli.SQLiteStorage", return_value=populated_storage
    )
    with storage_patch:
        cmd_search(SearchArgs())

    out = capsys.readouterr().out
    assert "Results: 0" in out

def test_cmd_search_json_output(self, populated_storage, capsys):
    """With JSON output enabled, the payload keys appear in the printed text."""

    class JsonArgs:
        query = "authentication"
        project = None
        limit = 50
        json = True

    storage_patch = patch(
        "session_analytics.cli.SQLiteStorage", return_value=populated_storage
    )
    with storage_patch:
        cmd_search(JsonArgs())

    out = capsys.readouterr().out
    for fragment in ('"query": "authentication"', '"count":', '"messages":'):
        assert fragment in out

def test_cmd_search_malformed_query(self, populated_storage, capsys):
    """A malformed FTS5 query is reported as an error rather than raising."""

    class BadQueryArgs:
        query = '"unclosed quote'
        project = None
        limit = 50
        json = False

    storage_patch = patch(
        "session_analytics.cli.SQLiteStorage", return_value=populated_storage
    )
    with storage_patch:
        cmd_search(BadQueryArgs())

    out = capsys.readouterr().out
    # Should show error instead of crashing; the case-insensitive check also
    # covers a capitalised "Error".
    assert "error" in out.lower()

def test_cmd_search_with_project_filter(self, populated_storage, capsys):
    """Test that the project filter both keeps and excludes results.

    Checking only the matching project would pass even if the filter were
    ignored, so the same query is also run against a project that no stored
    event belongs to and must come back empty.
    """

    class Args:
        json = False
        query = "authentication"
        limit = 50
        project = "-test"

    class OtherProjectArgs(Args):
        project = "-no-such-project"

    with patch("session_analytics.cli.SQLiteStorage", return_value=populated_storage):
        cmd_search(Args())
        # Drain captured output between calls so each run is checked alone.
        matching = capsys.readouterr().out
        cmd_search(OtherProjectArgs())
        excluded = capsys.readouterr().out

    assert "Search: authentication" in matching
    assert "Results:" in matching
    assert "Results: 0" not in matching
    assert "Results: 0" in excluded
assert "Results:" in captured.out
12 changes: 12 additions & 0 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
query_timeline,
query_tokens,
query_tool_frequency,
search_messages,
)


Expand Down Expand Up @@ -109,3 +110,14 @@ def test_get_insights():
assert "sequences" in result
assert "permission_gaps" in result
assert "summary" in result


def test_search_messages():
    """The search tool returns an ok status, echoes the query, and lists messages."""
    response = search_messages.fn(query="test", limit=10)

    assert response["status"] == "ok"
    assert "query" in response
    assert response["query"] == "test"
    for key in ("count", "messages"):
        assert key in response
    assert isinstance(response["messages"], list)