Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions bookstack_agent/ui/app/analytics/components/query-metrics.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ export default function QueryMetrics({ metrics }: QueryMetricsProps) {
accent="from-vector-violet to-vector-cobalt"
/>
<MetricCard
label="Unique Sessions"
value={metrics.unique_sessions.toLocaleString()}
sub="distinct conversations"
label="Unique Users"
value={metrics.unique_users.toLocaleString()}
sub="distinct users"
icon={<Users className="w-4 h-4 text-white" />}
accent="from-vector-cobalt to-vector-violet"
/>
Expand Down
2 changes: 2 additions & 0 deletions bookstack_agent/ui/lib/bookstack-data-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export function computeBookstackMetrics(log: BookstackActivityLog): BookstackMet
const errors = activities.filter(a => a.status === 'error').length

const sessions = new Set(activities.map(a => a.session_id))
const users = new Set(activities.map(a => a.user_email).filter(Boolean))

const durations = activities.map(a => a.duration_seconds).filter(d => d > 0)
const avgDuration = durations.length > 0
Expand Down Expand Up @@ -103,6 +104,7 @@ export function computeBookstackMetrics(log: BookstackActivityLog): BookstackMet
error_queries: errors,
success_rate: total > 0 ? successful / total : 0,
unique_sessions: sessions.size,
unique_users: users.size,
avg_duration_seconds: avgDuration,
avg_tool_calls_per_query: avgToolCalls,
total_tool_calls: totalToolCalls,
Expand Down
1 change: 1 addition & 0 deletions bookstack_agent/ui/lib/bookstack-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export interface BookstackMetrics {
error_queries: number
success_rate: number
unique_sessions: number
unique_users: number
avg_duration_seconds: number
avg_tool_calls_per_query: number
total_tool_calls: number
Expand Down
93 changes: 71 additions & 22 deletions src/aieng_bot/bookstack/activity_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,25 @@
from datetime import datetime, timezone
from typing import Any


class _NeverRaisedError(Exception):
"""Sentinel used when google-cloud-storage is not installed."""


try:
from google.api_core.exceptions import PreconditionFailed as _PreconditionFailed
from google.cloud import storage as _gcs_storage

_GCS_AVAILABLE = True
except ImportError:
_PreconditionFailed = _NeverRaisedError
_gcs_storage = None
_GCS_AVAILABLE = False

logger = logging.getLogger(__name__)

_MAX_CAS_RETRIES = 5

BUCKET = "bot-dashboard-vectorinstitute"
ACTIVITY_LOG_PATH = "data/bookstack_activity_log.json"
TRACES_PREFIX = "data/bookstack/traces"
Expand Down Expand Up @@ -66,25 +75,27 @@ def _get_client(self) -> Any:
self._client = _gcs_storage.Client()
return self._client

def _load_activity_log(self) -> dict[str, Any] | None:
def _load_activity_log(self) -> tuple[dict[str, Any], int] | None:
"""Download the current activity log from GCS.

Returns
-------
dict
Parsed log with ``activities`` list and ``last_updated`` key.
Returns an empty structure if the log does not yet exist.
Returns ``None`` on any read error (caller must abort write).
tuple[dict, int]
``(log_data, generation)`` where ``generation=0`` means the object
does not yet exist (use ``if_generation_match=0`` to create it
atomically). Returns ``None`` on any read error (caller must
abort the write to protect existing data).

"""
try:
client = self._get_client()
bucket = client.bucket(self.bucket)
blob = bucket.blob(self.log_path)
if not blob.exists():
return {"activities": [], "last_updated": None}
data = blob.download_as_text()
return json.loads(data)
return {"activities": [], "last_updated": None}, 0
raw = blob.download_as_text()
generation: int = blob.generation or 0
return json.loads(raw), generation
except json.JSONDecodeError as exc:
logger.error("Failed to parse bookstack activity log: %s", exc)
return None
Expand All @@ -96,18 +107,30 @@ def _load_activity_log(self) -> dict[str, Any] | None:
)
return None

def _save_activity_log(self, log_data: dict[str, Any]) -> bool:
"""Upload the activity log to GCS.
def _save_activity_log(
self, log_data: dict[str, Any], if_generation_match: int
) -> bool:
"""Upload the activity log to GCS with optimistic concurrency control.

Parameters
----------
log_data : dict
Updated activity log to persist.
if_generation_match : int
GCS generation precondition. Pass ``0`` to require the object
not to exist yet; pass the value from ``_load_activity_log`` to
require it has not changed since the read.

Returns
-------
bool
``True`` on success, ``False`` on failure.
``True`` on success, ``False`` on a non-retryable error.

Raises
------
_PreconditionFailed
When another writer modified the log between the read and this
write (HTTP 412). The caller should reload and retry.

"""
try:
Expand All @@ -117,8 +140,11 @@ def _save_activity_log(self, log_data: dict[str, Any]) -> bool:
blob.upload_from_string(
json.dumps(log_data, indent=2),
content_type="application/json",
if_generation_match=if_generation_match,
)
return True
except _PreconditionFailed:
raise
except Exception as exc:
logger.error("Failed to upload bookstack activity log to GCS: %s", exc)
return False
Expand Down Expand Up @@ -257,19 +283,42 @@ def log_query(
"trace_path": trace_path,
}

log_data = self._load_activity_log()
if log_data is None:
logger.error(
"Aborting bookstack activity log write for session %s "
"to prevent overwriting existing data after a GCS read failure",
session_id[:8],
)
return False
log_saved = False
for attempt in range(_MAX_CAS_RETRIES):
snapshot = self._load_activity_log()
if snapshot is None:
logger.error(
"Aborting bookstack activity log write for session %s "
"to prevent overwriting existing data after a GCS read failure",
session_id[:8],
)
break

log_data["activities"].append(activity)
log_data["last_updated"] = timestamp
log_data, generation = snapshot
log_data["activities"].append(activity)
log_data["last_updated"] = timestamp

log_saved = self._save_activity_log(log_data)
try:
log_saved = self._save_activity_log(
log_data, if_generation_match=generation
)
break
except _PreconditionFailed:
if attempt < _MAX_CAS_RETRIES - 1:
logger.debug(
"CAS conflict writing bookstack activity log, "
"retrying (%d/%d, session=%s)",
attempt + 1,
_MAX_CAS_RETRIES,
session_id[:8],
)
else:
logger.error(
"Gave up writing bookstack activity log after %d CAS retries "
"(session=%s)",
_MAX_CAS_RETRIES,
session_id[:8],
)

if log_saved:
logger.info(
Expand Down
107 changes: 97 additions & 10 deletions tests/bookstack/test_activity_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@

import pytest

from aieng_bot.bookstack.activity_logger import BookstackActivityLogger
from aieng_bot.bookstack.activity_logger import (
BookstackActivityLogger,
_PreconditionFailed,
)

# ---------------------------------------------------------------------------
# Fixtures
Expand Down Expand Up @@ -101,32 +104,39 @@ def test_load_existing_log(
mock_gcs_client: MagicMock,
sample_log: dict[str, Any],
) -> None:
"""Return parsed log when blob exists."""
"""Return (log_data, generation) when blob exists."""
blob = MagicMock()
blob.exists.return_value = True
blob.download_as_text.return_value = json.dumps(sample_log)
blob.generation = 42
mock_gcs_client.bucket.return_value.blob.return_value = blob
_attach_mock_client(logger, mock_gcs_client)

result = logger._load_activity_log()

assert result == sample_log
assert len(result["activities"]) == 1 # type: ignore[index]
assert result is not None
log_data, generation = result
assert log_data == sample_log
assert len(log_data["activities"]) == 1
assert generation == 42

def test_load_returns_empty_when_blob_not_found(
self,
logger: BookstackActivityLogger,
mock_gcs_client: MagicMock,
) -> None:
"""Return empty structure when the log file does not yet exist."""
"""Return (empty structure, generation=0) when the log file does not yet exist."""
blob = MagicMock()
blob.exists.return_value = False
mock_gcs_client.bucket.return_value.blob.return_value = blob
_attach_mock_client(logger, mock_gcs_client)

result = logger._load_activity_log()

assert result == {"activities": [], "last_updated": None}
assert result is not None
log_data, generation = result
assert log_data == {"activities": [], "last_updated": None}
assert generation == 0

def test_load_returns_none_on_gcs_error(
self,
Expand Down Expand Up @@ -172,17 +182,18 @@ def test_save_success(
mock_gcs_client: MagicMock,
sample_log: dict[str, Any],
) -> None:
"""Return True and upload JSON with correct content-type on success."""
"""Return True, upload JSON with correct content-type and generation precondition."""
blob = MagicMock()
mock_gcs_client.bucket.return_value.blob.return_value = blob
_attach_mock_client(logger, mock_gcs_client)

result = logger._save_activity_log(sample_log)
result = logger._save_activity_log(sample_log, if_generation_match=99)

assert result is True
blob.upload_from_string.assert_called_once()
call_kwargs = blob.upload_from_string.call_args
assert call_kwargs.kwargs["content_type"] == "application/json"
assert call_kwargs.kwargs["if_generation_match"] == 99
saved = json.loads(call_kwargs.args[0])
assert saved == sample_log

Expand All @@ -192,16 +203,31 @@ def test_save_returns_false_on_gcs_error(
mock_gcs_client: MagicMock,
sample_log: dict[str, Any],
) -> None:
"""Return False when the GCS upload raises."""
"""Return False when the GCS upload raises a non-CAS error."""
blob = MagicMock()
blob.upload_from_string.side_effect = Exception("upload failed")
mock_gcs_client.bucket.return_value.blob.return_value = blob
_attach_mock_client(logger, mock_gcs_client)

result = logger._save_activity_log(sample_log)
result = logger._save_activity_log(sample_log, if_generation_match=0)

assert result is False

def test_save_raises_on_precondition_failed(
self,
logger: BookstackActivityLogger,
mock_gcs_client: MagicMock,
sample_log: dict[str, Any],
) -> None:
"""Re-raise PreconditionFailed so the caller can retry."""
blob = MagicMock()
blob.upload_from_string.side_effect = _PreconditionFailed("conflict")
mock_gcs_client.bucket.return_value.blob.return_value = blob
_attach_mock_client(logger, mock_gcs_client)

with pytest.raises(_PreconditionFailed):
logger._save_activity_log(sample_log, if_generation_match=5)


# ---------------------------------------------------------------------------
# _save_trace
Expand Down Expand Up @@ -359,6 +385,7 @@ def test_log_query_appends_to_existing_log(
blob = MagicMock()
blob.exists.return_value = True
blob.download_as_text.return_value = json.dumps(sample_log)
blob.generation = 77
mock_gcs_client.bucket.return_value.blob.return_value = blob
_attach_mock_client(logger, mock_gcs_client)

Expand All @@ -378,6 +405,7 @@ def test_log_query_appends_to_existing_log(
assert len(saved["activities"]) == 2
assert saved["activities"][0]["session_id"] == "existing01"
assert saved["activities"][1]["session_id"] == "newentry"
assert spy.call_args.kwargs["if_generation_match"] == 77

def test_log_query_returns_false_when_load_fails(
self,
Expand Down Expand Up @@ -453,3 +481,62 @@ def test_log_query_deduplicates_tools_used(
# tools_used should list each tool only once
assert saved["activities"][0]["tools_used"] == ["search_bookstack"]
assert saved["activities"][0]["num_tool_calls"] == 3

def test_log_query_retries_on_cas_conflict(
self,
logger: BookstackActivityLogger,
) -> None:
"""Retry the append+save loop on a 412 PreconditionFailed and succeed on the second attempt."""
save_attempts: list[int] = []

def mock_save(_log_data: dict[str, Any], *, if_generation_match: int) -> bool:
save_attempts.append(if_generation_match)
if len(save_attempts) == 1:
raise _PreconditionFailed("concurrent write")
return True

empty_snapshot = ({"activities": [], "last_updated": None}, 0)

with (
patch.object(logger, "_save_trace", return_value=True),
patch.object(logger, "_load_activity_log", return_value=empty_snapshot),
patch.object(logger, "_save_activity_log", side_effect=mock_save),
):
result = logger.log_query(
session_id="sess_cas",
question="Will this retry?",
tool_calls=[],
answer="Yes",
duration_seconds=1.0,
status="success",
)

assert result is True
assert len(save_attempts) == 2 # failed once, succeeded on retry

def test_log_query_gives_up_after_max_cas_retries(
self,
logger: BookstackActivityLogger,
) -> None:
"""Return False when all CAS retry attempts are exhausted."""
empty_snapshot = ({"activities": [], "last_updated": None}, 0)

with (
patch.object(logger, "_save_trace", return_value=True),
patch.object(logger, "_load_activity_log", return_value=empty_snapshot),
patch.object(
logger,
"_save_activity_log",
side_effect=_PreconditionFailed("persistent conflict"),
),
):
result = logger.log_query(
session_id="sess_exhaust",
question="Will this exhaust retries?",
tool_calls=[],
answer="No",
duration_seconds=1.0,
status="success",
)

assert result is False