diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index bfd8267fc..6ce72d27a 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -95,6 +95,7 @@ from langfuse._utils.parse_error import handle_fern_exception from langfuse._utils.prompt_cache import PromptCache from langfuse.api import ( + AsyncLangfuseAPI, CreateChatPromptRequest, CreateChatPromptType, CreateTextPromptRequest, @@ -105,6 +106,7 @@ DatasetStatus, DeleteDatasetRunResponse, Error, + LangfuseAPI, MapValue, NotFoundError, PaginatedDatasetRuns, @@ -176,6 +178,13 @@ class Langfuse: host (Optional[str]): Deprecated. Use base_url instead. The Langfuse API host URL. Defaults to "https://cloud.langfuse.com". timeout (Optional[int]): Timeout in seconds for API requests. Defaults to 5 seconds. httpx_client (Optional[httpx.Client]): Custom httpx client for making non-tracing HTTP requests. If not provided, a default client will be created. + **Fork safety**: ``httpx.Client`` is thread-safe but not process-safe. When using + ``fork()``-based servers (e.g. Gunicorn with ``--preload``), the SDK automatically + recreates its internally-managed HTTP client in child processes after fork. A custom + ``httpx_client`` is intentionally left as-is (the fork-inherited copy is reused), so + you retain the opportunity to handle process-safety yourself — for example by + registering your own ``os.register_at_fork(after_in_child=...)`` handler to close and + reopen connections on the custom client. debug (bool): Enable debug logging. Defaults to False. Can also be set via LANGFUSE_DEBUG environment variable. tracing_enabled (Optional[bool]): Enable or disable tracing. Defaults to True. Can also be set via LANGFUSE_TRACING_ENABLED environment variable. flush_at (Optional[int]): Number of spans to batch before sending to the API. Defaults to 512. Can also be set via LANGFUSE_FLUSH_AT environment variable. @@ -409,8 +418,34 @@ def __init__( if self._tracing_enabled and self._resources.tracer is not None else otel_trace_api.NoOpTracer() ) - self.api = self._resources.api - self.async_api = self._resources.async_api + + @property + def api(self) -> LangfuseAPI: + if self._resources is None: + raise AttributeError("Langfuse client is not initialized") + + return self._resources.api + + @api.setter + def api(self, value: LangfuseAPI) -> None: + if self._resources is None: + raise AttributeError("Langfuse client is not initialized") + + self._resources.api = value + + @property + def async_api(self) -> AsyncLangfuseAPI: + if self._resources is None: + raise AttributeError("Langfuse client is not initialized") + + return self._resources.async_api + + @async_api.setter + def async_api(self, value: AsyncLangfuseAPI) -> None: + if self._resources is None: + raise AttributeError("Langfuse client is not initialized") + + self._resources.async_api = value @overload def start_observation( diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 14e746b86..67c44920a 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -16,7 +16,10 @@ import atexit import os +import sys import threading +import urllib.request +import weakref from queue import Full, Queue from typing import Any, Callable, Dict, List, Optional, cast @@ -79,6 +82,10 @@ class LangfuseResourceManager: _instances: Dict[str, "LangfuseResourceManager"] = {} _lock = threading.RLock() + _otel_tracer: Tracer + _media_manager: MediaManager + _media_upload_consumers: List[MediaUploadConsumer] + _ingestion_consumers: List[ScoreIngestionConsumer] @classmethod def get_singleton_httpx_client(cls) -> Optional[httpx.Client]: @@ -201,6 +208,7 @@ def _initialize_instance( self.mask = mask self.mask_otel_spans = mask_otel_spans self.environment = environment + self._shutdown = False # Store additional client settings for get_client() to use self.timeout = timeout @@ -216,60 +224,19 @@ def _initialize_instance( self.span_exporter = span_exporter self.tracer_provider: Optional[TracerProvider] = None - # API Clients - - ## API clients must be singletons because the underlying HTTPX clients - ## use connection pools with limited capacity. Creating multiple instances - ## could exhaust the OS's maximum number of available TCP sockets (file descriptors), - ## leading to connection errors. - if httpx_client is not None: - self.httpx_client = httpx_client - else: - # Create a new httpx client with additional_headers if provided - client_headers = additional_headers if additional_headers else {} - self.httpx_client = httpx.Client(timeout=timeout, headers=client_headers) - - self.api = LangfuseAPI( - base_url=base_url, - username=self.public_key, - password=secret_key, - x_langfuse_sdk_name="python", - x_langfuse_sdk_version=langfuse_version, - x_langfuse_public_key=self.public_key, - httpx_client=self.httpx_client, - timeout=timeout, - ) - self.async_api = AsyncLangfuseAPI( - base_url=base_url, - username=self.public_key, - password=secret_key, - x_langfuse_sdk_name="python", - x_langfuse_sdk_version=langfuse_version, - x_langfuse_public_key=self.public_key, - timeout=timeout, - ) - score_ingestion_client = LangfuseClient( - public_key=self.public_key, - secret_key=secret_key, - base_url=base_url, - version=langfuse_version, - timeout=timeout or 20, - session=self.httpx_client, - ) + self._custom_httpx_client = httpx_client + self._init_api_clients() # Media self._media_upload_enabled = os.environ.get( LANGFUSE_MEDIA_UPLOAD_ENABLED, "True" ).lower() not in ("false", "0") - self._media_upload_queue: Queue[Any] = Queue(100_000) - self._media_manager = MediaManager( - api_client=self.api, - httpx_client=self.httpx_client, - media_upload_queue=self._media_upload_queue, - max_retries=3, + self._media_upload_thread_count = media_upload_thread_count or max( + int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1 ) - self._media_upload_consumers = [] + + self._init_media_manager() # OTEL Tracer if tracing_enabled: @@ -303,12 +270,112 @@ def _initialize_instance( attributes={"public_key": self.public_key}, ) - media_upload_thread_count = media_upload_thread_count or max( - int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1 + self._init_consumer_threads() + + # Prompt cache + self.prompt_cache = PromptCache() + + # Register shutdown handler + atexit.register(self.shutdown) + + # Register fork handler to reinitialize consumer threads in child process. + # When using Gunicorn with --preload, os.fork() copies memory but not threads + # (POSIX.1: https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html). + # Without this, media upload and score ingestion threads are lost after fork, + # causing silent data loss. + # + # Note: LangfuseSpanProcessor (BatchSpanProcessor) already handles fork-safety + # for span export via its own os.register_at_fork. This handler covers the + # remaining background threads managed by LangfuseResourceManager. + # + # weakref.WeakMethod prevents os.register_at_fork from holding a permanent strong + # reference to this instance, which would block garbage collection. + # See: https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py + if hasattr(os, "register_at_fork"): + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork( + # Walrus operator resolves the weak reference once and stores it in + # a temporary variable before calling it. This avoids a TOCTOU window + # where GC could collect the referent between checking for None and + # invoking the method. + after_in_child=lambda: (m := weak_reinit()) and m() + ) + + langfuse_logger.info( + f"Startup: Langfuse tracer successfully initialized | " + f"public_key={self.public_key} | " + f"base_url={base_url} | " + f"environment={environment or 'default'} | " + f"sample_rate={sample_rate if sample_rate is not None else 1.0} | " + f"media_threads={self._media_upload_thread_count}" + ) + + def _init_media_manager(self) -> None: + """Initialize or reset media upload state while preserving manager references.""" + self._media_upload_queue: Queue[Any] = Queue(100_000) + if hasattr(self, "_media_manager"): + self._media_manager.reinitialize( + api_client=self.api, + httpx_client=self.httpx_client, + media_upload_queue=self._media_upload_queue, + ) + else: + self._media_manager = MediaManager( + api_client=self.api, + httpx_client=self.httpx_client, + media_upload_queue=self._media_upload_queue, + max_retries=3, + ) + + self._media_upload_consumers = [] + + def _init_api_clients(self) -> None: + """Initialize HTTP-backed API clients. + + Internally-managed httpx clients are recreated when this method is + called after fork. Caller-provided clients are preserved because their + lifecycle belongs to the caller. + """ + if self._custom_httpx_client is not None: + self.httpx_client = self._custom_httpx_client + else: + client_headers = self.additional_headers if self.additional_headers else {} + self.httpx_client = httpx.Client( + timeout=self.timeout, headers=client_headers + ) + + self.api = LangfuseAPI( + base_url=self.base_url, + username=self.public_key, + password=self.secret_key, + x_langfuse_sdk_name="python", + x_langfuse_sdk_version=langfuse_version, + x_langfuse_public_key=self.public_key, + httpx_client=self.httpx_client, + timeout=self.timeout, + ) + self.async_api = AsyncLangfuseAPI( + base_url=self.base_url, + username=self.public_key, + password=self.secret_key, + x_langfuse_sdk_name="python", + x_langfuse_sdk_version=langfuse_version, + x_langfuse_public_key=self.public_key, + timeout=self.timeout, + ) + self._score_ingestion_client = LangfuseClient( + public_key=self.public_key, + secret_key=self.secret_key, + base_url=self.base_url, + version=langfuse_version, + timeout=self.timeout or 20, + session=self.httpx_client, ) + def _init_consumer_threads(self) -> None: + """Initialize media upload and score ingestion consumer threads.""" if self._media_upload_enabled: - for i in range(media_upload_thread_count): + for i in range(self._media_upload_thread_count): media_upload_consumer = MediaUploadConsumer( identifier=i, media_manager=self._media_manager, @@ -316,9 +383,6 @@ def _initialize_instance( media_upload_consumer.start() self._media_upload_consumers.append(media_upload_consumer) - # Prompt cache - self.prompt_cache = PromptCache() - # Score ingestion self._score_ingestion_queue: Queue[Any] = Queue(100_000) self._ingestion_consumers = [] @@ -326,25 +390,87 @@ def _initialize_instance( ingestion_consumer = ScoreIngestionConsumer( ingestion_queue=self._score_ingestion_queue, identifier=0, - client=score_ingestion_client, - flush_at=flush_at, - flush_interval=flush_interval, + client=self._score_ingestion_client, + flush_at=self.flush_at, + flush_interval=self.flush_interval, max_retries=3, public_key=self.public_key, ) ingestion_consumer.start() self._ingestion_consumers.append(ingestion_consumer) - # Register shutdown handler - atexit.register(self.shutdown) + def _at_fork_reinit(self) -> None: + """Reinitialize consumer threads after fork in child process. - langfuse_logger.info( - f"Startup: Langfuse tracer successfully initialized | " - f"public_key={self.public_key} | " - f"base_url={base_url} | " - f"environment={environment or 'default'} | " - f"sample_rate={sample_rate if sample_rate is not None else 1.0} | " - f"media_threads={media_upload_thread_count or 1}" + Called automatically via os.register_at_fork() after fork(). + Necessary for Gunicorn --preload deployments where os.fork() is used: + threads are not copied to child processes (POSIX standard), so without + reinitialization, the child process has no consumer threads and all + media upload and score ingestion events are silently lost. + + Note: LangfuseSpanProcessor (BatchSpanProcessor) handles span export + fork-safety separately via its own os.register_at_fork handler. + + Skipped if shutdown() was already called on this instance, to avoid + restarting threads on an intentionally torn-down manager. + """ + # The class-level lock may have been held by a thread in the parent at fork time. + # That thread does not exist in the child, so the lock can never be released and + # any attempt to acquire it would deadlock. Replace it before the shutdown check: + # the lock is class-level state needed by the child (e.g. to create a new client) + # even if this particular instance was already shut down. + LangfuseResourceManager._lock = threading.RLock() + + if self._shutdown: + return + + if sys.platform == "darwin" and not urllib.request.getproxies_environment(): + # urllib proxy discovery falls back to macOS SystemConfiguration APIs that + # are not safe to invoke after fork(). Setting no_proxy="*" makes httpx and + # requests skip that lookup entirely in this child process. Skipped when + # proxies are configured via environment variables: urllib then never touches + # SystemConfiguration (no segfault risk), and overriding no_proxy would + # disable the user's proxy setup process-wide. + os.environ["no_proxy"] = "*" + os.environ["NO_PROXY"] = "*" + + langfuse_logger.debug( + f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads." + ) + + # Queues are intentionally recreated after fork. Items enqueued before fork + # belong to the preloaded parent process and must not be processed by every + # worker — otherwise uploads/scores would be duplicated across workers. + # + # Internally-managed httpx clients must also be recreated: fork() duplicates the + # parent's connection pool (TCP socket file descriptors) into the child. Both + # processes then share the same underlying sockets, causing data corruption and + # SSL/TLS state mismatch under concurrent use. Fresh clients start with an empty + # pool owned solely by this child process. + # + # Custom httpx clients provided by the caller are NOT recreated. The fork-inherited + # copy is reused as-is, giving the caller the opportunity to handle process-safety + # themselves (e.g. by registering their own os.register_at_fork handler). + try: + self._init_api_clients() + except Exception as e: + langfuse_logger.error( + f"[PID {os.getpid()}] Failed to recreate HTTP clients after fork: {e}. " + f"Network requests may fail in this worker." + ) + + try: + self._init_media_manager() + self._init_consumer_threads() + self.prompt_cache = PromptCache() + except Exception as e: + langfuse_logger.error( + f"[PID {os.getpid()}] Failed to reinitialize consumer threads after fork: {e}. " + f"Media upload, score ingestion, and prompt cache refresh will be unavailable in this worker." + ) + + langfuse_logger.debug( + f"[PID {os.getpid()}] Langfuse consumer threads and prompt cache reinitialized after fork" ) @classmethod @@ -486,6 +612,8 @@ def flush(self) -> None: langfuse_logger.debug("Successfully flushed media upload queue") def shutdown(self) -> None: + self._shutdown = True + # Unregister the atexit handler first atexit.unregister(self.shutdown) diff --git a/langfuse/_task_manager/media_manager.py b/langfuse/_task_manager/media_manager.py index e1882fb4c..14dceec19 100644 --- a/langfuse/_task_manager/media_manager.py +++ b/langfuse/_task_manager/media_manager.py @@ -46,6 +46,17 @@ def __init__( LANGFUSE_MEDIA_UPLOAD_ENABLED, "True" ).lower() not in ("false", "0") + def reinitialize( + self, + *, + api_client: LangfuseAPI, + httpx_client: httpx.Client, + media_upload_queue: Queue, + ) -> None: + self._api_client = api_client + self._httpx_client = httpx_client + self._queue = media_upload_queue + def process_next_media_upload(self) -> None: try: upload_job = self._queue.get(block=True, timeout=1) @@ -295,7 +306,9 @@ def _upload_media_sync( field: Optional[str] = None, ) -> None: if not self._enabled: - raise ValueError("Cannot upload LangfuseMedia while media upload is disabled.") + raise ValueError( + "Cannot upload LangfuseMedia while media upload is disabled." + ) if ( media._content_length is None diff --git a/tests/unit/test_resource_manager.py b/tests/unit/test_resource_manager.py index 8e3b505c7..f66a1e052 100644 --- a/tests/unit/test_resource_manager.py +++ b/tests/unit/test_resource_manager.py @@ -172,6 +172,244 @@ def test_media_upload_consumer_signal_shutdown_wakes_blocked_thread(): assert not consumer.is_alive() +def test_at_fork_reinit_creates_new_queues_and_consumers(monkeypatch): + """_at_fork_reinit() must replace queues and start fresh consumer threads.""" + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-reinit", + secret_key="sk-fork-reinit", + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + old_score_queue = rm._score_ingestion_queue + old_media_queue = rm._media_upload_queue + old_ingestion_consumers = list(rm._ingestion_consumers) + + rm._at_fork_reinit() + + assert rm._score_ingestion_queue is not old_score_queue + assert rm._media_upload_queue is not old_media_queue + assert len(rm._ingestion_consumers) == 1 + assert rm._ingestion_consumers[0].is_alive() + + # In a real fork, old threads don't exist in the child process. + # In this unit test they do — stop them explicitly to avoid leaking threads. + for consumer in old_ingestion_consumers: + consumer.pause() + consumer.join(timeout=1.0) + + client.shutdown() + + +def test_at_fork_reinit_skips_when_shutdown(monkeypatch): + """_at_fork_reinit() must not restart threads after intentional shutdown.""" + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-shutdown", + secret_key="sk-fork-shutdown", + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + old_score_queue = rm._score_ingestion_queue + old_lock = LangfuseResourceManager._lock + + rm._shutdown = True + rm._at_fork_reinit() + + assert rm._score_ingestion_queue is old_score_queue # queue must not be replaced + # The class-level lock is shared state the child still needs (e.g. to create + # a new client), so it must be replaced even when this instance is shut down. + assert LangfuseResourceManager._lock is not old_lock + + client.shutdown() + + +def test_at_fork_reinit_replaces_lock(monkeypatch): + """_at_fork_reinit() must replace the class-level lock with a fresh one. + + If a thread held _lock at fork time, the child has no such thread and the + lock can never be released, causing a deadlock. The reinit handler must + replace it before doing any other work so the child can always acquire it. + """ + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-lock", + secret_key="sk-fork-lock", + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + old_lock = LangfuseResourceManager._lock + + rm._at_fork_reinit() + + assert LangfuseResourceManager._lock is not old_lock + # New lock must be immediately acquirable (not held by any thread). + acquired = LangfuseResourceManager._lock.acquire(blocking=False) + assert acquired, "New lock must not be held after _at_fork_reinit()" + LangfuseResourceManager._lock.release() + + client.shutdown() + + +def test_at_fork_reinit_new_lock_acquirable_even_if_old_lock_was_held(monkeypatch): + """Simulate the fork-deadlock scenario: old lock held, new lock must still be acquirable. + + In a real fork, a thread holding _lock in the parent disappears in the child, + leaving the lock permanently acquired. Here we replicate that by acquiring the + old lock without releasing it, then calling _at_fork_reinit() and verifying that + the replacement lock is free. + """ + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-lock-held", + secret_key="sk-fork-lock-held", + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + # Simulate the lock being permanently held (as it would be in a forked child + # when the owning thread no longer exists). + stuck_lock = LangfuseResourceManager._lock + stuck_lock.acquire() # held, never released — simulates the fork scenario + + try: + rm._at_fork_reinit() + + # The new lock must be a different object and must be acquirable. + new_lock = LangfuseResourceManager._lock + assert new_lock is not stuck_lock + acquired = new_lock.acquire(blocking=False) + assert acquired, "Replacement lock must be acquirable after _at_fork_reinit()" + new_lock.release() + finally: + stuck_lock.release() # clean up so other tests are not affected + + client.shutdown() + + +def test_at_fork_reinit_recreates_httpx_client_by_default(monkeypatch): + """_at_fork_reinit() must create a new httpx.Client to avoid sharing + connection-pool file descriptors (TCP sockets) across forked processes. + httpx.Client is thread-safe but not process-safe.""" + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-httpx-default", + secret_key="sk-fork-httpx-default", + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + old_httpx_client = rm.httpx_client + old_api = rm.api + old_async_api = rm.async_api + old_score_ingestion_client = rm._score_ingestion_client + assert client.api is old_api + assert client.async_api is old_async_api + + rm._at_fork_reinit() + + assert rm.httpx_client is not old_httpx_client + assert rm.api is not old_api + assert rm.async_api is not old_async_api + assert client.api is rm.api + assert client.api is not old_api + assert client.async_api is rm.async_api + assert client.async_api is not old_async_api + assert rm._score_ingestion_client is not old_score_ingestion_client + + client.shutdown() + + +def test_at_fork_reinit_preserves_custom_httpx_client(monkeypatch): + """After fork, a caller-supplied httpx.Client is reused as-is. + The caller is responsible for their own fork-safety (e.g. via their own + os.register_at_fork handler). The SDK must not silently replace it.""" + import httpx + + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + custom_client = httpx.Client(timeout=99) + client = Langfuse( + public_key="pk-fork-httpx-custom", + secret_key="sk-fork-httpx-custom", + httpx_client=custom_client, + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + assert rm.httpx_client is custom_client + + rm._at_fork_reinit() + + # Custom client must be preserved — caller owns process-safety for it. + assert rm.httpx_client is custom_client + assert rm.api is not None + assert rm.async_api is not None + assert rm._score_ingestion_client is not None + + custom_client.close() + client.shutdown() + + +def test_at_fork_reinit_new_httpx_client_uses_configured_timeout_and_headers( + monkeypatch, +): + """After fork, the recreated httpx.Client must reflect the timeout and + additional_headers that were set on the resource manager.""" + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-httpx-settings", + secret_key="sk-fork-httpx-settings", + timeout=42, + additional_headers={"X-Custom": "value"}, + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + rm._at_fork_reinit() + + assert rm.httpx_client.timeout.connect == 42 + assert rm.httpx_client.headers.get("X-Custom") == "value" + + client.shutdown() + + def test_stop_and_join_consumer_threads_broadcasts_media_shutdown_after_pausing_all(): events = []