From b339fa2a9a2b21b714494cf31d883476218855c1 Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Tue, 16 Jun 2026 19:22:29 -0400 Subject: [PATCH 1/2] fix(bookstack-analytics): fix concurrent-write race condition and show unique users Replace the unsafe read-modify-write on the GCS activity log with a compare-and-swap loop using GCS object generation preconditions (if_generation_match). On an HTTP 412 conflict the logger reloads the latest log and retries up to 5 times, so simultaneous queries from different users no longer silently overwrite each other. Replace the misleading "Unique Sessions" dashboard metric (which was essentially equal to total queries since every page load creates a new session) with "Unique Users" computed from distinct user_email values. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- .../analytics/components/query-metrics.tsx | 6 +- .../ui/lib/bookstack-data-fetcher.ts | 2 + bookstack_agent/ui/lib/bookstack-types.ts | 1 + src/aieng_bot/bookstack/activity_logger.py | 95 ++++++++++++---- tests/bookstack/test_activity_logger.py | 107 ++++++++++++++++-- 5 files changed, 175 insertions(+), 36 deletions(-) diff --git a/bookstack_agent/ui/app/analytics/components/query-metrics.tsx b/bookstack_agent/ui/app/analytics/components/query-metrics.tsx index 8450897..6f91b62 100644 --- a/bookstack_agent/ui/app/analytics/components/query-metrics.tsx +++ b/bookstack_agent/ui/app/analytics/components/query-metrics.tsx @@ -56,9 +56,9 @@ export default function QueryMetrics({ metrics }: QueryMetricsProps) { accent="from-vector-violet to-vector-cobalt" /> } accent="from-vector-cobalt to-vector-violet" /> diff --git a/bookstack_agent/ui/lib/bookstack-data-fetcher.ts b/bookstack_agent/ui/lib/bookstack-data-fetcher.ts index 586db55..3cbfe3f 100644 --- a/bookstack_agent/ui/lib/bookstack-data-fetcher.ts +++ b/bookstack_agent/ui/lib/bookstack-data-fetcher.ts @@ -62,6 +62,7 @@ export function computeBookstackMetrics(log: BookstackActivityLog): BookstackMet const errors = activities.filter(a => a.status === 'error').length const sessions = new Set(activities.map(a => a.session_id)) + const users = new Set(activities.map(a => a.user_email).filter(Boolean)) const durations = activities.map(a => a.duration_seconds).filter(d => d > 0) const avgDuration = durations.length > 0 @@ -103,6 +104,7 @@ export function computeBookstackMetrics(log: BookstackActivityLog): BookstackMet error_queries: errors, success_rate: total > 0 ? successful / total : 0, unique_sessions: sessions.size, + unique_users: users.size, avg_duration_seconds: avgDuration, avg_tool_calls_per_query: avgToolCalls, total_tool_calls: totalToolCalls, diff --git a/bookstack_agent/ui/lib/bookstack-types.ts b/bookstack_agent/ui/lib/bookstack-types.ts index a5fbd2d..ecb73f7 100644 --- a/bookstack_agent/ui/lib/bookstack-types.ts +++ b/bookstack_agent/ui/lib/bookstack-types.ts @@ -54,6 +54,7 @@ export interface BookstackMetrics { error_queries: number success_rate: number unique_sessions: number + unique_users: number avg_duration_seconds: number avg_tool_calls_per_query: number total_tool_calls: number diff --git a/src/aieng_bot/bookstack/activity_logger.py b/src/aieng_bot/bookstack/activity_logger.py index dd800ff..da3bc7b 100644 --- a/src/aieng_bot/bookstack/activity_logger.py +++ b/src/aieng_bot/bookstack/activity_logger.py @@ -7,16 +7,25 @@ from datetime import datetime, timezone from typing import Any + +class _NeverRaisedError(Exception): + """Sentinel used when google-cloud-storage is not installed.""" + + try: + from google.api_core.exceptions import PreconditionFailed as _PreconditionFailed from google.cloud import storage as _gcs_storage _GCS_AVAILABLE = True except ImportError: - _gcs_storage = None + _PreconditionFailed = _NeverRaisedError # type: ignore[assignment,misc] + _gcs_storage = None # type: ignore[assignment] _GCS_AVAILABLE = False logger = logging.getLogger(__name__) +_MAX_CAS_RETRIES = 5 + BUCKET = "bot-dashboard-vectorinstitute" ACTIVITY_LOG_PATH = "data/bookstack_activity_log.json" TRACES_PREFIX = "data/bookstack/traces" @@ -66,15 +75,16 @@ def _get_client(self) -> Any: self._client = _gcs_storage.Client() return self._client - def _load_activity_log(self) -> dict[str, Any] | None: + def _load_activity_log(self) -> tuple[dict[str, Any], int] | None: """Download the current activity log from GCS. Returns ------- - dict - Parsed log with ``activities`` list and ``last_updated`` key. - Returns an empty structure if the log does not yet exist. - Returns ``None`` on any read error (caller must abort write). + tuple[dict, int] + ``(log_data, generation)`` where ``generation=0`` means the object + does not yet exist (use ``if_generation_match=0`` to create it + atomically). Returns ``None`` on any read error (caller must + abort the write to protect existing data). """ try: @@ -82,9 +92,10 @@ def _load_activity_log(self) -> dict[str, Any] | None: bucket = client.bucket(self.bucket) blob = bucket.blob(self.log_path) if not blob.exists(): - return {"activities": [], "last_updated": None} - data = blob.download_as_text() - return json.loads(data) + return {"activities": [], "last_updated": None}, 0 + raw = blob.download_as_text() + generation: int = blob.generation or 0 + return json.loads(raw), generation except json.JSONDecodeError as exc: logger.error("Failed to parse bookstack activity log: %s", exc) return None @@ -96,18 +107,30 @@ def _load_activity_log(self) -> dict[str, Any] | None: ) return None - def _save_activity_log(self, log_data: dict[str, Any]) -> bool: - """Upload the activity log to GCS. + def _save_activity_log( + self, log_data: dict[str, Any], if_generation_match: int + ) -> bool: + """Upload the activity log to GCS with optimistic concurrency control. Parameters ---------- log_data : dict Updated activity log to persist. + if_generation_match : int + GCS generation precondition. Pass ``0`` to require the object + not to exist yet; pass the value from ``_load_activity_log`` to + require it has not changed since the read. Returns ------- bool - ``True`` on success, ``False`` on failure. + ``True`` on success, ``False`` on a non-retryable error. + + Raises + ------ + _PreconditionFailed + When another writer modified the log between the read and this + write (HTTP 412). The caller should reload and retry. """ try: @@ -117,8 +140,11 @@ def _save_activity_log(self, log_data: dict[str, Any]) -> bool: blob.upload_from_string( json.dumps(log_data, indent=2), content_type="application/json", + if_generation_match=if_generation_match, ) return True + except _PreconditionFailed: + raise except Exception as exc: logger.error("Failed to upload bookstack activity log to GCS: %s", exc) return False @@ -257,19 +283,42 @@ def log_query( "trace_path": trace_path, } - log_data = self._load_activity_log() - if log_data is None: - logger.error( - "Aborting bookstack activity log write for session %s " - "to prevent overwriting existing data after a GCS read failure", - session_id[:8], - ) - return False + log_saved = False + for attempt in range(_MAX_CAS_RETRIES): + snapshot = self._load_activity_log() + if snapshot is None: + logger.error( + "Aborting bookstack activity log write for session %s " + "to prevent overwriting existing data after a GCS read failure", + session_id[:8], + ) + break - log_data["activities"].append(activity) - log_data["last_updated"] = timestamp + log_data, generation = snapshot + log_data["activities"].append(activity) + log_data["last_updated"] = timestamp - log_saved = self._save_activity_log(log_data) + try: + log_saved = self._save_activity_log( + log_data, if_generation_match=generation + ) + break + except _PreconditionFailed: + if attempt < _MAX_CAS_RETRIES - 1: + logger.debug( + "CAS conflict writing bookstack activity log, " + "retrying (%d/%d, session=%s)", + attempt + 1, + _MAX_CAS_RETRIES, + session_id[:8], + ) + else: + logger.error( + "Gave up writing bookstack activity log after %d CAS retries " + "(session=%s)", + _MAX_CAS_RETRIES, + session_id[:8], + ) if log_saved: logger.info( diff --git a/tests/bookstack/test_activity_logger.py b/tests/bookstack/test_activity_logger.py index cfc9371..1841401 100644 --- a/tests/bookstack/test_activity_logger.py +++ b/tests/bookstack/test_activity_logger.py @@ -6,7 +6,10 @@ import pytest -from aieng_bot.bookstack.activity_logger import BookstackActivityLogger +from aieng_bot.bookstack.activity_logger import ( + BookstackActivityLogger, + _PreconditionFailed, +) # --------------------------------------------------------------------------- # Fixtures @@ -101,24 +104,28 @@ def test_load_existing_log( mock_gcs_client: MagicMock, sample_log: dict[str, Any], ) -> None: - """Return parsed log when blob exists.""" + """Return (log_data, generation) when blob exists.""" blob = MagicMock() blob.exists.return_value = True blob.download_as_text.return_value = json.dumps(sample_log) + blob.generation = 42 mock_gcs_client.bucket.return_value.blob.return_value = blob _attach_mock_client(logger, mock_gcs_client) result = logger._load_activity_log() - assert result == sample_log - assert len(result["activities"]) == 1 # type: ignore[index] + assert result is not None + log_data, generation = result + assert log_data == sample_log + assert len(log_data["activities"]) == 1 + assert generation == 42 def test_load_returns_empty_when_blob_not_found( self, logger: BookstackActivityLogger, mock_gcs_client: MagicMock, ) -> None: - """Return empty structure when the log file does not yet exist.""" + """Return (empty structure, generation=0) when the log file does not yet exist.""" blob = MagicMock() blob.exists.return_value = False mock_gcs_client.bucket.return_value.blob.return_value = blob @@ -126,7 +133,10 @@ def test_load_returns_empty_when_blob_not_found( result = logger._load_activity_log() - assert result == {"activities": [], "last_updated": None} + assert result is not None + log_data, generation = result + assert log_data == {"activities": [], "last_updated": None} + assert generation == 0 def test_load_returns_none_on_gcs_error( self, @@ -172,17 +182,18 @@ def test_save_success( mock_gcs_client: MagicMock, sample_log: dict[str, Any], ) -> None: - """Return True and upload JSON with correct content-type on success.""" + """Return True, upload JSON with correct content-type and generation precondition.""" blob = MagicMock() mock_gcs_client.bucket.return_value.blob.return_value = blob _attach_mock_client(logger, mock_gcs_client) - result = logger._save_activity_log(sample_log) + result = logger._save_activity_log(sample_log, if_generation_match=99) assert result is True blob.upload_from_string.assert_called_once() call_kwargs = blob.upload_from_string.call_args assert call_kwargs.kwargs["content_type"] == "application/json" + assert call_kwargs.kwargs["if_generation_match"] == 99 saved = json.loads(call_kwargs.args[0]) assert saved == sample_log @@ -192,16 +203,31 @@ def test_save_returns_false_on_gcs_error( mock_gcs_client: MagicMock, sample_log: dict[str, Any], ) -> None: - """Return False when the GCS upload raises.""" + """Return False when the GCS upload raises a non-CAS error.""" blob = MagicMock() blob.upload_from_string.side_effect = Exception("upload failed") mock_gcs_client.bucket.return_value.blob.return_value = blob _attach_mock_client(logger, mock_gcs_client) - result = logger._save_activity_log(sample_log) + result = logger._save_activity_log(sample_log, if_generation_match=0) assert result is False + def test_save_raises_on_precondition_failed( + self, + logger: BookstackActivityLogger, + mock_gcs_client: MagicMock, + sample_log: dict[str, Any], + ) -> None: + """Re-raise PreconditionFailed so the caller can retry.""" + blob = MagicMock() + blob.upload_from_string.side_effect = _PreconditionFailed("conflict") + mock_gcs_client.bucket.return_value.blob.return_value = blob + _attach_mock_client(logger, mock_gcs_client) + + with pytest.raises(_PreconditionFailed): + logger._save_activity_log(sample_log, if_generation_match=5) + # --------------------------------------------------------------------------- # _save_trace @@ -359,6 +385,7 @@ def test_log_query_appends_to_existing_log( blob = MagicMock() blob.exists.return_value = True blob.download_as_text.return_value = json.dumps(sample_log) + blob.generation = 77 mock_gcs_client.bucket.return_value.blob.return_value = blob _attach_mock_client(logger, mock_gcs_client) @@ -378,6 +405,7 @@ def test_log_query_appends_to_existing_log( assert len(saved["activities"]) == 2 assert saved["activities"][0]["session_id"] == "existing01" assert saved["activities"][1]["session_id"] == "newentry" + assert spy.call_args.kwargs["if_generation_match"] == 77 def test_log_query_returns_false_when_load_fails( self, @@ -453,3 +481,62 @@ def test_log_query_deduplicates_tools_used( # tools_used should list each tool only once assert saved["activities"][0]["tools_used"] == ["search_bookstack"] assert saved["activities"][0]["num_tool_calls"] == 3 + + def test_log_query_retries_on_cas_conflict( + self, + logger: BookstackActivityLogger, + ) -> None: + """Retry the append+save loop on a 412 PreconditionFailed and succeed on the second attempt.""" + save_attempts: list[int] = [] + + def mock_save(_log_data: dict[str, Any], *, if_generation_match: int) -> bool: + save_attempts.append(if_generation_match) + if len(save_attempts) == 1: + raise _PreconditionFailed("concurrent write") + return True + + empty_snapshot = ({"activities": [], "last_updated": None}, 0) + + with ( + patch.object(logger, "_save_trace", return_value=True), + patch.object(logger, "_load_activity_log", return_value=empty_snapshot), + patch.object(logger, "_save_activity_log", side_effect=mock_save), + ): + result = logger.log_query( + session_id="sess_cas", + question="Will this retry?", + tool_calls=[], + answer="Yes", + duration_seconds=1.0, + status="success", + ) + + assert result is True + assert len(save_attempts) == 2 # failed once, succeeded on retry + + def test_log_query_gives_up_after_max_cas_retries( + self, + logger: BookstackActivityLogger, + ) -> None: + """Return False when all CAS retry attempts are exhausted.""" + empty_snapshot = ({"activities": [], "last_updated": None}, 0) + + with ( + patch.object(logger, "_save_trace", return_value=True), + patch.object(logger, "_load_activity_log", return_value=empty_snapshot), + patch.object( + logger, + "_save_activity_log", + side_effect=_PreconditionFailed("persistent conflict"), + ), + ): + result = logger.log_query( + session_id="sess_exhaust", + question="Will this exhaust retries?", + tool_calls=[], + answer="No", + duration_seconds=1.0, + status="success", + ) + + assert result is False From 58b590614f4ba01c17fa83617c313d95b27ae165 Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Tue, 16 Jun 2026 21:52:32 -0400 Subject: [PATCH 2/2] fix: remove redundant type: ignore comments flagged by mypy warn-unused-ignores --- src/aieng_bot/bookstack/activity_logger.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/aieng_bot/bookstack/activity_logger.py b/src/aieng_bot/bookstack/activity_logger.py index da3bc7b..c599693 100644 --- a/src/aieng_bot/bookstack/activity_logger.py +++ b/src/aieng_bot/bookstack/activity_logger.py @@ -18,8 +18,8 @@ class _NeverRaisedError(Exception): _GCS_AVAILABLE = True except ImportError: - _PreconditionFailed = _NeverRaisedError # type: ignore[assignment,misc] - _gcs_storage = None # type: ignore[assignment] + _PreconditionFailed = _NeverRaisedError + _gcs_storage = None _GCS_AVAILABLE = False logger = logging.getLogger(__name__)