class ApiTelemetryMiddleware:
    """ASGI middleware that reports status code and latency of HTTP requests.

    Only the response status and the elapsed time are handed to the metrics
    sink; request bodies, headers and paths are never read here.
    """

    def __init__(self, app: ASGIApp, *, metrics: ApiRequestTelemetryMetrics) -> None:
        """Wrap ``app`` and remember the ``metrics`` sink to record into."""
        self.app = app
        self.metrics = metrics

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Forward the request and record one sample for HTTP scopes.

        Non-HTTP scopes are passed through untouched and produce no sample.
        """
        is_http = scope["type"] == "http"
        if not is_http:
            await self.app(scope, receive, send)
            return

        # Stays at 500 when the app fails before sending a response start.
        observed = {"status": 500}
        clock_start = time.perf_counter()

        async def relay(message: Message) -> None:
            # Remember the status from the response-start message, then forward.
            if message["type"] == "http.response.start":
                observed["status"] = int(message.get("status", 500))
            await send(message)

        try:
            await self.app(scope, receive, relay)
        finally:
            # Recorded even when the wrapped app raises.
            duration_ms = (time.perf_counter() - clock_start) * 1000
            self.metrics.record(
                status_code=observed["status"],
                latency_ms=duration_ms,
            )
app.state.self_hosted_aggregate_telemetry_runner = ( + await start_self_hosted_aggregate_telemetry( + settings, + telemetry_client=telemetry_client, + config=telemetry_config, + db_session_factory=get_db_context, + api_metrics=app.state.self_hosted_api_telemetry_metrics, + ) + ) mcp_server = getattr(app.state, "retrieval_mcp_server", None) mcp_session_manager = getattr(mcp_server, "session_manager", None) @@ -89,8 +111,14 @@ async def lifespan(app: FastAPI): yield try: + from shared.services.telemetry.aggregates import ( + stop_self_hosted_aggregate_telemetry, + ) from shared.services.telemetry.runtime import stop_self_hosted_telemetry + await stop_self_hosted_aggregate_telemetry( + getattr(app.state, "self_hosted_aggregate_telemetry_runner", None) + ) await stop_self_hosted_telemetry( getattr(app.state, "self_hosted_telemetry_client", None) ) @@ -150,6 +178,9 @@ def create_app() -> FastAPI: # Setup middleware setup_cors(app) + api_telemetry_metrics = ApiRequestTelemetryMetrics() + app.state.self_hosted_api_telemetry_metrics = api_telemetry_metrics + app.add_middleware(ApiTelemetryMiddleware, metrics=api_telemetry_metrics) app.add_middleware(LoggingMiddleware) @app.get("/", tags=["Root"]) diff --git a/apps/api/tests/contract/test_self_hosted_telemetry_contract.py b/apps/api/tests/contract/test_self_hosted_telemetry_contract.py index 0c2e5630..8c3abaf7 100644 --- a/apps/api/tests/contract/test_self_hosted_telemetry_contract.py +++ b/apps/api/tests/contract/test_self_hosted_telemetry_contract.py @@ -3,15 +3,28 @@ from __future__ import annotations import asyncio +from contextlib import AbstractAsyncContextManager from dataclasses import dataclass, replace from pathlib import Path +from types import TracebackType +from typing import Any, cast -import httpx import pytest +from sqlalchemy.ext.asyncio import AsyncSession from shared.services.telemetry.client import TelemetryClient from shared.services.telemetry.config import TelemetryRuntimeConfig -from 
def test_aggregate_properties_strip_sensitive_values() -> None:
    """Only the allowed usage-aggregate keys survive sanitization."""
    expected_properties = {
        "app_version": "1.2.3",
        "window_seconds": 86_400,
        "total_jobs": 3,
        "total_users": 2,
    }
    # Keys this test expects the sanitizer to drop.
    sensitive_properties = {
        "email": "user@example.com",
        "document_name": "private.pdf",
        "query": "private retrieval query",
    }

    sanitized = sanitize_event_properties(
        "self_hosted_usage_aggregate",
        {**expected_properties, **sensitive_properties},
    )

    assert sanitized == expected_properties
@pytest.mark.asyncio
async def test_api_aggregate_uses_interval_window_when_global_lock_unavailable(
    tmp_path: Path,
) -> None:
    """Only the API aggregate is returned when the advisory lock is not acquired."""
    request_metrics = ApiRequestTelemetryMetrics()
    request_metrics.record(status_code=200, latency_ms=12)

    collected = await collect_self_hosted_aggregate_event_properties(
        config=_build_config(tmp_path),
        db_session_factory=_build_lock_unavailable_session,
        api_metrics=request_metrics,
        window_seconds=86_400,
        api_window_seconds=300,
    )

    assert set(collected) == {"self_hosted_api_aggregate"}
    api_aggregate = collected["self_hosted_api_aggregate"]
    # The API event carries the interval window, not the 24h database window.
    expected_values = (
        ("window_seconds", 300),
        ("api_requests_total", 1),
        ("api_requests_2xx", 1),
    )
    for property_name, expected_value in expected_values:
        assert api_aggregate[property_name] == expected_value
"CPython", + "$python_version": "3.14.4", + } - transport = httpx.MockTransport(handler) - http_client = httpx.AsyncClient(transport=transport) + +@pytest.mark.asyncio +async def test_telemetry_client_sends_anonymous_posthog_capture( + tmp_path: Path, +) -> None: + posthog_client = _FakePostHogClient() telemetry_client = TelemetryClient( _build_config(tmp_path), - http_client=http_client, + posthog_client=posthog_client, ) await telemetry_client.start() @@ -104,29 +221,87 @@ async def handler(request: httpx.Request) -> httpx.Response: await telemetry_client.stop() assert queued is True - assert len(sent_requests) == 1 - assert "phc_test_project_key" in str(sent_requests[0]) - assert "550e8400-e29b-41d4-a716-446655440000" in str(sent_requests[0]) - assert "$process_person_profile" in str(sent_requests[0]) - assert "private prompt" not in str(sent_requests[0]) + assert len(posthog_client.captured_events) == 1 + captured_event = posthog_client.captured_events[0] + assert captured_event.event_name == "self_hosted_instance_heartbeat" + assert captured_event.kwargs["distinct_id"] == ( + "550e8400-e29b-41d4-a716-446655440000" + ) + assert captured_event.kwargs["disable_geoip"] is True + assert captured_event.kwargs["properties"] == { + "app_version": "1.2.3", + "api_healthy": True, + "$process_person_profile": False, + } + assert posthog_client.flush_count == 1 + + +@pytest.mark.asyncio +async def test_telemetry_client_sends_aggregate_events(tmp_path: Path) -> None: + posthog_client = _FakePostHogClient() + telemetry_client = TelemetryClient( + _build_config(tmp_path), + posthog_client=posthog_client, + ) + + await telemetry_client.start() + queued = telemetry_client.capture( + "self_hosted_api_aggregate", + { + "app_version": "1.2.3", + "window_seconds": 86_400, + "api_requests_total": 7, + "api_requests_2xx": 6, + "prompt": "private prompt", + }, + ) + await telemetry_client.stop() - await http_client.aclose() + assert queued is True + assert 
@pytest.mark.asyncio
async def test_aggregate_telemetry_start_does_not_require_successful_snapshot(
    tmp_path: Path,
) -> None:
    """Starting the aggregate runner succeeds even with a failing session factory."""
    fake_posthog = _FakePostHogClient()
    client = TelemetryClient(
        _build_config(tmp_path),
        posthog_client=fake_posthog,
    )
    await client.start()

    aggregate_settings = cast(TelemetryAggregateSettings, _AggregateSettings())
    aggregate_runner = await start_self_hosted_aggregate_telemetry(
        aggregate_settings,
        telemetry_client=client,
        config=_build_config(tmp_path),
        db_session_factory=_build_failing_session,
        api_metrics=ApiRequestTelemetryMetrics(),
    )

    # Stop the runner first, then the client it emits through.
    await stop_self_hosted_aggregate_telemetry(aggregate_runner)
    await client.stop()

    assert aggregate_runner is not None
    assert fake_posthog.captured_events == []
http_client.aclose() + assert [event.event_name for event in posthog_client.captured_events] == [ + "self_hosted_instance_heartbeat", + "self_hosted_instance_heartbeat", + "self_hosted_instance_heartbeat", + ] + assert posthog_client.flush_count == 1 @pytest.mark.asyncio async def test_telemetry_client_flush_before_start_does_not_deadlock( tmp_path: Path, ) -> None: - async def handler(request: httpx.Request) -> httpx.Response: - return httpx.Response(status_code=200, json={"status": "ok"}) - - transport = httpx.MockTransport(handler) - http_client = httpx.AsyncClient(transport=transport) + posthog_client = _FakePostHogClient() telemetry_client = TelemetryClient( _build_config(tmp_path), - http_client=http_client, + posthog_client=posthog_client, ) telemetry_client.capture( @@ -174,22 +348,15 @@ async def handler(request: httpx.Request) -> httpx.Response: ) await asyncio.wait_for(telemetry_client.stop(), timeout=1.0) - await http_client.aclose() + assert len(posthog_client.captured_events) == 2 @pytest.mark.asyncio async def test_telemetry_client_does_not_restart_after_stop(tmp_path: Path) -> None: - sent_requests: list[str] = [] - - async def handler(request: httpx.Request) -> httpx.Response: - sent_requests.append(request.read().decode("utf-8")) - return httpx.Response(status_code=200, json={"status": "ok"}) - - transport = httpx.MockTransport(handler) - http_client = httpx.AsyncClient(transport=transport) + posthog_client = _FakePostHogClient() telemetry_client = TelemetryClient( _build_config(tmp_path), - http_client=http_client, + posthog_client=posthog_client, ) await telemetry_client.start() @@ -208,11 +375,9 @@ async def handler(request: httpx.Request) -> httpx.Response: }, ) - assert len(sent_requests) == 1 + assert len(posthog_client.captured_events) == 1 assert queued_after_stop is False - await http_client.aclose() - @dataclass(frozen=True) class _ConfigOverrides: @@ -220,6 +385,89 @@ class _ConfigOverrides: posthog_project_key: str = 
"phc_test_project_key" +@dataclass(frozen=True) +class _CapturedPostHogEvent: + event_name: str + kwargs: dict[str, object] + + +@dataclass(frozen=True) +class _AggregateSettings: + TELEMETRY_AGGREGATE_INTERVAL_SECONDS: int = 60 + + +class _FailingSessionContext(AbstractAsyncContextManager[AsyncSession]): + async def __aenter__(self) -> AsyncSession: + raise RuntimeError("database unavailable") + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> bool | None: + return None + + +@dataclass(frozen=True) +class _ScalarResult: + value: object + + def scalar_one_or_none(self) -> object: + return self.value + + +class _LockUnavailableSession: + async def execute( + self, + statement: object, + params: dict[str, Any] | None = None, + ) -> _ScalarResult: + assert "pg_try_advisory_lock" in str(statement) + assert params is not None + return _ScalarResult(False) + + +class _LockUnavailableSessionContext(AbstractAsyncContextManager[AsyncSession]): + async def __aenter__(self) -> AsyncSession: + return cast(AsyncSession, _LockUnavailableSession()) + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> bool | None: + return None + + +class _FakePostHogClient: + def __init__(self) -> None: + self.captured_events: list[_CapturedPostHogEvent] = [] + self.flush_count = 0 + self.shutdown_count = 0 + + def capture(self, event: str, **kwargs: object) -> str: + self.captured_events.append( + _CapturedPostHogEvent(event_name=event, kwargs=kwargs) + ) + return "fake-posthog-event-id" + + def flush(self) -> None: + self.flush_count += 1 + + def shutdown(self) -> None: + self.shutdown_count += 1 + + +def _build_failing_session() -> AbstractAsyncContextManager[AsyncSession]: + return _FailingSessionContext() + + +def _build_lock_unavailable_session() -> AbstractAsyncContextManager[AsyncSession]: + 
return _LockUnavailableSessionContext() + + def _build_config( tmp_path: Path, overrides: _ConfigOverrides = _ConfigOverrides(), diff --git a/packages/shared-python/pyproject.toml b/packages/shared-python/pyproject.toml index 95cc081c..27879bc3 100644 --- a/packages/shared-python/pyproject.toml +++ b/packages/shared-python/pyproject.toml @@ -16,10 +16,10 @@ dependencies = [ "alembic==1.13.1", "sqlalchemy-utils==0.41.1", "pgvector==0.2.4", - + # Redis "redis==5.3.1", - + # Celery task queue "celery==5.4.0", "kombu==5.4.0", @@ -32,29 +32,32 @@ dependencies = [ "pydantic-settings==2.14.1", "python-dotenv==1.2.2", "pydantic[email]==2.13.4", - + # Auth and user model support "fastapi-users[sqlalchemy]==15.0.5", "authlib==1.6.11", "email-validator==2.2.0", - + # HTTP clients "httpx==0.28.1", "requests==2.33.0", "aiohttp==3.13.4", "qstash==3.2.0", + # Product telemetry + "posthog==7.18.1", + # OpenAI SDK "openai>=1.0.0", - + # Cloud storage "boto3==1.38.46", "botocore==1.38.46", - + # Logging "loguru==0.7.3", "psutil==5.9.8", - + # Utilities "pytz==2025.2", "PyYAML==6.0.2", diff --git a/packages/shared-python/shared/core/config/base.py b/packages/shared-python/shared/core/config/base.py index 8c0e9571..746e91e4 100644 --- a/packages/shared-python/shared/core/config/base.py +++ b/packages/shared-python/shared/core/config/base.py @@ -65,6 +65,10 @@ class BaseConfig(BaseSettings): default="self_hosted", description="Deployment-mode label attached to anonymous telemetry events", ) + TELEMETRY_AGGREGATE_INTERVAL_SECONDS: int = Field( + default=300, + description="Interval for anonymous self-hosted aggregate telemetry snapshots", + ) # Security configuration. 
WEBHOOK_MASTER_KEY: str = Field( @@ -131,6 +135,14 @@ def validate_telemetry_request_timeout_seconds(cls, v): raise ValueError("TELEMETRY_REQUEST_TIMEOUT_SECONDS must be positive") return v + @field_validator("TELEMETRY_AGGREGATE_INTERVAL_SECONDS") + @classmethod + def validate_telemetry_aggregate_interval_seconds(cls, v): + """Validate anonymous aggregate telemetry interval.""" + if v < 60: + raise ValueError("TELEMETRY_AGGREGATE_INTERVAL_SECONDS must be at least 60") + return v + def validate_file_paths(self) -> bool: """Validate required local file paths.""" paths_to_check = { diff --git a/packages/shared-python/shared/models/database/document.py b/packages/shared-python/shared/models/database/document.py index dcf3a982..eb73cc84 100644 --- a/packages/shared-python/shared/models/database/document.py +++ b/packages/shared-python/shared/models/database/document.py @@ -418,4 +418,5 @@ class RetrievalStep(Base): __table_args__ = ( Index('idx_retrieval_steps_run', 'run_id', 'step_index'), + Index('idx_retrieval_steps_created', 'created_at'), ) diff --git a/packages/shared-python/shared/services/telemetry/aggregates.py b/packages/shared-python/shared/services/telemetry/aggregates.py new file mode 100644 index 00000000..ddb5af9b --- /dev/null +++ b/packages/shared-python/shared/services/telemetry/aggregates.py @@ -0,0 +1,573 @@ +"""DB-backed self-hosted aggregate telemetry snapshots.""" + +from __future__ import annotations + +import asyncio +from contextlib import AbstractAsyncContextManager +from typing import Any, Protocol, cast + +from loguru import logger +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession + +from .api_metrics import ApiRequestTelemetryMetrics, ApiRequestMetricsSnapshot +from .client import TelemetryClient +from .config import TelemetryRuntimeConfig, TelemetrySettings +from .events import TelemetryProperties, build_base_event_properties + +AGGREGATE_WINDOW_SECONDS = 24 * 60 * 60 +AGGREGATE_ADVISORY_LOCK_ID = 
0x4B4E4F5748455245 + + +class DatabaseSessionFactory(Protocol): + """Factory for app-owned async database sessions.""" + + def __call__(self) -> AbstractAsyncContextManager[AsyncSession]: + """Return an async context manager yielding an AsyncSession.""" + raise NotImplementedError + + +class TelemetryAggregateSettings(TelemetrySettings, Protocol): + TELEMETRY_AGGREGATE_INTERVAL_SECONDS: int + + +class SelfHostedAggregateTelemetryRunner: + """Periodically emits anonymous, aggregate-only self-hosted telemetry.""" + + def __init__( + self, + *, + config: TelemetryRuntimeConfig, + telemetry_client: TelemetryClient, + db_session_factory: DatabaseSessionFactory, + api_metrics: ApiRequestTelemetryMetrics, + interval_seconds: int, + ) -> None: + self._config = config + self._telemetry_client = telemetry_client + self._db_session_factory = db_session_factory + self._api_metrics = api_metrics + self._interval_seconds = interval_seconds + self._task: asyncio.Task[None] | None = None + + async def emit_once(self) -> None: + """Collect and emit all aggregate snapshots once.""" + event_properties = await collect_self_hosted_aggregate_event_properties( + config=self._config, + db_session_factory=self._db_session_factory, + api_metrics=self._api_metrics, + window_seconds=AGGREGATE_WINDOW_SECONDS, + api_window_seconds=self._interval_seconds, + ) + for event_name, properties in event_properties.items(): + self._telemetry_client.capture(event_name, properties) + + def start(self) -> None: + """Start the periodic aggregate loop.""" + if self._task is not None: + return + self._task = asyncio.create_task( + self._run(), + name="self-hosted-aggregate-telemetry", + ) + + async def stop(self) -> None: + """Stop the periodic aggregate loop.""" + task = self._task + if task is None: + return + task.cancel() + await asyncio.gather(task, return_exceptions=True) + self._task = None + + async def _run(self) -> None: + while True: + await asyncio.sleep(self._interval_seconds) + try: + await 
async def collect_self_hosted_aggregate_event_properties(
    *,
    config: TelemetryRuntimeConfig,
    db_session_factory: DatabaseSessionFactory,
    api_metrics: ApiRequestTelemetryMetrics,
    window_seconds: int = AGGREGATE_WINDOW_SECONDS,
    api_window_seconds: int = AGGREGATE_WINDOW_SECONDS,
) -> dict[str, TelemetryProperties]:
    """Collect aggregate event properties without including customer content.

    The process-local API aggregate is always returned. The database-backed
    aggregates (usage, retrieval, worker, provider) are added only when the
    advisory lock is acquired and every query succeeds.

    Args:
        config: Telemetry runtime configuration used for base event properties.
        db_session_factory: Factory returning an async context manager that
            yields the session used for the aggregate queries.
        api_metrics: In-process request counters; they are snapshot and reset
            by this call.
        window_seconds: Look-back window for the database-backed aggregates.
        api_window_seconds: Window reported on the API aggregate event.

    Returns:
        Mapping of event name to its sanitizable property dict.
    """
    event_properties: dict[str, TelemetryProperties] = {
        "self_hosted_api_aggregate": _collect_api_aggregate(
            config,
            api_metrics.snapshot_and_reset(),
            api_window_seconds,
        ),
    }
    # The API counters were already reset above. A database failure must not
    # discard them, so it is logged here and the API aggregate is still returned.
    try:
        database_properties = await _collect_database_aggregates(
            config=config,
            db_session_factory=db_session_factory,
            window_seconds=window_seconds,
        )
    except Exception as exc:
        logger.warning(f"anonymous database aggregate telemetry skipped: {exc}")
        return event_properties
    event_properties.update(database_properties)
    return event_properties


async def _collect_database_aggregates(
    *,
    config: TelemetryRuntimeConfig,
    db_session_factory: DatabaseSessionFactory,
    window_seconds: int,
) -> dict[str, TelemetryProperties]:
    """Collect database-backed aggregates while holding the advisory lock.

    Returns an empty dict when the lock is not acquired.
    """
    async with db_session_factory() as session:
        if not await _try_aggregate_advisory_lock(session):
            return {}
        try:
            return {
                "self_hosted_usage_aggregate": await _collect_usage_aggregate(
                    session,
                    config,
                    window_seconds,
                ),
                "self_hosted_retrieval_aggregate": await _collect_retrieval_aggregate(
                    session,
                    config,
                    window_seconds,
                ),
                "self_hosted_worker_aggregate": await _collect_worker_aggregate(
                    session,
                    config,
                    window_seconds,
                ),
                "self_hosted_provider_aggregate": await _collect_provider_aggregate(
                    session,
                    config,
                    window_seconds,
                ),
            }
        except Exception:
            # A failed statement leaves the transaction aborted. Roll back so
            # the unlock below can run instead of failing and leaving the
            # session-level lock held on the pooled connection.
            await session.rollback()
            raise
        finally:
            await _release_aggregate_advisory_lock(session)
"total_job_chunks": await _int_scalar( + session, + "SELECT COUNT(*) FROM job_chunks", + ), + "pages_processed_24h": await _int_scalar( + session, + f""" + SELECT COALESCE(SUM(page_count), 0) + FROM jobs + WHERE updated_at >= {_window_start_expression()} + AND status = 'done' + """, + window_seconds, + ), + "credits_charged_24h": await _int_scalar( + session, + f""" + SELECT COALESCE(SUM(credits_charged), 0) + FROM jobs + WHERE updated_at >= {_window_start_expression()} + """, + window_seconds, + ), + } + ) + return properties + + +async def _collect_retrieval_aggregate( + session: AsyncSession, + config: TelemetryRuntimeConfig, + window_seconds: int, +) -> TelemetryProperties: + properties = _base_aggregate_properties(config, window_seconds) + properties.update( + { + "retrieval_runs_24h": await _int_scalar( + session, + _windowed_count_sql("retrieval_runs", "created_at"), + window_seconds, + ), + "retrieval_errors_24h": await _int_scalar( + session, + _windowed_count_sql( + "retrieval_runs", + "created_at", + "error IS NOT NULL AND error <> ''", + ), + window_seconds, + ), + "retrieval_cache_hits_24h": await _int_scalar( + session, + _windowed_count_sql( + "retrieval_runs", + "created_at", + "cache_hit = true", + ), + window_seconds, + ), + "retrieval_result_count_24h": await _int_scalar( + session, + f""" + SELECT COALESCE(SUM(result_count), 0) + FROM retrieval_runs + WHERE created_at >= {_window_start_expression()} + """, + window_seconds, + ), + "retrieval_latency_avg_ms_24h": await _float_scalar( + session, + f""" + SELECT COALESCE(AVG(latency_ms), 0) + FROM retrieval_runs + WHERE created_at >= {_window_start_expression()} + """, + window_seconds, + ), + "retrieval_latency_p95_ms_24h": await _float_scalar( + session, + f""" + SELECT COALESCE(percentile_cont(0.95) WITHIN GROUP (ORDER BY latency_ms), 0) + FROM retrieval_runs + WHERE created_at >= {_window_start_expression()} + """, + window_seconds, + ), + "retrieval_tokens_24h": await _int_scalar( + session, + 
f""" + SELECT COALESCE(SUM(token_count), 0) + FROM retrieval_runs + WHERE created_at >= {_window_start_expression()} + """, + window_seconds, + ), + "retrieval_steps_24h": await _int_scalar( + session, + _windowed_count_sql("retrieval_steps", "created_at"), + window_seconds, + ), + "retrieval_step_errors_24h": await _int_scalar( + session, + _windowed_count_sql( + "retrieval_steps", + "created_at", + "error IS NOT NULL AND error <> ''", + ), + window_seconds, + ), + } + ) + return properties + + +async def _collect_worker_aggregate( + session: AsyncSession, + config: TelemetryRuntimeConfig, + window_seconds: int, +) -> TelemetryProperties: + properties = _base_aggregate_properties(config, window_seconds) + properties.update( + { + "jobs_waiting_file": await _int_scalar( + session, + "SELECT COUNT(*) FROM jobs WHERE status = 'waiting-file'", + ), + "jobs_pending": await _int_scalar( + session, + "SELECT COUNT(*) FROM jobs WHERE status = 'pending'", + ), + "jobs_running": await _int_scalar( + session, + "SELECT COUNT(*) FROM jobs WHERE status = 'running'", + ), + "jobs_converting": await _int_scalar( + session, + "SELECT COUNT(*) FROM jobs WHERE status = 'converting'", + ), + "jobs_done_24h": await _int_scalar( + session, + _windowed_count_sql("jobs", "updated_at", "status = 'done'"), + window_seconds, + ), + "jobs_failed_24h": await _int_scalar( + session, + _windowed_count_sql("jobs", "updated_at", "status = 'failed'"), + window_seconds, + ), + "job_duration_avg_seconds_24h": await _float_scalar( + session, + f""" + SELECT COALESCE(AVG(EXTRACT(EPOCH FROM (updated_at - created_at))), 0) + FROM jobs + WHERE updated_at >= {_window_start_expression()} + AND status IN ('done', 'failed') + """, + window_seconds, + ), + } + ) + return properties + + +def _collect_api_aggregate( + config: TelemetryRuntimeConfig, + snapshot: ApiRequestMetricsSnapshot, + window_seconds: int, +) -> TelemetryProperties: + properties = _base_aggregate_properties(config, window_seconds) + 
async def _collect_provider_aggregate(
    session: AsyncSession,
    config: TelemetryRuntimeConfig,
    window_seconds: int,
) -> TelemetryProperties:
    """Build the provider aggregate: parse-agent, retrieval and webhook metrics.

    Every query only considers rows whose timestamp column falls inside the
    last ``window_seconds`` seconds. The ``_24h`` suffix in the property names
    is fixed regardless of the window that is actually passed.

    Args:
        session: Database session the queries are executed on.
        config: Runtime config used for the common base properties.
        window_seconds: Look-back window bound into each query.

    Returns:
        The base aggregate properties extended with the provider metrics.
    """
    properties = _base_aggregate_properties(config, window_seconds)
    window_start = _window_start_expression()

    parse_tokens = await _int_scalar(
        session,
        f"""
        SELECT COALESCE(SUM(total_tokens), 0)
        FROM parse_runs
        WHERE started_at >= {window_start}
        """,
        window_seconds,
    )
    retrieval_tokens = await _int_scalar(
        session,
        f"""
        SELECT COALESCE(SUM(token_count), 0)
        FROM retrieval_steps
        WHERE created_at >= {window_start}
        """,
        window_seconds,
    )

    # Each error count is queried once and reused, mirroring how the token
    # totals are combined. This keeps a single copy of each predicate and
    # guarantees provider_error_count_24h includes exactly the value reported
    # as parse_agent_errors_24h.
    parse_errors = await _int_scalar(
        session,
        _windowed_count_sql(
            "parse_runs",
            "started_at",
            "final_status <> 'ready'",
        ),
        window_seconds,
    )
    retrieval_step_errors = await _int_scalar(
        session,
        _windowed_count_sql(
            "retrieval_steps",
            "created_at",
            "error IS NOT NULL AND error <> ''",
        ),
        window_seconds,
    )

    properties.update(
        {
            "parse_agent_runs_24h": await _int_scalar(
                session,
                _windowed_count_sql("parse_runs", "started_at"),
                window_seconds,
            ),
            "parse_agent_errors_24h": parse_errors,
            "parse_agent_tokens_24h": parse_tokens,
            "parse_agent_latency_avg_ms_24h": await _float_scalar(
                session,
                f"""
                SELECT COALESCE(AVG(total_latency_ms), 0)
                FROM parse_runs
                WHERE started_at >= {window_start}
                """,
                window_seconds,
            ),
            "retrieval_provider_tokens_24h": retrieval_tokens,
            # Number of distinct non-empty model names seen in the window.
            "retrieval_model_count_24h": await _int_scalar(
                session,
                f"""
                SELECT COUNT(DISTINCT model_name)
                FROM retrieval_steps
                WHERE created_at >= {window_start}
                  AND model_name IS NOT NULL
                  AND model_name <> ''
                """,
                window_seconds,
            ),
            "provider_tokens_24h": parse_tokens + retrieval_tokens,
            "provider_error_count_24h": parse_errors + retrieval_step_errors,
            "webhook_deliveries_24h": await _int_scalar(
                session,
                _windowed_count_sql("webhook_logs", "created_at"),
                window_seconds,
            ),
            # A delivery with no recorded status code, or a status >= 400,
            # counts as a failure.
            "webhook_delivery_failures_24h": await _int_scalar(
                session,
                _windowed_count_sql(
                    "webhook_logs",
                    "created_at",
                    "(response_status_code IS NULL OR response_status_code >= 400)",
                ),
                window_seconds,
            ),
        }
    )
    return properties
@dataclass(frozen=True)
class ApiRequestMetricsSnapshot:
    """Bounded API request aggregate data since the last snapshot."""

    # Total number of requests recorded in the interval.
    request_count: int
    # Requests bucketed by status class.
    status_2xx_count: int
    status_3xx_count: int
    status_4xx_count: int
    status_5xx_count: int
    # Mean latency over every recorded request (0.0 when there were none).
    latency_avg_ms: float
    # 95th percentile estimated from the retained latency sample.
    latency_p95_ms: float


class ApiRequestTelemetryMetrics:
    """Process-local request counters flushed as aggregate telemetry.

    Counters and the latency sum cover every recorded request. Percentiles
    are estimated from at most ``_MAX_LATENCY_SAMPLES`` latencies, kept as a
    uniform random sample of the whole interval (reservoir sampling) so that
    requests arriving after the buffer fills still influence the p95.
    All state is guarded by a single ``threading.Lock``.
    """

    _MAX_LATENCY_SAMPLES = 1_000

    def __init__(self) -> None:
        # Imported here so the sampler does not rely on a module-level import.
        import random

        self._lock = threading.Lock()
        self._rng = random.Random()
        self._request_count = 0
        self._status_2xx_count = 0
        self._status_3xx_count = 0
        self._status_4xx_count = 0
        self._status_5xx_count = 0
        self._latency_total_ms = 0.0
        self._latency_samples_ms: list[float] = []

    def record(self, *, status_code: int, latency_ms: float) -> None:
        """Record one completed HTTP request without storing request content.

        Negative latencies are clamped to zero. Status codes below 200 only
        increase the total request count.
        """
        latency = max(latency_ms, 0.0)
        with self._lock:
            self._request_count += 1
            self._latency_total_ms += latency
            self._sample_latency(latency)
            if 200 <= status_code < 300:
                self._status_2xx_count += 1
            elif 300 <= status_code < 400:
                self._status_3xx_count += 1
            elif 400 <= status_code < 500:
                self._status_4xx_count += 1
            elif status_code >= 500:
                self._status_5xx_count += 1

    def _sample_latency(self, latency: float) -> None:
        """Keep ``latency`` in the bounded sample; caller must hold the lock."""
        samples = self._latency_samples_ms
        if len(samples) < self._MAX_LATENCY_SAMPLES:
            samples.append(latency)
            return
        # Buffer is full: the i-th request replaces a random slot with
        # probability capacity / i, so every request of the interval is
        # equally likely to be retained. _request_count already includes
        # the current request.
        slot = self._rng.randrange(self._request_count)
        if slot < self._MAX_LATENCY_SAMPLES:
            samples[slot] = latency

    def snapshot_and_reset(self) -> ApiRequestMetricsSnapshot:
        """Return current counters and reset the interval bucket."""
        with self._lock:
            request_count = self._request_count
            samples = sorted(self._latency_samples_ms)
            snapshot = ApiRequestMetricsSnapshot(
                request_count=request_count,
                status_2xx_count=self._status_2xx_count,
                status_3xx_count=self._status_3xx_count,
                status_4xx_count=self._status_4xx_count,
                status_5xx_count=self._status_5xx_count,
                latency_avg_ms=(
                    self._latency_total_ms / request_count
                    if request_count > 0
                    else 0.0
                ),
                latency_p95_ms=_calculate_percentile(samples, 0.95),
            )
            self._request_count = 0
            self._status_2xx_count = 0
            self._status_3xx_count = 0
            self._status_4xx_count = 0
            self._status_5xx_count = 0
            self._latency_total_ms = 0.0
            self._latency_samples_ms = []
            return snapshot


def _calculate_percentile(values: list[float], percentile: float) -> float:
    """Return the element of ``values`` at the rounded percentile position.

    ``values`` is indexed as-is, so the caller is expected to pass it sorted
    ascending. The index is ``round((len(values) - 1) * percentile)`` clamped
    to the valid range; an empty list yields ``0.0``.
    """
    if not values:
        return 0.0
    last_index = len(values) - 1
    index = int(round(last_index * percentile))
    return values[max(0, min(last_index, index))]
class PostHogTelemetryClient(Protocol):
    """Structural type for the PostHog SDK methods used by telemetry.

    Any object exposing these three methods satisfies the protocol, so a
    stand-in can be supplied wherever this type is accepted.
    """

    def capture(self, event: str, **kwargs: Any) -> str | None:
        """Submit one named event; accepts arbitrary keyword arguments."""
        raise NotImplementedError()

    def flush(self) -> None:
        """Send whatever the SDK currently has queued."""
        raise NotImplementedError()

    def shutdown(self) -> None:
        """Flush the queue and then stop the SDK client."""
        raise NotImplementedError()
async def _send_batch(self, telemetry_events: list[TelemetryEvent]) -> None:
    """Hand each event of a drained batch to the PostHog client.

    Does nothing when the batch is empty or no client is configured. Each
    event is captured with the installation id as ``distinct_id``, person
    profile processing disabled and GeoIP lookup disabled.

    Failures are isolated per event: the events were already removed from
    the queue, so aborting on the first error would silently drop the rest
    of the batch. A failing event is logged and the loop moves on.
    """
    posthog_client = self._posthog_client
    if not telemetry_events or posthog_client is None:
        return

    for telemetry_event in telemetry_events:
        try:
            posthog_client.capture(
                telemetry_event.event_name,
                distinct_id=self._config.installation_id,
                properties={
                    **telemetry_event.properties,
                    "$process_person_profile": False,
                },
                timestamp=telemetry_event.timestamp,
                disable_geoip=True,
            )
        except Exception as exc:
            # Telemetry is best-effort: never let a send error escape.
            logger.warning(f"anonymous telemetry send failed: {exc}")
diff --git a/packages/shared-python/shared/services/telemetry/config.py b/packages/shared-python/shared/services/telemetry/config.py index 655d7599..328f2706 100644 --- a/packages/shared-python/shared/services/telemetry/config.py +++ b/packages/shared-python/shared/services/telemetry/config.py @@ -53,11 +53,6 @@ def is_ready(self) -> bool: and bool(self.installation_id) ) - @property - def batch_url(self) -> str: - """Return the PostHog batch ingestion URL.""" - return f"{self.posthog_host.rstrip('/')}/batch/" - def build_telemetry_config( settings: TelemetrySettings, diff --git a/packages/shared-python/shared/services/telemetry/events.py b/packages/shared-python/shared/services/telemetry/events.py index 9a40ef01..287e8ab9 100644 --- a/packages/shared-python/shared/services/telemetry/events.py +++ b/packages/shared-python/shared/services/telemetry/events.py @@ -25,6 +25,27 @@ } ) +_AGGREGATE_PROPERTY_NAMES = frozenset( + { + "window_seconds", + } +) + +_POSTHOG_SDK_PROPERTY_NAMES = frozenset( + { + "$geoip_disable", + "$is_server", + "$lib", + "$lib_version", + "$os", + "$os_distro", + "$os_version", + "$process_person_profile", + "$python_runtime", + "$python_version", + } +) + _EVENT_PROPERTY_NAMES: dict[str, frozenset[str]] = { "self_hosted_instance_started": frozenset(), "self_hosted_instance_heartbeat": frozenset( @@ -36,6 +57,77 @@ } ), "self_hosted_instance_shutdown": frozenset(), + "self_hosted_usage_aggregate": _AGGREGATE_PROPERTY_NAMES + | frozenset( + { + "active_api_keys", + "active_documents", + "active_jobs", + "completed_jobs_24h", + "credits_charged_24h", + "failed_jobs_24h", + "jobs_created_24h", + "pages_processed_24h", + "total_document_chunks", + "total_documents", + "total_job_chunks", + "total_jobs", + "total_users", + } + ), + "self_hosted_retrieval_aggregate": _AGGREGATE_PROPERTY_NAMES + | frozenset( + { + "retrieval_cache_hits_24h", + "retrieval_errors_24h", + "retrieval_latency_avg_ms_24h", + "retrieval_latency_p95_ms_24h", + 
"retrieval_result_count_24h", + "retrieval_runs_24h", + "retrieval_step_errors_24h", + "retrieval_steps_24h", + "retrieval_tokens_24h", + } + ), + "self_hosted_worker_aggregate": _AGGREGATE_PROPERTY_NAMES + | frozenset( + { + "job_duration_avg_seconds_24h", + "jobs_converting", + "jobs_done_24h", + "jobs_failed_24h", + "jobs_pending", + "jobs_running", + "jobs_waiting_file", + } + ), + "self_hosted_api_aggregate": _AGGREGATE_PROPERTY_NAMES + | frozenset( + { + "api_latency_avg_ms", + "api_latency_p95_ms", + "api_requests_2xx", + "api_requests_3xx", + "api_requests_4xx", + "api_requests_5xx", + "api_requests_total", + } + ), + "self_hosted_provider_aggregate": _AGGREGATE_PROPERTY_NAMES + | frozenset( + { + "parse_agent_errors_24h", + "parse_agent_latency_avg_ms_24h", + "parse_agent_runs_24h", + "parse_agent_tokens_24h", + "provider_error_count_24h", + "provider_tokens_24h", + "retrieval_model_count_24h", + "retrieval_provider_tokens_24h", + "webhook_deliveries_24h", + "webhook_delivery_failures_24h", + } + ), } @@ -77,6 +169,24 @@ def build_instance_event_properties( return properties +def build_base_event_properties(config: TelemetryRuntimeConfig) -> TelemetryProperties: + """Build safe common properties for all self-hosted telemetry events.""" + return { + "app_env": config.app_env, + "app_version": config.app_version, + "api_standalone_mode_enabled": _read_bool_environment( + "API_STANDALONE_MODE_ENABLED", + False, + ), + "billing_enabled": _read_bool_environment("BILLING_ENABLED", False), + "deployment_mode": config.deployment_mode, + "environment": config.environment, + "rate_limit_enabled": _read_bool_environment("RATE_LIMIT_ENABLED", True), + "schema_version": config.schema_version, + "service_name": config.service_name, + } + + def sanitize_event_properties( event_name: str, properties: Mapping[str, object], @@ -95,6 +205,24 @@ def sanitize_event_properties( return sanitized_properties +def sanitize_posthog_event_properties( + event_name: str, + properties: 
def _is_safe_property_value(value: object) -> bool:
    """Return whether ``value`` is a scalar that is safe to send as a property.

    Accepted values are ``None``, strings, integers, booleans and finite
    floats. Containers and every other type are rejected.
    """
    if isinstance(value, float):
        # NaN and +/-Infinity have no representation in standard JSON, so
        # only finite floats are accepted. Both comparisons are False for
        # NaN, which rejects it as well.
        return float("-inf") < value < float("inf")
    return value is None or isinstance(value, (str, int, bool))
"https://files.pythonhosted.org/packages/57/2f/55fca558f925a51db046e5b929deb317ddb05afed74b22d89f4eca578980/authlib-1.6.11-py2.py3-none-any.whl", hash = "sha256:c8687a9a26451c51a34a06fa17bb97cb15bba46a6a626755e2d7f50da8bff3e3", size = 244469, upload-time = "2026-04-16T07:22:48.413Z" }, ] +[[package]] +name = "backoff" +version = "2.2.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/47/d7/5bbeb12c44d7c4f2fb5b56abce497eb5ed9f34d85701de869acedd602619/backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba", size = 17001, upload-time = "2022-10-05T19:19:32.061Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/df/73/b6e24bd22e6720ca8ee9a85a0c4a2971af8497d8f3193fa05390cbd46e09/backoff-2.2.1-py3-none-any.whl", hash = "sha256:63579f9a0628e06278f7e47b7d7d5b6ce20dc65c5e96a6f3ca99a6adca0396e8", size = 15148, upload-time = "2022-10-05T19:19:30.546Z" }, +] + [[package]] name = "bcrypt" version = "4.3.0" @@ -1486,6 +1495,7 @@ dependencies = [ { name = "openai" }, { name = "pgvector" }, { name = "pillow" }, + { name = "posthog" }, { name = "psutil" }, { name = "psycogreen" }, { name = "psycopg2-binary" }, @@ -1530,6 +1540,7 @@ requires-dist = [ { name = "openai", specifier = ">=1.0.0" }, { name = "pgvector", specifier = "==0.2.4" }, { name = "pillow", specifier = "==12.2.0" }, + { name = "posthog", specifier = "==7.18.1" }, { name = "psutil", specifier = "==5.9.8" }, { name = "psycogreen", specifier = ">=1.0.2" }, { name = "psycopg2-binary", specifier = "==2.9.12" }, @@ -2693,6 +2704,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/70/2c/b1faca65b9728b4ac43f0bee4bb9e7294bd0a62cc2ee59fd59403bf575f6/port_for-1.0.0-py3-none-any.whl", hash = "sha256:35a848b98cf4cc075fe80dc49ae5c3a78e3ca345a23bd39bf5252277b4eef5c2", size = 17544, upload-time = "2025-09-30T10:22:49.878Z" }, ] +[[package]] +name = "posthog" +version = "7.18.1" +source = { 
registry = "https://pypi.org/simple" } +dependencies = [ + { name = "backoff" }, + { name = "distro" }, + { name = "requests" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b2/56/49d06b64baf270f8ec7fdb9352eaf2469167fcf0a1cbd2facc9335ab52fb/posthog-7.18.1.tar.gz", hash = "sha256:a4a3496448aa2bc4e13880daab205d11c13f8a537570662dcf9e5ecef3696ca1", size = 231652, upload-time = "2026-06-10T14:22:28.434Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b9/c5/0db39bdf91ae2e473ba139568ed24d631e87caac6c175f466d06eedc8747/posthog-7.18.1-py3-none-any.whl", hash = "sha256:54797ae8767911dfd83541f69a9e4fda65e100cb77dc0cf093fd98a3b84916a6", size = 270818, upload-time = "2026-06-10T14:22:26.849Z" }, +] + [[package]] name = "pptx2md" version = "2.0.6"