diff --git a/apps/api/app/services/s3_events/subscription_service.py b/apps/api/app/services/s3_events/subscription_service.py index c64ab78e..5947184c 100644 --- a/apps/api/app/services/s3_events/subscription_service.py +++ b/apps/api/app/services/s3_events/subscription_service.py @@ -1,17 +1,33 @@ """SNS subscription confirmation handling.""" from __future__ import annotations +import asyncio +import socket +from collections.abc import Sequence + from loguru import logger +from urllib.parse import ParseResult, parse_qs, urlparse, urlunparse +from shared.core.config import settings from shared.services.http.pinned_outbound import send_pinned_outbound_request -from shared.services.http.url_security import validate_http_url_and_resolve_ip_async +from shared.services.http.url_security import ( + HTTPURLValidationResult, + validate_http_url_and_resolve_ip_async, +) SNS_SUBSCRIPTION_TIMEOUT_SECONDS = 10 +LOCALSTACK_HOSTNAMES = frozenset( + { + "localstack", + "localhost.localstack.cloud", + } +) async def confirm_sns_subscription(subscribe_url: str) -> dict[str, str]: - validation = await validate_http_url_and_resolve_ip_async(subscribe_url) + rewritten_url = _rewrite_localstack_subscribe_url(subscribe_url) + validation = await _validate_sns_confirmation_url(rewritten_url) if not validation.is_valid: logger.warning( f"SNS subscription confirmation URL failed validation: {validation.error_message}" @@ -25,7 +41,7 @@ async def confirm_sns_subscription(subscribe_url: str) -> dict[str, str]: try: response = await send_pinned_outbound_request( method="GET", - url=subscribe_url, + url=rewritten_url, pinned_ip=validation.validated_ip, timeout_seconds=SNS_SUBSCRIPTION_TIMEOUT_SECONDS, ) @@ -45,3 +61,142 @@ async def confirm_sns_subscription(subscribe_url: str) -> dict[str, str]: except Exception as exc: logger.error(f"Failed to reach the SNS confirmation URL: {exc}") return {"message": "SNS subscription confirmation failed"} + + +async def _validate_sns_confirmation_url(url: str) -> HTTPURLValidationResult: + """Validate the SNS confirmation URL, allowing configured LocalStack endpoints.""" + if _is_configured_localstack_confirmation_url(url): + return await _resolve_configured_localstack_confirmation_url(url) + + return await validate_http_url_and_resolve_ip_async(url) + + +async def _resolve_configured_localstack_confirmation_url( + url: str, +) -> HTTPURLValidationResult: + parsed_url = urlparse(url) + hostname = parsed_url.hostname + if not hostname: + return HTTPURLValidationResult( + is_valid=False, + url=url, + error_message="URL must include a hostname", + failure_reason="missing_hostname", + ) + + try: + loop = asyncio.get_running_loop() + address_infos = await loop.getaddrinfo(hostname, None) + except socket.gaierror as exc: + return HTTPURLValidationResult( + is_valid=False, + url=url, + hostname=hostname, + error_message=f"Unable to resolve hostname {hostname}: {exc}", + failure_reason="hostname_resolution_failed", + ) + + validated_ip = _extract_first_resolved_ip(address_infos) + if not validated_ip: + return HTTPURLValidationResult( + is_valid=False, + url=url, + hostname=hostname, + error_message=f"Unable to resolve hostname {hostname}", + failure_reason="hostname_resolution_failed", + ) + + return HTTPURLValidationResult( + is_valid=True, + url=url, + hostname=hostname, + validated_ip=validated_ip, + ) + + +def _rewrite_localstack_subscribe_url(subscribe_url: str) -> str: + parsed_subscribe_url = urlparse(subscribe_url) + storage_endpoint = _parse_configured_storage_endpoint() + if not storage_endpoint: + return subscribe_url + + if not _is_confirm_subscription_action(parsed_subscribe_url): + return subscribe_url + + if not _is_localstack_endpoint(parsed_subscribe_url): + return subscribe_url + + if not _is_localstack_endpoint(storage_endpoint): + return subscribe_url + + return urlunparse( + parsed_subscribe_url._replace( + scheme=storage_endpoint.scheme, + netloc=storage_endpoint.netloc, + ) + ) + + +def _is_configured_localstack_confirmation_url(url: str) -> bool: + parsed_url = urlparse(url) + storage_endpoint = _parse_configured_storage_endpoint() + if not storage_endpoint: + return False + + if not _is_confirm_subscription_action(parsed_url): + return False + + if not _is_localstack_endpoint(parsed_url): + return False + + if not _is_localstack_endpoint(storage_endpoint): + return False + + return _endpoint_origin(parsed_url) == _endpoint_origin(storage_endpoint) + + +def _parse_configured_storage_endpoint() -> ParseResult | None: + endpoint_url = settings.S3_ENDPOINT_URL.strip() + if not endpoint_url: + return None + + parsed_endpoint = urlparse(endpoint_url) + if parsed_endpoint.scheme not in {"http", "https"}: + return None + if not parsed_endpoint.hostname: + return None + return parsed_endpoint + + +def _is_confirm_subscription_action(parsed_url: ParseResult) -> bool: + actions = parse_qs(parsed_url.query).get("Action", []) + return any(action.lower() == "confirmsubscription" for action in actions) + + +def _is_localstack_endpoint(parsed_url: ParseResult) -> bool: + hostname = (parsed_url.hostname or "").rstrip(".").lower() + return hostname in LOCALSTACK_HOSTNAMES + + +def _endpoint_origin(parsed_url: ParseResult) -> tuple[str, str, int | None]: + return ( + parsed_url.scheme.lower(), + (parsed_url.hostname or "").rstrip(".").lower(), + parsed_url.port, + ) + + +def _extract_first_resolved_ip(address_infos: Sequence[object]) -> str | None: + for address_info in address_infos: + if not isinstance(address_info, tuple) or len(address_info) < 5: + continue + family = address_info[0] + socket_address = address_info[4] + if family not in (socket.AF_INET, socket.AF_INET6): + continue + if not isinstance(socket_address, tuple) or not socket_address: + continue + address = socket_address[0] + if isinstance(address, str) and address: + return address + return None diff --git a/apps/api/main.py b/apps/api/main.py index db75099b..68eb223e 100644 --- a/apps/api/main.py +++ b/apps/api/main.py @@ -68,6 +68,16 @@ async def lifespan(app: FastAPI): await load_rules(session) logger.info("rate limit rules loaded at startup; restart the pod to apply changes") + from shared.services.telemetry.runtime import start_self_hosted_telemetry + + app.state.self_hosted_telemetry_client = await start_self_hosted_telemetry( + settings, + service_name="knowhere-api", + api_healthy=True, + postgres_healthy=True, + redis_healthy=True, + ) + mcp_server = getattr(app.state, "retrieval_mcp_server", None) mcp_session_manager = getattr(mcp_server, "session_manager", None) @@ -78,6 +88,15 @@ async def lifespan(app: FastAPI): else: yield + try: + from shared.services.telemetry.runtime import stop_self_hosted_telemetry + + await stop_self_hosted_telemetry( + getattr(app.state, "self_hosted_telemetry_client", None) + ) + except Exception as e: + logger.error(f"self-hosted telemetry shutdown failed: {e}") + try: from shared.services.retrieval.stats.recorder import ( drain_retrieval_hit_stats_updates, diff --git a/apps/api/tests/contract/test_s3_event_contract.py b/apps/api/tests/contract/test_s3_event_contract.py index b1348c0c..d91c3795 100644 --- a/apps/api/tests/contract/test_s3_event_contract.py +++ b/apps/api/tests/contract/test_s3_event_contract.py @@ -3,6 +3,7 @@ import socket from collections.abc import Callable from contextlib import AbstractAsyncContextManager +from types import SimpleNamespace from typing import cast from uuid import uuid4 @@ -278,6 +279,80 @@ def get(self, url: str, *args: object, **kwargs: object) -> object: assert contacted_urls == [] +@pytest.mark.asyncio +async def test_should_confirm_a_configured_localstack_subscription_url_in_self_hosted_runtime( + monkeypatch: MonkeyPatch, +) -> None: + contacted_requests: list[dict[str, str]] = [] + + class FakeOutboundResponse: + status = 200 + + async def send_fake_pinned_outbound_request( + *, + method: str, + url: str, + pinned_ip: str, + timeout_seconds: float, + ) -> FakeOutboundResponse: + contacted_requests.append( + { + "method": method, + "url": url, + "pinned_ip": pinned_ip, + "timeout_seconds": str(timeout_seconds), + } + ) + return FakeOutboundResponse() + + def resolve_localstack_address( + host: str, + port: int | None, + *args: object, + **kwargs: object, + ) -> list[tuple[socket.AddressFamily, socket.SocketKind, int, str, tuple[str, int]]]: + assert host == "localstack" + return [(socket.AF_INET, socket.SOCK_STREAM, 6, "", ("172.18.0.10", 0))] + + subscription_service = importlib.import_module( + "app.services.s3_events.subscription_service" + ) + + monkeypatch.setattr( + subscription_service, + "settings", + SimpleNamespace(S3_ENDPOINT_URL="http://localstack:4566"), + ) + monkeypatch.setattr(socket, "getaddrinfo", resolve_localstack_address) + monkeypatch.setattr( + subscription_service, + "send_pinned_outbound_request", + send_fake_pinned_outbound_request, + ) + + response = await subscription_service.confirm_sns_subscription( + "http://localhost.localstack.cloud:4566/" + "?Action=ConfirmSubscription" + "&TopicArn=arn:aws:sns:us-west-1:000000000000:test" + "&Token=contract-token" + ) + + assert response == {"message": "SNS subscription confirmed"} + assert contacted_requests == [ + { + "method": "GET", + "url": ( + "http://localstack:4566/" + "?Action=ConfirmSubscription" + "&TopicArn=arn:aws:sns:us-west-1:000000000000:test" + "&Token=contract-token" + ), + "pinned_ip": "172.18.0.10", + "timeout_seconds": "10", + } + ] + + @pytest.mark.asyncio async def test_should_return_ok_for_a_malformed_event_payload_without_triggering_retries( api_client_factory: Callable[[], AbstractAsyncContextManager[AsyncClient]], diff --git a/apps/api/tests/contract/test_self_hosted_telemetry_contract.py b/apps/api/tests/contract/test_self_hosted_telemetry_contract.py new file mode 100644 index 00000000..0c2e5630 --- /dev/null +++ b/apps/api/tests/contract/test_self_hosted_telemetry_contract.py @@ -0,0 +1,240 @@ +"""Contract tests for anonymous self-hosted telemetry.""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, replace +from pathlib import Path + +import httpx +import pytest + +from shared.services.telemetry.client import TelemetryClient +from shared.services.telemetry.config import TelemetryRuntimeConfig +from shared.services.telemetry.events import sanitize_event_properties +from shared.services.telemetry.identity import get_or_create_installation_id + + +def test_installation_id_is_generated_once(tmp_path: Path) -> None: + installation_id_path = tmp_path / "telemetry-installation-id" + + first_installation_id = get_or_create_installation_id( + explicit_installation_id="", + installation_id_path=installation_id_path, + ) + second_installation_id = get_or_create_installation_id( + explicit_installation_id="", + installation_id_path=installation_id_path, + ) + + assert first_installation_id == second_installation_id + assert installation_id_path.read_text(encoding="utf-8").strip() == ( + first_installation_id + ) + + +def test_explicit_installation_id_does_not_write_file(tmp_path: Path) -> None: + installation_id_path = tmp_path / "telemetry-installation-id" + explicit_installation_id = "550e8400-e29b-41d4-a716-446655440000" + + installation_id = get_or_create_installation_id( + explicit_installation_id=explicit_installation_id, + installation_id_path=installation_id_path, + ) + + assert installation_id == explicit_installation_id + assert not installation_id_path.exists() + + +def test_explicit_installation_id_must_be_uuid(tmp_path: Path) -> None: + installation_id_path = tmp_path / "telemetry-installation-id" + + with pytest.raises(ValueError, match="must be a UUID"): + get_or_create_installation_id( + explicit_installation_id="customer@example.com", + installation_id_path=installation_id_path, + ) + + +def test_telemetry_properties_strip_unknown_and_non_scalar_values() -> None: + properties = sanitize_event_properties( + "self_hosted_instance_heartbeat", + { + "app_version": "1.2.3", + "api_healthy": True, + "email": "user@example.com", + "prompt": "private prompt", + "nested": {"unsafe": True}, + "document_id": "doc_123", + }, + ) + + assert properties == { + "app_version": "1.2.3", + "api_healthy": True, + } + + +@pytest.mark.asyncio +async def test_telemetry_client_sends_anonymous_posthog_batch( + tmp_path: Path, +) -> None: + sent_requests: list[str] = [] + + async def handler(request: httpx.Request) -> httpx.Response: + sent_requests.append(request.read().decode("utf-8")) + return httpx.Response(status_code=200, json={"status": "ok"}) + + transport = httpx.MockTransport(handler) + http_client = httpx.AsyncClient(transport=transport) + telemetry_client = TelemetryClient( + _build_config(tmp_path), + http_client=http_client, + ) + + await telemetry_client.start() + queued = telemetry_client.capture( + "self_hosted_instance_heartbeat", + { + "app_version": "1.2.3", + "api_healthy": True, + "prompt": "private prompt", + }, + ) + await telemetry_client.stop() + + assert queued is True + assert len(sent_requests) == 1 + assert "phc_test_project_key" in str(sent_requests[0]) + assert "550e8400-e29b-41d4-a716-446655440000" in str(sent_requests[0]) + assert "$process_person_profile" in str(sent_requests[0]) + assert "private prompt" not in str(sent_requests[0]) + + await http_client.aclose() + + +@pytest.mark.asyncio +async def test_telemetry_client_respects_batch_size(tmp_path: Path) -> None: + sent_requests: list[str] = [] + + async def handler(request: httpx.Request) -> httpx.Response: + sent_requests.append(request.read().decode("utf-8")) + return httpx.Response(status_code=200, json={"status": "ok"}) + + transport = httpx.MockTransport(handler) + http_client = httpx.AsyncClient(transport=transport) + config = _build_config(tmp_path) + telemetry_client = TelemetryClient( + replace(config, batch_size=2), + http_client=http_client, + ) + + await telemetry_client.start() + for index in range(3): + telemetry_client.capture( + "self_hosted_instance_heartbeat", + { + "app_version": f"1.2.{index}", + }, + ) + await telemetry_client.stop() + + assert len(sent_requests) == 2 + + await http_client.aclose() + + +@pytest.mark.asyncio +async def test_telemetry_client_flush_before_start_does_not_deadlock( + tmp_path: Path, +) -> None: + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(status_code=200, json={"status": "ok"}) + + transport = httpx.MockTransport(handler) + http_client = httpx.AsyncClient(transport=transport) + telemetry_client = TelemetryClient( + _build_config(tmp_path), + http_client=http_client, + ) + + telemetry_client.capture( + "self_hosted_instance_heartbeat", + { + "app_version": "1.2.3", + }, + ) + await telemetry_client.flush() + await telemetry_client.start() + telemetry_client.capture( + "self_hosted_instance_heartbeat", + { + "app_version": "1.2.4", + }, + ) + + await asyncio.wait_for(telemetry_client.stop(), timeout=1.0) + await http_client.aclose() + + +@pytest.mark.asyncio +async def test_telemetry_client_does_not_restart_after_stop(tmp_path: Path) -> None: + sent_requests: list[str] = [] + + async def handler(request: httpx.Request) -> httpx.Response: + sent_requests.append(request.read().decode("utf-8")) + return httpx.Response(status_code=200, json={"status": "ok"}) + + transport = httpx.MockTransport(handler) + http_client = httpx.AsyncClient(transport=transport) + telemetry_client = TelemetryClient( + _build_config(tmp_path), + http_client=http_client, + ) + + await telemetry_client.start() + telemetry_client.capture( + "self_hosted_instance_heartbeat", + { + "app_version": "1.2.3", + }, + ) + await telemetry_client.stop() + await telemetry_client.start() + queued_after_stop = telemetry_client.capture( + "self_hosted_instance_heartbeat", + { + "app_version": "1.2.4", + }, + ) + + assert len(sent_requests) == 1 + assert queued_after_stop is False + + await http_client.aclose() + + +@dataclass(frozen=True) +class _ConfigOverrides: + installation_id: str = "550e8400-e29b-41d4-a716-446655440000" + posthog_project_key: str = "phc_test_project_key" + + +def _build_config( + tmp_path: Path, + overrides: _ConfigOverrides = _ConfigOverrides(), +) -> TelemetryRuntimeConfig: + return TelemetryRuntimeConfig( + enabled=True, + posthog_host="https://us.i.posthog.com", + posthog_project_key=overrides.posthog_project_key, + installation_id=overrides.installation_id, + installation_id_path=tmp_path / "telemetry-installation-id", + batch_size=20, + request_timeout_seconds=2.0, + deployment_mode="self_hosted", + app_version="1.2.3", + environment="production", + app_env="production", + service_name="knowhere-api", + ) diff --git a/packages/shared-python/shared/core/config/base.py b/packages/shared-python/shared/core/config/base.py index d0f865ef..8c0e9571 100644 --- a/packages/shared-python/shared/core/config/base.py +++ b/packages/shared-python/shared/core/config/base.py @@ -33,6 +33,38 @@ class BaseConfig(BaseSettings): LOGFIRE_TOKEN: str = Field( default="", description="Logfire API token for distributed tracing" ) + TELEMETRY_ENABLED: bool = Field( + default=False, + description="Enable anonymous product telemetry for self-hosted deployments", + ) + TELEMETRY_POSTHOG_HOST: str = Field( + default="https://us.i.posthog.com", + description="PostHog ingestion host for anonymous telemetry events", + ) + TELEMETRY_POSTHOG_PROJECT_KEY: str = Field( + default_factory=lambda: os.getenv("NEXT_PUBLIC_POSTHOG_KEY", ""), + description="PostHog project token for anonymous telemetry events", + ) + TELEMETRY_INSTALLATION_ID: str = Field( + default="", + description="Optional operator-provided anonymous self-hosted installation id", + ) + TELEMETRY_INSTALLATION_ID_PATH: str = Field( + default="/data/secrets/telemetry-installation-id", + description="Persistent file used to store the generated telemetry installation id", + ) + TELEMETRY_BATCH_SIZE: int = Field( + default=20, + description="Maximum anonymous telemetry events per PostHog batch request", + ) + TELEMETRY_REQUEST_TIMEOUT_SECONDS: float = Field( + default=2.0, + description="Short timeout for outbound anonymous telemetry requests", + ) + TELEMETRY_DEPLOYMENT_MODE: str = Field( + default="self_hosted", + description="Deployment-mode label attached to anonymous telemetry events", + ) # Security configuration. WEBHOOK_MASTER_KEY: str = Field( @@ -77,6 +109,28 @@ def validate_log_level(cls, v): raise ValueError(f"LOG_LEVEL must be one of {valid_levels}") return v.upper() + @field_validator("TELEMETRY_POSTHOG_HOST") + @classmethod + def validate_telemetry_posthog_host(cls, v): + """Normalize the PostHog ingestion host.""" + return v.strip().rstrip("/") + + @field_validator("TELEMETRY_BATCH_SIZE") + @classmethod + def validate_telemetry_batch_size(cls, v): + """Validate anonymous telemetry batch size.""" + if v < 1: + raise ValueError("TELEMETRY_BATCH_SIZE must be at least 1") + return v + + @field_validator("TELEMETRY_REQUEST_TIMEOUT_SECONDS") + @classmethod + def validate_telemetry_request_timeout_seconds(cls, v): + """Validate anonymous telemetry request timeout.""" + if v <= 0: + raise ValueError("TELEMETRY_REQUEST_TIMEOUT_SECONDS must be positive") + return v + def validate_file_paths(self) -> bool: """Validate required local file paths.""" paths_to_check = { diff --git a/packages/shared-python/shared/services/telemetry/__init__.py b/packages/shared-python/shared/services/telemetry/__init__.py new file mode 100644 index 00000000..047ce17d --- /dev/null +++ b/packages/shared-python/shared/services/telemetry/__init__.py @@ -0,0 +1,18 @@ +"""Anonymous self-hosted telemetry helpers.""" + +from .client import TelemetryClient +from .config import TelemetryRuntimeConfig, build_telemetry_config +from .events import ( + build_instance_event_properties, + get_allowed_telemetry_event_names, +) +from .identity import get_or_create_installation_id + +__all__ = [ + "TelemetryClient", + "TelemetryRuntimeConfig", + "build_telemetry_config", + "build_instance_event_properties", + "get_allowed_telemetry_event_names", + "get_or_create_installation_id", +] diff --git a/packages/shared-python/shared/services/telemetry/client.py b/packages/shared-python/shared/services/telemetry/client.py new file mode 100644 index 00000000..a5c4b0c3 --- /dev/null +++ b/packages/shared-python/shared/services/telemetry/client.py @@ -0,0 +1,171 @@ +"""Non-blocking PostHog telemetry client for anonymous self-hosted events.""" + +from __future__ import annotations + +import asyncio +from collections.abc import Mapping +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any + +import httpx +from loguru import logger + +from .config import TelemetryRuntimeConfig +from .events import ( + TelemetryProperties, + get_allowed_telemetry_event_names, + sanitize_event_properties, +) + + +@dataclass(frozen=True) +class TelemetryEvent: + """Queued anonymous telemetry event.""" + + event_name: str + properties: TelemetryProperties + timestamp: datetime + + +class TelemetryClient: + """Bounded, asynchronous client for anonymous self-hosted telemetry.""" + + def __init__( + self, + config: TelemetryRuntimeConfig, + *, + http_client: httpx.AsyncClient | None = None, + queue_size: int = 100, + ) -> None: + self._config = config + self._queue: asyncio.Queue[TelemetryEvent] = asyncio.Queue(maxsize=queue_size) + self._http_client = http_client + self._owns_http_client = http_client is None + self._worker_task: asyncio.Task[None] | None = None + self._is_closed = False + self._allowed_event_names = get_allowed_telemetry_event_names() + + async def start(self) -> None: + """Start the background sender if telemetry is configured.""" + if self._is_closed or not self._config.is_ready or self._worker_task is not None: + return + + if self._http_client is None: + timeout = httpx.Timeout(self._config.request_timeout_seconds) + self._http_client = httpx.AsyncClient( + timeout=timeout, + follow_redirects=True, + ) + self._worker_task = asyncio.create_task(self._run(), name="telemetry-client") + + def capture( + self, + event_name: str, + properties: Mapping[str, object], + ) -> bool: + """Queue an anonymous telemetry event without blocking callers.""" + if self._is_closed or not self._config.is_ready: + return False + if event_name not in self._allowed_event_names: + logger.warning(f"anonymous telemetry event rejected: {event_name}") + return False + + telemetry_event = TelemetryEvent( + event_name=event_name, + properties=sanitize_event_properties(event_name, properties), + timestamp=datetime.now(timezone.utc), + ) + try: + self._queue.put_nowait(telemetry_event) + except asyncio.QueueFull: + logger.warning("anonymous telemetry queue full; dropping event") + return False + return True + + async def flush(self) -> None: + """Wait until queued telemetry has either sent or been dropped.""" + if not self._config.is_ready: + return + if self._worker_task is not None: + await self._queue.join() + return + + while not self._queue.empty(): + telemetry_events = self._drain_batch() + try: + await self._send_batch(telemetry_events) + finally: + for _ in telemetry_events: + self._queue.task_done() + + async def stop(self) -> None: + """Flush queued telemetry and close the background sender.""" + self._is_closed = True + if self._config.is_ready: + await self.flush() + + worker_task = self._worker_task + if worker_task is not None: + worker_task.cancel() + await asyncio.gather(worker_task, return_exceptions=True) + self._worker_task = None + + if self._owns_http_client and self._http_client is not None: + await self._http_client.aclose() + self._http_client = None + + async def _run(self) -> None: + while True: + telemetry_event = await self._queue.get() + telemetry_events = [telemetry_event] + telemetry_events.extend( + self._drain_batch(self._config.batch_size - len(telemetry_events)) + ) + try: + await self._send_batch(telemetry_events) + finally: + for _ in telemetry_events: + self._queue.task_done() + + def _drain_batch(self, max_count: int | None = None) -> list[TelemetryEvent]: + telemetry_events: list[TelemetryEvent] = [] + batch_size = self._config.batch_size if max_count is None else max_count + while len(telemetry_events) < batch_size: + try: + telemetry_events.append(self._queue.get_nowait()) + except asyncio.QueueEmpty: + break + return telemetry_events + + async def _send_batch(self, telemetry_events: list[TelemetryEvent]) -> None: + if not telemetry_events or self._http_client is None: + return + + payload = self._build_batch_payload(telemetry_events) + try: + response = await self._http_client.post( + self._config.batch_url, + json=payload, + ) + response.raise_for_status() + except Exception as exc: + logger.warning(f"anonymous telemetry send failed: {exc}") + + def _build_batch_payload(self, telemetry_events: list[TelemetryEvent]) -> dict[str, Any]: + return { + "api_key": self._config.posthog_project_key, + "historical_migration": False, + "batch": [ + { + "event": telemetry_event.event_name, + "properties": { + **telemetry_event.properties, + "distinct_id": self._config.installation_id, + "$process_person_profile": False, + }, + "timestamp": telemetry_event.timestamp.isoformat(), + } + for telemetry_event in telemetry_events + ], + } diff --git a/packages/shared-python/shared/services/telemetry/config.py b/packages/shared-python/shared/services/telemetry/config.py new file mode 100644 index 00000000..655d7599 --- /dev/null +++ b/packages/shared-python/shared/services/telemetry/config.py @@ -0,0 +1,82 @@ +"""Runtime configuration for anonymous self-hosted telemetry.""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Protocol + + +class TelemetrySettings(Protocol): + """Subset of app settings required by anonymous telemetry.""" + + TELEMETRY_ENABLED: bool + TELEMETRY_POSTHOG_HOST: str + TELEMETRY_POSTHOG_PROJECT_KEY: str + TELEMETRY_INSTALLATION_ID: str + TELEMETRY_INSTALLATION_ID_PATH: str + TELEMETRY_BATCH_SIZE: int + TELEMETRY_REQUEST_TIMEOUT_SECONDS: float + TELEMETRY_DEPLOYMENT_MODE: str + APP_VERSION: str + ENVIRONMENT: str + APP_ENV: str + API_STANDALONE_MODE_ENABLED: bool + BILLING_ENABLED: bool + + +@dataclass(frozen=True) +class TelemetryRuntimeConfig: + """Resolved anonymous telemetry configuration.""" + + enabled: bool + posthog_host: str + posthog_project_key: str + installation_id: str + installation_id_path: Path + batch_size: int + request_timeout_seconds: float + deployment_mode: str + app_version: str + environment: str + app_env: str + service_name: str + schema_version: str = "2026-06-telemetry-v1" + + @property + def is_ready(self) -> bool: + """Return whether outbound telemetry has enough config to send.""" + return ( + self.enabled + and bool(self.posthog_host) + and bool(self.posthog_project_key) + and bool(self.installation_id) + ) + + @property + def batch_url(self) -> str: + """Return the PostHog batch ingestion URL.""" + return f"{self.posthog_host.rstrip('/')}/batch/" + + +def build_telemetry_config( + settings: TelemetrySettings, + *, + service_name: str, + installation_id: str, +) -> TelemetryRuntimeConfig: + """Build resolved anonymous telemetry config from app settings.""" + return TelemetryRuntimeConfig( + enabled=settings.TELEMETRY_ENABLED, + posthog_host=settings.TELEMETRY_POSTHOG_HOST, + posthog_project_key=settings.TELEMETRY_POSTHOG_PROJECT_KEY.strip(), + installation_id=installation_id.strip(), + installation_id_path=Path(settings.TELEMETRY_INSTALLATION_ID_PATH), + batch_size=settings.TELEMETRY_BATCH_SIZE, + request_timeout_seconds=settings.TELEMETRY_REQUEST_TIMEOUT_SECONDS, + deployment_mode=settings.TELEMETRY_DEPLOYMENT_MODE, + app_version=settings.APP_VERSION, + environment=settings.ENVIRONMENT, + app_env=settings.APP_ENV, + service_name=service_name, + ) diff --git a/packages/shared-python/shared/services/telemetry/events.py b/packages/shared-python/shared/services/telemetry/events.py new file mode 100644 index 00000000..9a40ef01 --- /dev/null +++ b/packages/shared-python/shared/services/telemetry/events.py @@ -0,0 +1,106 @@ +"""Anonymous telemetry event schema and safe property handling.""" + +from __future__ import annotations + +import os +from collections.abc import Mapping +from typing import TypeAlias, cast + +from .config import TelemetryRuntimeConfig + +TelemetryPropertyValue: TypeAlias = str | int | float | bool | None +TelemetryProperties: TypeAlias = dict[str, TelemetryPropertyValue] + +_BASE_PROPERTY_NAMES = frozenset( + { + "app_env", + "app_version", + "api_standalone_mode_enabled", + "billing_enabled", + "deployment_mode", + "environment", + "rate_limit_enabled", + "schema_version", + "service_name", + } +) + +_EVENT_PROPERTY_NAMES: dict[str, frozenset[str]] = { + "self_hosted_instance_started": frozenset(), + "self_hosted_instance_heartbeat": frozenset( + { + "api_healthy", + "postgres_healthy", + "redis_healthy", + "uptime_bucket", + } + ), + "self_hosted_instance_shutdown": frozenset(), +} + + +def get_allowed_telemetry_event_names() -> frozenset[str]: + """Return event names that may be emitted by the telemetry client.""" + return frozenset(_EVENT_PROPERTY_NAMES.keys()) + + +def build_instance_event_properties( + config: TelemetryRuntimeConfig, + *, + api_standalone_mode_enabled: bool, + billing_enabled: bool, + api_healthy: bool | None = None, + postgres_healthy: bool | None = None, + redis_healthy: bool | None = None, + uptime_bucket: str | None = None, +) -> TelemetryProperties: + """Build safe common properties for self-hosted instance events.""" + properties: TelemetryProperties = { + "app_env": config.app_env, + "app_version": config.app_version, + "api_standalone_mode_enabled": api_standalone_mode_enabled, + "billing_enabled": billing_enabled, + "deployment_mode": config.deployment_mode, + "environment": config.environment, + "rate_limit_enabled": _read_bool_environment("RATE_LIMIT_ENABLED", True), + "schema_version": config.schema_version, + "service_name": config.service_name, + } + if api_healthy is not None: + properties["api_healthy"] = api_healthy + if postgres_healthy is not None: + properties["postgres_healthy"] = postgres_healthy + if redis_healthy is not None: + properties["redis_healthy"] = redis_healthy + if uptime_bucket is not None: + properties["uptime_bucket"] = uptime_bucket + return properties + + +def sanitize_event_properties( + event_name: str, + properties: Mapping[str, object], +) -> TelemetryProperties: + """Strip unknown or non-scalar properties before outbound telemetry.""" + allowed_property_names = _BASE_PROPERTY_NAMES | _EVENT_PROPERTY_NAMES[event_name] + sanitized_properties: TelemetryProperties = {} + for property_name, property_value in properties.items(): + if property_name not in allowed_property_names: + continue + if _is_safe_property_value(property_value): + sanitized_properties[property_name] = cast( + TelemetryPropertyValue, + property_value, + ) + return sanitized_properties + + +def _is_safe_property_value(value: object) -> bool: + return value is None or isinstance(value, (str, int, float, bool)) + + +def _read_bool_environment(name: str, default: bool) -> bool: + raw_value = os.getenv(name) + if raw_value is None: + return default + return raw_value.strip().lower() in {"1", "true", "yes", "on"} diff --git a/packages/shared-python/shared/services/telemetry/identity.py b/packages/shared-python/shared/services/telemetry/identity.py new file mode 100644 index 00000000..8407e493 --- /dev/null +++ b/packages/shared-python/shared/services/telemetry/identity.py @@ -0,0 +1,90 @@ +"""Stable anonymous installation identity for self-hosted telemetry.""" + +from __future__ import annotations + +import fcntl +import os +from pathlib import Path +from uuid import UUID, uuid4 + + +def get_or_create_installation_id( + *, + explicit_installation_id: str, + installation_id_path: Path, +) -> str: + """Resolve the stable anonymous self-hosted installation id. + + Operator-provided ids win. Otherwise, the id is generated once and stored in + the persistent self-hosted secrets volume so restarts keep the same id. + """ + normalized_explicit_id = explicit_installation_id.strip() + if normalized_explicit_id: + _parse_uuid(normalized_explicit_id) + return normalized_explicit_id + + installation_id_path.parent.mkdir(parents=True, exist_ok=True) + lock_path = installation_id_path.with_suffix(f"{installation_id_path.suffix}.lock") + + with lock_path.open("a+", encoding="utf-8") as lock_file: + fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) + try: + existing_installation_id = _read_valid_installation_id( + installation_id_path + ) + if existing_installation_id: + return existing_installation_id + + generated_installation_id = str(uuid4()) + _write_installation_id_atomically( + installation_id_path, + generated_installation_id, + ) + return generated_installation_id + finally: + fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) + + +def _read_valid_installation_id(installation_id_path: Path) -> str: + if not installation_id_path.exists(): + return "" + + candidate = installation_id_path.read_text(encoding="utf-8").strip() + if not candidate: + return "" + + try: + _parse_uuid(candidate) + except ValueError: + return "" + return candidate + + +def _parse_uuid(candidate: str) -> UUID: + try: + return UUID(candidate) + except ValueError as exc: + raise ValueError("TELEMETRY_INSTALLATION_ID must be a UUID") from exc + + +def _write_installation_id_atomically( + installation_id_path: Path, + installation_id: str, +) -> None: + temporary_path = installation_id_path.with_name( + f".{installation_id_path.name}.{os.getpid()}.tmp" + ) + file_descriptor = os.open( + temporary_path, + os.O_WRONLY | os.O_CREAT | os.O_EXCL, + 0o600, + ) + try: + with os.fdopen(file_descriptor, "w", encoding="utf-8") as temporary_file: + temporary_file.write(f"{installation_id}\n") + temporary_file.flush() + os.fsync(temporary_file.fileno()) + os.replace(temporary_path, installation_id_path) + finally: + if temporary_path.exists(): + temporary_path.unlink() diff --git a/packages/shared-python/shared/services/telemetry/runtime.py b/packages/shared-python/shared/services/telemetry/runtime.py new file mode 100644 index 00000000..fbc88b42 --- /dev/null +++ b/packages/shared-python/shared/services/telemetry/runtime.py @@ -0,0 +1,80 @@ +"""Runtime lifecycle helpers for anonymous self-hosted telemetry.""" + +from __future__ import annotations + +from pathlib import Path + +from loguru import logger + +from .client import TelemetryClient +from .config import TelemetrySettings, build_telemetry_config +from .events import build_instance_event_properties +from .identity import get_or_create_installation_id + + +async def start_self_hosted_telemetry( + settings: TelemetrySettings, + *, + service_name: str, + api_healthy: bool, + postgres_healthy: bool, + redis_healthy: bool, +) -> TelemetryClient | None: + """Start anonymous self-hosted telemetry for the current service.""" + if not settings.TELEMETRY_ENABLED: + logger.info("anonymous self-hosted telemetry disabled") + return None + + try: + installation_id = get_or_create_installation_id( + explicit_installation_id=settings.TELEMETRY_INSTALLATION_ID, + installation_id_path=Path(settings.TELEMETRY_INSTALLATION_ID_PATH), + ) + except Exception as exc: + logger.warning(f"anonymous self-hosted telemetry identity unavailable: {exc}") + return None + config = build_telemetry_config( + settings, + service_name=service_name, + installation_id=installation_id, + ) + if not config.is_ready: + logger.warning( + "anonymous self-hosted telemetry enabled but PostHog config is incomplete" + ) + return None + + telemetry_client = TelemetryClient(config) + await telemetry_client.start() + + base_properties = build_instance_event_properties( + config, + api_standalone_mode_enabled=settings.API_STANDALONE_MODE_ENABLED, + billing_enabled=settings.BILLING_ENABLED, + ) + telemetry_client.capture("self_hosted_instance_started", base_properties) + telemetry_client.capture( + "self_hosted_instance_heartbeat", + build_instance_event_properties( + config, + api_standalone_mode_enabled=settings.API_STANDALONE_MODE_ENABLED, + billing_enabled=settings.BILLING_ENABLED, + api_healthy=api_healthy, + postgres_healthy=postgres_healthy, + redis_healthy=redis_healthy, + uptime_bucket="0m-5m", + ), + ) + + logger.info("anonymous self-hosted telemetry started") + return telemetry_client + + +async def stop_self_hosted_telemetry( + telemetry_client: TelemetryClient | None, +) -> None: + """Flush and stop anonymous self-hosted telemetry.""" + if telemetry_client is None: + return + telemetry_client.capture("self_hosted_instance_shutdown", {}) + await telemetry_client.stop()