From 590963318edd99d4e75d349aac2bdce521cb1eb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=98=81=EA=B7=9C?= Date: Tue, 19 May 2026 23:18:01 +0900 Subject: [PATCH 1/9] fix(resource_manager): reinitialize consumer threads after os.fork() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 박영규 --- langfuse/_client/resource_manager.py | 111 ++++++++++++++++++++++----- tests/unit/test_resource_manager.py | 60 +++++++++++++++ 2 files changed, 150 insertions(+), 21 deletions(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 2d42f6ce1..41b6b47ca 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -17,6 +17,7 @@ import atexit import os import threading +import weakref from queue import Full, Queue from typing import Any, Callable, Dict, List, Optional, cast @@ -170,6 +171,7 @@ def _initialize_instance( self.base_url = base_url self.mask = mask self.environment = environment + self._shutdown = False # Store additional client settings for get_client() to use self.timeout = timeout @@ -243,7 +245,9 @@ def _initialize_instance( x_langfuse_public_key=self.public_key, timeout=timeout, ) - score_ingestion_client = LangfuseClient( + + # Store as instance variable so _at_fork_reinit can reuse without recreation + self._score_ingestion_client = LangfuseClient( public_key=self.public_key, secret_key=secret_key, base_url=base_url, @@ -257,6 +261,52 @@ def _initialize_instance( LANGFUSE_MEDIA_UPLOAD_ENABLED, "True" ).lower() not in ("false", "0") + self._media_upload_thread_count = media_upload_thread_count or max( + int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1 + ) + + self._init_consumer_threads() + + # Prompt cache + self.prompt_cache = PromptCache() + + # Register shutdown handler + atexit.register(self.shutdown) + + # Register fork handler to reinitialize consumer threads in child process. + # When using Gunicorn with --preload, os.fork() copies memory but not threads + # (POSIX.1: https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html). + # Without this, media upload and score ingestion threads are lost after fork, + # causing silent data loss. + # + # Note: LangfuseSpanProcessor (BatchSpanProcessor) already handles fork-safety + # for span export via its own os.register_at_fork. This handler covers the + # remaining background threads managed by LangfuseResourceManager. + # + # weakref.WeakMethod prevents os.register_at_fork from holding a permanent strong + # reference to this instance, which would block garbage collection. + # See: https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py + if hasattr(os, "register_at_fork"): + weak_reinit = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork( + # Walrus operator resolves the weak reference once and stores it in + # a temporary variable before calling it. This avoids a TOCTOU window + # where GC could collect the referent between checking for None and + # invoking the method. + after_in_child=lambda: (m := weak_reinit()) and m() + ) + + langfuse_logger.info( + f"Startup: Langfuse tracer successfully initialized | " + f"public_key={self.public_key} | " + f"base_url={base_url} | " + f"environment={environment or 'default'} | " + f"sample_rate={sample_rate if sample_rate is not None else 1.0} | " + f"media_threads={self._media_upload_thread_count}" + ) + + def _init_consumer_threads(self) -> None: + """Initialize media upload and score ingestion consumer threads.""" self._media_upload_queue: Queue[Any] = Queue(100_000) self._media_manager = MediaManager( api_client=self.api, @@ -266,12 +316,8 @@ def _initialize_instance( ) self._media_upload_consumers = [] - media_upload_thread_count = media_upload_thread_count or max( - int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1 - ) - if self._media_upload_enabled: - for i in range(media_upload_thread_count): + for i in range(self._media_upload_thread_count): media_upload_consumer = MediaUploadConsumer( identifier=i, media_manager=self._media_manager, @@ -279,9 +325,6 @@ def _initialize_instance( media_upload_consumer.start() self._media_upload_consumers.append(media_upload_consumer) - # Prompt cache - self.prompt_cache = PromptCache() - # Score ingestion self._score_ingestion_queue: Queue[Any] = Queue(100_000) self._ingestion_consumers = [] @@ -289,25 +332,49 @@ def _initialize_instance( ingestion_consumer = ScoreIngestionConsumer( ingestion_queue=self._score_ingestion_queue, identifier=0, - client=score_ingestion_client, - flush_at=flush_at, - flush_interval=flush_interval, + client=self._score_ingestion_client, + flush_at=self.flush_at, + flush_interval=self.flush_interval, max_retries=3, public_key=self.public_key, ) ingestion_consumer.start() self._ingestion_consumers.append(ingestion_consumer) - # Register shutdown handler - atexit.register(self.shutdown) + def _at_fork_reinit(self) -> None: + """Reinitialize consumer threads after fork in child process. - langfuse_logger.info( - f"Startup: Langfuse tracer successfully initialized | " - f"public_key={self.public_key} | " - f"base_url={base_url} | " - f"environment={environment or 'default'} | " - f"sample_rate={sample_rate if sample_rate is not None else 1.0} | " - f"media_threads={media_upload_thread_count or 1}" + Called automatically via os.register_at_fork() after fork(). + Necessary for Gunicorn --preload deployments where os.fork() is used: + threads are not copied to child processes (POSIX standard), so without + reinitialization, the child process has no consumer threads and all + media upload and score ingestion events are silently lost. + + Note: LangfuseSpanProcessor (BatchSpanProcessor) handles span export + fork-safety separately via its own os.register_at_fork handler. + + Skipped if shutdown() was already called on this instance, to avoid + restarting threads on an intentionally torn-down manager. + """ + if self._shutdown: + return + + langfuse_logger.debug( + f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads." + ) + + # Queues are intentionally recreated after fork. Items enqueued before fork + # belong to the preloaded parent process and must not be processed by every + # worker — otherwise uploads/scores would be duplicated across workers. + # + # HTTP clients (self.httpx_client, self._score_ingestion_client) are not recreated + # here to keep this handler minimal; this mirrors the existing singleton client + # lifecycle. If preload-time network I/O is introduced in the future, clients + # may need fork-specific reinitialization as well. + self._init_consumer_threads() + + langfuse_logger.debug( + f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork" ) @classmethod @@ -449,6 +516,8 @@ def flush(self) -> None: langfuse_logger.debug("Successfully flushed media upload queue") def shutdown(self) -> None: + self._shutdown = True + # Unregister the atexit handler first atexit.unregister(self.shutdown) diff --git a/tests/unit/test_resource_manager.py b/tests/unit/test_resource_manager.py index d0880dcd6..9bb74fe96 100644 --- a/tests/unit/test_resource_manager.py +++ b/tests/unit/test_resource_manager.py @@ -166,6 +166,66 @@ def test_media_upload_consumer_signal_shutdown_wakes_blocked_thread(): assert not consumer.is_alive() +def test_at_fork_reinit_creates_new_queues_and_consumers(monkeypatch): + """_at_fork_reinit() must replace queues and start fresh consumer threads.""" + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-reinit", + secret_key="sk-fork-reinit", + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + old_score_queue = rm._score_ingestion_queue + old_media_queue = rm._media_upload_queue + old_ingestion_consumers = list(rm._ingestion_consumers) + + rm._at_fork_reinit() + + assert rm._score_ingestion_queue is not old_score_queue + assert rm._media_upload_queue is not old_media_queue + assert len(rm._ingestion_consumers) == 1 + assert rm._ingestion_consumers[0].is_alive() + + # In a real fork, old threads don't exist in the child process. + # In this unit test they do — stop them explicitly to avoid leaking threads. + for consumer in old_ingestion_consumers: + consumer.pause() + consumer.join(timeout=1.0) + + client.shutdown() + + +def test_at_fork_reinit_skips_when_shutdown(monkeypatch): + """_at_fork_reinit() must not restart threads after intentional shutdown.""" + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-shutdown", + secret_key="sk-fork-shutdown", + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + old_score_queue = rm._score_ingestion_queue + + rm._shutdown = True + rm._at_fork_reinit() + + assert rm._score_ingestion_queue is old_score_queue # queue must not be replaced + + client.shutdown() + + def test_stop_and_join_consumer_threads_broadcasts_media_shutdown_after_pausing_all(): events = [] From f3b0b5304adea04051b5bf5eab45be9d8bed121e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=98=81=EA=B7=9C?= Date: Wed, 20 May 2026 00:08:05 +0900 Subject: [PATCH 2/9] fix(resource_manager): catch and log errors during fork child consumer reinitialization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 박영규 --- langfuse/_client/resource_manager.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 41b6b47ca..824ab8717 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -371,7 +371,13 @@ def _at_fork_reinit(self) -> None: # here to keep this handler minimal; this mirrors the existing singleton client # lifecycle. If preload-time network I/O is introduced in the future, clients # may need fork-specific reinitialization as well. - self._init_consumer_threads() + try: + self._init_consumer_threads() + except Exception as e: + langfuse_logger.error( + f"[PID {os.getpid()}] Failed to reinitialize consumer threads after fork: {e}. " + f"Media upload and score ingestion will be unavailable in this worker." + ) langfuse_logger.debug( f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork" From fd16c0b919d4d40cf1025745da3cb2c5942f289a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=98=81=EA=B7=9C?= Date: Sat, 30 May 2026 06:41:05 +0900 Subject: [PATCH 3/9] fix(resource_manager): reinitialize _lock after fork to prevent deadlock in child process MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 박영규 --- langfuse/_client/resource_manager.py | 5 ++ tests/unit/test_resource_manager.py | 74 ++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 824ab8717..244695365 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -359,6 +359,11 @@ def _at_fork_reinit(self) -> None: if self._shutdown: return + # The class-level lock may have been held by a thread in the parent at fork time. + # That thread does not exist in the child, so the lock can never be released and + # any attempt to acquire it would deadlock. Replace it with a fresh lock first. + LangfuseResourceManager._lock = threading.RLock() + langfuse_logger.debug( f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads." ) diff --git a/tests/unit/test_resource_manager.py b/tests/unit/test_resource_manager.py index 9bb74fe96..a55b4602f 100644 --- a/tests/unit/test_resource_manager.py +++ b/tests/unit/test_resource_manager.py @@ -226,6 +226,80 @@ def test_at_fork_reinit_skips_when_shutdown(monkeypatch): client.shutdown() +def test_at_fork_reinit_replaces_lock(monkeypatch): + """_at_fork_reinit() must replace the class-level lock with a fresh one. + + If a thread held _lock at fork time, the child has no such thread and the + lock can never be released, causing a deadlock. The reinit handler must + replace it before doing any other work so the child can always acquire it. + """ + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-lock", + secret_key="sk-fork-lock", + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + old_lock = LangfuseResourceManager._lock + + rm._at_fork_reinit() + + assert LangfuseResourceManager._lock is not old_lock + # New lock must be immediately acquirable (not held by any thread). + acquired = LangfuseResourceManager._lock.acquire(blocking=False) + assert acquired, "New lock must not be held after _at_fork_reinit()" + LangfuseResourceManager._lock.release() + + client.shutdown() + + +def test_at_fork_reinit_new_lock_acquirable_even_if_old_lock_was_held(monkeypatch): + """Simulate the fork-deadlock scenario: old lock held, new lock must still be acquirable. + + In a real fork, a thread holding _lock in the parent disappears in the child, + leaving the lock permanently acquired. Here we replicate that by acquiring the + old lock without releasing it, then calling _at_fork_reinit() and verifying that + the replacement lock is free. + """ + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-lock-held", + secret_key="sk-fork-lock-held", + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + # Simulate the lock being permanently held (as it would be in a forked child + # when the owning thread no longer exists). + stuck_lock = LangfuseResourceManager._lock + stuck_lock.acquire() # held, never released — simulates the fork scenario + + try: + rm._at_fork_reinit() + + # The new lock must be a different object and must be acquirable. + new_lock = LangfuseResourceManager._lock + assert new_lock is not stuck_lock + acquired = new_lock.acquire(blocking=False) + assert acquired, "Replacement lock must be acquirable after _at_fork_reinit()" + new_lock.release() + finally: + stuck_lock.release() # clean up so other tests are not affected + + client.shutdown() + + def test_stop_and_join_consumer_threads_broadcasts_media_shutdown_after_pausing_all(): events = [] From eba27cc57c8fbd746658262a9104e06be821537f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=98=81=EA=B7=9C?= Date: Sat, 30 May 2026 07:17:12 +0900 Subject: [PATCH 4/9] fix(resource_manager): recreate HTTP clients after fork to prevent process-unsafe socket sharing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 박영규 --- langfuse/_client/client.py | 7 +++ langfuse/_client/resource_manager.py | 39 ++++++++++-- tests/unit/test_resource_manager.py | 92 ++++++++++++++++++++++++++++ 3 files changed, 134 insertions(+), 4 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 2f1c8d783..44cb62640 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -164,6 +164,13 @@ class Langfuse: host (Optional[str]): Deprecated. Use base_url instead. The Langfuse API host URL. Defaults to "https://cloud.langfuse.com". timeout (Optional[int]): Timeout in seconds for API requests. Defaults to 5 seconds. httpx_client (Optional[httpx.Client]): Custom httpx client for making non-tracing HTTP requests. If not provided, a default client will be created. + **Fork limitation**: ``httpx.Client`` is thread-safe but not process-safe. When using + ``fork()``-based servers (e.g. Gunicorn with ``--preload``), the SDK recreates the HTTP + client in child processes after fork to avoid sharing file descriptors (TCP sockets) across + processes. A custom ``httpx_client`` will therefore be replaced by a new default client in + child processes — any custom transport, SSL, or proxy settings will not carry over. + If you need those settings in forked workers, configure them via environment variables or + apply them in an ``after_in_child`` fork handler instead. debug (bool): Enable debug logging. Defaults to False. Can also be set via LANGFUSE_DEBUG environment variable. tracing_enabled (Optional[bool]): Enable or disable tracing. Defaults to True. Can also be set via LANGFUSE_TRACING_ENABLED environment variable. flush_at (Optional[int]): Number of spans to batch before sending to the API. Defaults to 512. Can also be set via LANGFUSE_FLUSH_AT environment variable. diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 244695365..76124fbe7 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -372,10 +372,41 @@ def _at_fork_reinit(self) -> None: # belong to the preloaded parent process and must not be processed by every # worker — otherwise uploads/scores would be duplicated across workers. # - # HTTP clients (self.httpx_client, self._score_ingestion_client) are not recreated - # here to keep this handler minimal; this mirrors the existing singleton client - # lifecycle. If preload-time network I/O is introduced in the future, clients - # may need fork-specific reinitialization as well. + # HTTP clients must also be recreated. httpx.Client is thread-safe but not + # process-safe: fork() duplicates the parent's connection pool (TCP socket file + # descriptors) into the child. Both processes then share the same underlying + # sockets, which causes data corruption and SSL/TLS state mismatch under + # concurrent use. Fresh clients start with an empty pool owned solely by this + # child process. + try: + client_headers = self.additional_headers if self.additional_headers else {} + self.httpx_client = httpx.Client( + timeout=self.timeout, headers=client_headers + ) + self.api = LangfuseAPI( + base_url=self.base_url, + username=self.public_key, + password=self.secret_key, + x_langfuse_sdk_name="python", + x_langfuse_sdk_version=langfuse_version, + x_langfuse_public_key=self.public_key, + httpx_client=self.httpx_client, + timeout=self.timeout, + ) + self._score_ingestion_client = LangfuseClient( + public_key=self.public_key, + secret_key=self.secret_key, + base_url=self.base_url, + version=langfuse_version, + timeout=self.timeout or 20, + session=self.httpx_client, + ) + except Exception as e: + langfuse_logger.error( + f"[PID {os.getpid()}] Failed to recreate HTTP clients after fork: {e}. " + f"Network requests may fail in this worker." + ) + try: self._init_consumer_threads() except Exception as e: diff --git a/tests/unit/test_resource_manager.py b/tests/unit/test_resource_manager.py index a55b4602f..13c814a6d 100644 --- a/tests/unit/test_resource_manager.py +++ b/tests/unit/test_resource_manager.py @@ -300,6 +300,98 @@ def test_at_fork_reinit_new_lock_acquirable_even_if_old_lock_was_held(monkeypatc client.shutdown() +def test_at_fork_reinit_recreates_httpx_client_by_default(monkeypatch): + """_at_fork_reinit() must create a new httpx.Client to avoid sharing + connection-pool file descriptors (TCP sockets) across forked processes. + httpx.Client is thread-safe but not process-safe.""" + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-httpx-default", + secret_key="sk-fork-httpx-default", + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + old_httpx_client = rm.httpx_client + old_api = rm.api + old_score_ingestion_client = rm._score_ingestion_client + + rm._at_fork_reinit() + + assert rm.httpx_client is not old_httpx_client + assert rm.api is not old_api + assert rm._score_ingestion_client is not old_score_ingestion_client + + client.shutdown() + + +def test_at_fork_reinit_replaces_custom_httpx_client(monkeypatch): + """_at_fork_reinit() must replace a user-provided httpx.Client too. + Users cannot react to fork() inside the SDK singleton, so the library + must always produce a process-safe client — even at the cost of losing + custom settings. This is a documented limitation.""" + import httpx + + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + custom_client = httpx.Client(timeout=99) + client = Langfuse( + public_key="pk-fork-httpx-custom", + secret_key="sk-fork-httpx-custom", + httpx_client=custom_client, + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + assert rm.httpx_client is custom_client + + rm._at_fork_reinit() + + # Custom client must be replaced — sharing it cross-process is not safe. + assert rm.httpx_client is not custom_client + assert rm.api is not None + assert rm._score_ingestion_client is not None + + custom_client.close() + client.shutdown() + + +def test_at_fork_reinit_new_httpx_client_uses_configured_timeout_and_headers( + monkeypatch, +): + """After fork, the recreated httpx.Client must reflect the timeout and + additional_headers that were set on the resource manager.""" + monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") + + with LangfuseResourceManager._lock: + LangfuseResourceManager._instances.clear() + + client = Langfuse( + public_key="pk-fork-httpx-settings", + secret_key="sk-fork-httpx-settings", + timeout=42, + additional_headers={"X-Custom": "value"}, + span_exporter=NoOpSpanExporter(), + ) + rm = client._resources + assert rm is not None + + rm._at_fork_reinit() + + assert rm.httpx_client.timeout.connect == 42 + assert rm.httpx_client.headers.get("X-Custom") == "value" + + client.shutdown() + + def test_stop_and_join_consumer_threads_broadcasts_media_shutdown_after_pausing_all(): events = [] From 157122e0a346e24692f4aa071b45b16301fd3f94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=98=81=EA=B7=9C?= Date: Mon, 1 Jun 2026 20:50:51 +0900 Subject: [PATCH 5/9] fix(resource_manager): preserve custom httpx_client after fork instead of replacing it MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 박영규 --- langfuse/_client/client.py | 14 +++++++------- langfuse/_client/resource_manager.py | 26 ++++++++++++++++---------- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 44cb62640..bbd25f333 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -164,13 +164,13 @@ class Langfuse: host (Optional[str]): Deprecated. Use base_url instead. The Langfuse API host URL. Defaults to "https://cloud.langfuse.com". timeout (Optional[int]): Timeout in seconds for API requests. Defaults to 5 seconds. httpx_client (Optional[httpx.Client]): Custom httpx client for making non-tracing HTTP requests. If not provided, a default client will be created. - **Fork limitation**: ``httpx.Client`` is thread-safe but not process-safe. When using - ``fork()``-based servers (e.g. Gunicorn with ``--preload``), the SDK recreates the HTTP - client in child processes after fork to avoid sharing file descriptors (TCP sockets) across - processes. A custom ``httpx_client`` will therefore be replaced by a new default client in - child processes — any custom transport, SSL, or proxy settings will not carry over. - If you need those settings in forked workers, configure them via environment variables or - apply them in an ``after_in_child`` fork handler instead. + **Fork safety**: ``httpx.Client`` is thread-safe but not process-safe. When using + ``fork()``-based servers (e.g. Gunicorn with ``--preload``), the SDK automatically + recreates its internally-managed HTTP client in child processes after fork. A custom + ``httpx_client`` is intentionally left as-is (the fork-inherited copy is reused), so + you retain the opportunity to handle process-safety yourself — for example by + registering your own ``os.register_at_fork(after_in_child=...)`` handler to close and + reopen connections on the custom client. debug (bool): Enable debug logging. Defaults to False. Can also be set via LANGFUSE_DEBUG environment variable. tracing_enabled (Optional[bool]): Enable or disable tracing. Defaults to True. Can also be set via LANGFUSE_TRACING_ENABLED environment variable. flush_at (Optional[int]): Number of spans to batch before sending to the API. Defaults to 512. Can also be set via LANGFUSE_FLUSH_AT environment variable. diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 76124fbe7..525d79eae 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -219,6 +219,7 @@ def _initialize_instance( ## use connection pools with limited capacity. Creating multiple instances ## could exhaust the OS's maximum number of available TCP sockets (file descriptors), ## leading to connection errors. + self._custom_httpx_client = httpx_client if httpx_client is not None: self.httpx_client = httpx_client else: @@ -372,17 +373,22 @@ def _at_fork_reinit(self) -> None: # belong to the preloaded parent process and must not be processed by every # worker — otherwise uploads/scores would be duplicated across workers. # - # HTTP clients must also be recreated. httpx.Client is thread-safe but not - # process-safe: fork() duplicates the parent's connection pool (TCP socket file - # descriptors) into the child. Both processes then share the same underlying - # sockets, which causes data corruption and SSL/TLS state mismatch under - # concurrent use. Fresh clients start with an empty pool owned solely by this - # child process. + # Internally-managed httpx clients must also be recreated: fork() duplicates the + # parent's connection pool (TCP socket file descriptors) into the child. Both + # processes then share the same underlying sockets, causing data corruption and + # SSL/TLS state mismatch under concurrent use. Fresh clients start with an empty + # pool owned solely by this child process. + # + # Custom httpx clients provided by the caller are NOT recreated. The fork-inherited + # copy is reused as-is, giving the caller the opportunity to handle process-safety + # themselves (e.g. by registering their own os.register_at_fork handler). try: - client_headers = self.additional_headers if self.additional_headers else {} - self.httpx_client = httpx.Client( - timeout=self.timeout, headers=client_headers - ) + if self._custom_httpx_client is None: + client_headers = self.additional_headers if self.additional_headers else {} + self.httpx_client = httpx.Client( + timeout=self.timeout, headers=client_headers + ) + self.api = LangfuseAPI( base_url=self.base_url, username=self.public_key, From 68327b722c7cbc3e9ae01cc2c2170278501ca43a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=98=81=EA=B7=9C?= Date: Mon, 15 Jun 2026 22:57:26 +0900 Subject: [PATCH 6/9] fix(resource_manager): defer post-fork reinitialization to avoid segfault MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 박영규 --- langfuse/_client/resource_manager.py | 149 +++++++++++++++++---------- tests/unit/test_resource_manager.py | 39 +++++-- 2 files changed, 123 insertions(+), 65 deletions(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 525d79eae..d439395a5 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -343,7 +343,7 @@ def _init_consumer_threads(self) -> None: self._ingestion_consumers.append(ingestion_consumer) def _at_fork_reinit(self) -> None: - """Reinitialize consumer threads after fork in child process. + """Mark that post-fork reinitialization is needed; do no heavy work here. Called automatically via os.register_at_fork() after fork(). Necessary for Gunicorn --preload deployments where os.fork() is used: @@ -356,6 +356,12 @@ def _at_fork_reinit(self) -> None: Skipped if shutdown() was already called on this instance, to avoid restarting threads on an intentionally torn-down manager. + + Heavy work (httpx.Client creation, thread spawning) is intentionally + deferred to _ensure_post_fork_initialized(), called on first use. + Doing that work here — inside the after_in_child handler — triggers + SSL/TLS and Objective-C runtime calls that are unsafe in the narrow + post-fork window and cause segfaults on macOS with gunicorn --preload. """ if self._shutdown: return @@ -365,65 +371,96 @@ def _at_fork_reinit(self) -> None: # any attempt to acquire it would deadlock. Replace it with a fresh lock first. LangfuseResourceManager._lock = threading.RLock() - langfuse_logger.debug( - f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads." - ) + # Replace queues with fresh empty ones so flush() (e.g. via atexit) does not + # block waiting for pre-fork items that no consumer will ever drain. + # Queue() is pure Python — safe to call here. + self._media_upload_queue = Queue(100_000) + self._score_ingestion_queue = Queue(100_000) + self._media_upload_consumers = [] + self._ingestion_consumers = [] - # Queues are intentionally recreated after fork. Items enqueued before fork - # belong to the preloaded parent process and must not be processed by every - # worker — otherwise uploads/scores would be duplicated across workers. - # - # Internally-managed httpx clients must also be recreated: fork() duplicates the - # parent's connection pool (TCP socket file descriptors) into the child. Both - # processes then share the same underlying sockets, causing data corruption and - # SSL/TLS state mismatch under concurrent use. Fresh clients start with an empty - # pool owned solely by this child process. - # - # Custom httpx clients provided by the caller are NOT recreated. The fork-inherited - # copy is reused as-is, giving the caller the opportunity to handle process-safety - # themselves (e.g. by registering their own os.register_at_fork handler). - try: - if self._custom_httpx_client is None: - client_headers = self.additional_headers if self.additional_headers else {} - self.httpx_client = httpx.Client( - timeout=self.timeout, headers=client_headers - ) + # Signal that HTTP clients and consumer threads need to be recreated on first use. + self._needs_post_fork_reinit = True + # Fresh lock to guard the one-time lazy reinit below. + self._post_fork_reinit_lock = threading.Lock() - self.api = LangfuseAPI( - base_url=self.base_url, - username=self.public_key, - password=self.secret_key, - x_langfuse_sdk_name="python", - x_langfuse_sdk_version=langfuse_version, - x_langfuse_public_key=self.public_key, - httpx_client=self.httpx_client, - timeout=self.timeout, - ) - self._score_ingestion_client = LangfuseClient( - public_key=self.public_key, - secret_key=self.secret_key, - base_url=self.base_url, - version=langfuse_version, - timeout=self.timeout or 20, - session=self.httpx_client, - ) - except Exception as e: - langfuse_logger.error( - f"[PID {os.getpid()}] Failed to recreate HTTP clients after fork: {e}. " - f"Network requests may fail in this worker." - ) + def _ensure_post_fork_initialized(self) -> None: + """Lazily recreate HTTP clients and consumer threads after fork. - try: - self._init_consumer_threads() - except Exception as e: - langfuse_logger.error( - f"[PID {os.getpid()}] Failed to reinitialize consumer threads after fork: {e}. " - f"Media upload and score ingestion will be unavailable in this worker." + Called at the start of add_score_task() / add_trace_task() so that + the first actual work in the child process triggers full reinitialization. + The deferred approach avoids doing SSL/thread-creation work inside the + after_in_child handler where it causes segfaults on macOS. + """ + if not getattr(self, "_needs_post_fork_reinit", False): + return + + with self._post_fork_reinit_lock: + if not self._needs_post_fork_reinit: + return + + langfuse_logger.debug( + f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse HTTP clients and consumer threads." ) - langfuse_logger.debug( - f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork" - ) + # Queues are intentionally recreated here (not reused from _at_fork_reinit). + # Items enqueued before fork belong to the parent and must not be processed + # by every worker — that would duplicate uploads/scores across workers. + # + # Internally-managed httpx clients must also be recreated: fork() duplicates + # the parent's connection pool (TCP socket file descriptors) into the child. + # Both processes would then share the same underlying sockets, causing data + # corruption and SSL/TLS state mismatch under concurrent use. + # + # Custom httpx clients provided by the caller are NOT recreated. The + # fork-inherited copy is reused, giving the caller the opportunity to handle + # process-safety themselves (e.g. via their own os.register_at_fork handler). + try: + if self._custom_httpx_client is None: + client_headers = ( + self.additional_headers if self.additional_headers else {} + ) + self.httpx_client = httpx.Client( + timeout=self.timeout, headers=client_headers + ) + + self.api = LangfuseAPI( + base_url=self.base_url, + username=self.public_key, + password=self.secret_key, + x_langfuse_sdk_name="python", + x_langfuse_sdk_version=langfuse_version, + x_langfuse_public_key=self.public_key, + httpx_client=self.httpx_client, + timeout=self.timeout, + ) + self._score_ingestion_client = LangfuseClient( + public_key=self.public_key, + secret_key=self.secret_key, + base_url=self.base_url, + version=langfuse_version, + timeout=self.timeout or 20, + session=self.httpx_client, + ) + except Exception as e: + langfuse_logger.error( + f"[PID {os.getpid()}] Failed to recreate HTTP clients after fork: {e}. " + f"Network requests may fail in this worker." + ) + + try: + self._init_consumer_threads() + except Exception as e: + langfuse_logger.error( + f"[PID {os.getpid()}] Failed to reinitialize consumer threads after fork: {e}. " + f"Media upload and score ingestion will be unavailable in this worker." + ) + + self._needs_post_fork_reinit = False + + langfuse_logger.debug( + f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork" + ) @classmethod def reset(cls) -> None: @@ -434,6 +471,7 @@ def reset(cls) -> None: cls._instances.clear() def add_score_task(self, event: dict, *, force_sample: bool = False) -> None: + self._ensure_post_fork_initialized() try: # Sample scores with the same sampler that is used for tracing tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider()) @@ -482,6 +520,7 @@ def add_trace_task( self, event: dict, ) -> None: + self._ensure_post_fork_initialized() try: langfuse_logger.debug( f"Trace: Enqueuing event type={event['type']} for trace_id={event['body'].id}" diff --git a/tests/unit/test_resource_manager.py b/tests/unit/test_resource_manager.py index 13c814a6d..89bed1bb7 100644 --- a/tests/unit/test_resource_manager.py +++ b/tests/unit/test_resource_manager.py @@ -167,7 +167,11 @@ def test_media_upload_consumer_signal_shutdown_wakes_blocked_thread(): def test_at_fork_reinit_creates_new_queues_and_consumers(monkeypatch): - """_at_fork_reinit() must replace queues and start fresh consumer threads.""" + """_at_fork_reinit() replaces queues immediately; consumers start on first use. + + Heavy work (httpx.Client, thread spawning) is deferred to avoid segfaults + caused by SSL/TLS initialization inside the after_in_child handler on macOS. + """ monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") with LangfuseResourceManager._lock: @@ -187,10 +191,19 @@ def test_at_fork_reinit_creates_new_queues_and_consumers(monkeypatch): rm._at_fork_reinit() + # Queues are replaced immediately (lightweight, safe in after_in_child). assert rm._score_ingestion_queue is not old_score_queue assert rm._media_upload_queue is not old_media_queue + # Consumer threads are NOT started yet — deferred to first use. + assert len(rm._ingestion_consumers) == 0 + assert rm._needs_post_fork_reinit is True + + # Trigger lazy initialization (as add_score_task / add_trace_task would). + rm._ensure_post_fork_initialized() + assert len(rm._ingestion_consumers) == 1 assert rm._ingestion_consumers[0].is_alive() + assert rm._needs_post_fork_reinit is False # In a real fork, old threads don't exist in the child process. # In this unit test they do — stop them explicitly to avoid leaking threads. @@ -301,8 +314,8 @@ def test_at_fork_reinit_new_lock_acquirable_even_if_old_lock_was_held(monkeypatc def test_at_fork_reinit_recreates_httpx_client_by_default(monkeypatch): - """_at_fork_reinit() must create a new httpx.Client to avoid sharing - connection-pool file descriptors (TCP sockets) across forked processes. + """After fork, an internally-managed httpx.Client is replaced on first use to + avoid sharing connection-pool file descriptors (TCP sockets) across processes. httpx.Client is thread-safe but not process-safe.""" monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") @@ -322,6 +335,11 @@ def test_at_fork_reinit_recreates_httpx_client_by_default(monkeypatch): old_score_ingestion_client = rm._score_ingestion_client rm._at_fork_reinit() + # Lazy: heavy init has not run yet. + assert rm.httpx_client is old_httpx_client + + # Trigger lazy initialization (as add_score_task / add_trace_task would). + rm._ensure_post_fork_initialized() assert rm.httpx_client is not old_httpx_client assert rm.api is not old_api @@ -330,11 +348,10 @@ def test_at_fork_reinit_recreates_httpx_client_by_default(monkeypatch): client.shutdown() -def test_at_fork_reinit_replaces_custom_httpx_client(monkeypatch): - """_at_fork_reinit() must replace a user-provided httpx.Client too. - Users cannot react to fork() inside the SDK singleton, so the library - must always produce a process-safe client — even at the cost of losing - custom settings. This is a documented limitation.""" +def test_at_fork_reinit_preserves_custom_httpx_client(monkeypatch): + """After fork, a caller-supplied httpx.Client is reused as-is. + The caller is responsible for their own fork-safety (e.g. via their own + os.register_at_fork handler). The SDK must not silently replace it.""" import httpx monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") @@ -354,9 +371,10 @@ def test_at_fork_reinit_replaces_custom_httpx_client(monkeypatch): assert rm.httpx_client is custom_client rm._at_fork_reinit() + rm._ensure_post_fork_initialized() - # Custom client must be replaced — sharing it cross-process is not safe. - assert rm.httpx_client is not custom_client + # Custom client must be preserved — caller owns process-safety for it. + assert rm.httpx_client is custom_client assert rm.api is not None assert rm._score_ingestion_client is not None @@ -385,6 +403,7 @@ def test_at_fork_reinit_new_httpx_client_uses_configured_timeout_and_headers( assert rm is not None rm._at_fork_reinit() + rm._ensure_post_fork_initialized() assert rm.httpx_client.timeout.connect == 42 assert rm.httpx_client.headers.get("X-Custom") == "value" From 9cafdf3a0367b981e947afd5e31abf9db6d83bd2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=98=81=EA=B7=9C?= Date: Thu, 2 Jul 2026 05:41:02 +0900 Subject: [PATCH 7/9] fix(resource_manager): skip macOS proxy discovery after fork to prevent segfault MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 박영규 --- langfuse/_client/resource_manager.py | 158 +++++++++++---------------- tests/unit/test_resource_manager.py | 26 +---- 2 files changed, 67 insertions(+), 117 deletions(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index d439395a5..b71721f85 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -16,6 +16,7 @@ import atexit import os +import sys import threading import weakref from queue import Full, Queue @@ -343,7 +344,7 @@ def _init_consumer_threads(self) -> None: self._ingestion_consumers.append(ingestion_consumer) def _at_fork_reinit(self) -> None: - """Mark that post-fork reinitialization is needed; do no heavy work here. + """Reinitialize consumer threads after fork in child process. Called automatically via os.register_at_fork() after fork(). Necessary for Gunicorn --preload deployments where os.fork() is used: @@ -356,112 +357,83 @@ def _at_fork_reinit(self) -> None: Skipped if shutdown() was already called on this instance, to avoid restarting threads on an intentionally torn-down manager. - - Heavy work (httpx.Client creation, thread spawning) is intentionally - deferred to _ensure_post_fork_initialized(), called on first use. - Doing that work here — inside the after_in_child handler — triggers - SSL/TLS and Objective-C runtime calls that are unsafe in the narrow - post-fork window and cause segfaults on macOS with gunicorn --preload. """ if self._shutdown: return + if sys.platform == "darwin": + # urllib proxy discovery calls macOS SystemConfiguration APIs that + # are not safe to invoke after fork(). Setting no_proxy="*" makes + # httpx skip getproxies() entirely in this child process. + # See: https://docs.python.org/3/library/urllib.request.html + os.environ["no_proxy"] = "*" + os.environ["NO_PROXY"] = "*" + # The class-level lock may have been held by a thread in the parent at fork time. # That thread does not exist in the child, so the lock can never be released and # any attempt to acquire it would deadlock. Replace it with a fresh lock first. LangfuseResourceManager._lock = threading.RLock() - # Replace queues with fresh empty ones so flush() (e.g. via atexit) does not - # block waiting for pre-fork items that no consumer will ever drain. - # Queue() is pure Python — safe to call here. - self._media_upload_queue = Queue(100_000) - self._score_ingestion_queue = Queue(100_000) - self._media_upload_consumers = [] - self._ingestion_consumers = [] - - # Signal that HTTP clients and consumer threads need to be recreated on first use. - self._needs_post_fork_reinit = True - # Fresh lock to guard the one-time lazy reinit below. - self._post_fork_reinit_lock = threading.Lock() - - def _ensure_post_fork_initialized(self) -> None: - """Lazily recreate HTTP clients and consumer threads after fork. - - Called at the start of add_score_task() / add_trace_task() so that - the first actual work in the child process triggers full reinitialization. - The deferred approach avoids doing SSL/thread-creation work inside the - after_in_child handler where it causes segfaults on macOS. - """ - if not getattr(self, "_needs_post_fork_reinit", False): - return - - with self._post_fork_reinit_lock: - if not self._needs_post_fork_reinit: - return - - langfuse_logger.debug( - f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse HTTP clients and consumer threads." - ) - - # Queues are intentionally recreated here (not reused from _at_fork_reinit). - # Items enqueued before fork belong to the parent and must not be processed - # by every worker — that would duplicate uploads/scores across workers. - # - # Internally-managed httpx clients must also be recreated: fork() duplicates - # the parent's connection pool (TCP socket file descriptors) into the child. - # Both processes would then share the same underlying sockets, causing data - # corruption and SSL/TLS state mismatch under concurrent use. - # - # Custom httpx clients provided by the caller are NOT recreated. The - # fork-inherited copy is reused, giving the caller the opportunity to handle - # process-safety themselves (e.g. via their own os.register_at_fork handler). - try: - if self._custom_httpx_client is None: - client_headers = ( - self.additional_headers if self.additional_headers else {} - ) - self.httpx_client = httpx.Client( - timeout=self.timeout, headers=client_headers - ) - - self.api = LangfuseAPI( - base_url=self.base_url, - username=self.public_key, - password=self.secret_key, - x_langfuse_sdk_name="python", - x_langfuse_sdk_version=langfuse_version, - x_langfuse_public_key=self.public_key, - httpx_client=self.httpx_client, - timeout=self.timeout, - ) - self._score_ingestion_client = LangfuseClient( - public_key=self.public_key, - secret_key=self.secret_key, - base_url=self.base_url, - version=langfuse_version, - timeout=self.timeout or 20, - session=self.httpx_client, - ) - except Exception as e: - langfuse_logger.error( - f"[PID {os.getpid()}] Failed to recreate HTTP clients after fork: {e}. " - f"Network requests may fail in this worker." - ) + langfuse_logger.debug( + f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads." + ) - try: - self._init_consumer_threads() - except Exception as e: - langfuse_logger.error( - f"[PID {os.getpid()}] Failed to reinitialize consumer threads after fork: {e}. " - f"Media upload and score ingestion will be unavailable in this worker." + # Queues are intentionally recreated after fork. Items enqueued before fork + # belong to the preloaded parent process and must not be processed by every + # worker — otherwise uploads/scores would be duplicated across workers. + # + # Internally-managed httpx clients must also be recreated: fork() duplicates the + # parent's connection pool (TCP socket file descriptors) into the child. Both + # processes then share the same underlying sockets, causing data corruption and + # SSL/TLS state mismatch under concurrent use. Fresh clients start with an empty + # pool owned solely by this child process. + # + # Custom httpx clients provided by the caller are NOT recreated. The fork-inherited + # copy is reused as-is, giving the caller the opportunity to handle process-safety + # themselves (e.g. by registering their own os.register_at_fork handler). + try: + if self._custom_httpx_client is None: + client_headers = self.additional_headers if self.additional_headers else {} + self.httpx_client = httpx.Client( + timeout=self.timeout, headers=client_headers ) - self._needs_post_fork_reinit = False + self.api = LangfuseAPI( + base_url=self.base_url, + username=self.public_key, + password=self.secret_key, + x_langfuse_sdk_name="python", + x_langfuse_sdk_version=langfuse_version, + x_langfuse_public_key=self.public_key, + httpx_client=self.httpx_client, + timeout=self.timeout, + ) + self._score_ingestion_client = LangfuseClient( + public_key=self.public_key, + secret_key=self.secret_key, + base_url=self.base_url, + version=langfuse_version, + timeout=self.timeout or 20, + session=self.httpx_client, + ) + except Exception as e: + langfuse_logger.error( + f"[PID {os.getpid()}] Failed to recreate HTTP clients after fork: {e}. " + f"Network requests may fail in this worker." + ) - langfuse_logger.debug( - f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork" + try: + self._init_consumer_threads() + except Exception as e: + langfuse_logger.error( + f"[PID {os.getpid()}] Failed to reinitialize consumer threads after fork: {e}. " + f"Media upload and score ingestion will be unavailable in this worker." ) + langfuse_logger.debug( + f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork" + ) + @classmethod def reset(cls) -> None: with cls._lock: @@ -471,7 +443,6 @@ def reset(cls) -> None: cls._instances.clear() def add_score_task(self, event: dict, *, force_sample: bool = False) -> None: - self._ensure_post_fork_initialized() try: # Sample scores with the same sampler that is used for tracing tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider()) @@ -520,7 +491,6 @@ def add_trace_task( self, event: dict, ) -> None: - self._ensure_post_fork_initialized() try: langfuse_logger.debug( f"Trace: Enqueuing event type={event['type']} for trace_id={event['body'].id}" diff --git a/tests/unit/test_resource_manager.py b/tests/unit/test_resource_manager.py index 89bed1bb7..50e4e7a74 100644 --- a/tests/unit/test_resource_manager.py +++ b/tests/unit/test_resource_manager.py @@ -167,11 +167,7 @@ def test_media_upload_consumer_signal_shutdown_wakes_blocked_thread(): def test_at_fork_reinit_creates_new_queues_and_consumers(monkeypatch): - """_at_fork_reinit() replaces queues immediately; consumers start on first use. - - Heavy work (httpx.Client, thread spawning) is deferred to avoid segfaults - caused by SSL/TLS initialization inside the after_in_child handler on macOS. - """ + """_at_fork_reinit() must replace queues and start fresh consumer threads.""" monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") with LangfuseResourceManager._lock: @@ -191,19 +187,10 @@ def test_at_fork_reinit_creates_new_queues_and_consumers(monkeypatch): rm._at_fork_reinit() - # Queues are replaced immediately (lightweight, safe in after_in_child). assert rm._score_ingestion_queue is not old_score_queue assert rm._media_upload_queue is not old_media_queue - # Consumer threads are NOT started yet — deferred to first use. - assert len(rm._ingestion_consumers) == 0 - assert rm._needs_post_fork_reinit is True - - # Trigger lazy initialization (as add_score_task / add_trace_task would). - rm._ensure_post_fork_initialized() - assert len(rm._ingestion_consumers) == 1 assert rm._ingestion_consumers[0].is_alive() - assert rm._needs_post_fork_reinit is False # In a real fork, old threads don't exist in the child process. # In this unit test they do — stop them explicitly to avoid leaking threads. @@ -314,8 +301,8 @@ def test_at_fork_reinit_new_lock_acquirable_even_if_old_lock_was_held(monkeypatc def test_at_fork_reinit_recreates_httpx_client_by_default(monkeypatch): - """After fork, an internally-managed httpx.Client is replaced on first use to - avoid sharing connection-pool file descriptors (TCP sockets) across processes. + """_at_fork_reinit() must create a new httpx.Client to avoid sharing + connection-pool file descriptors (TCP sockets) across forked processes. httpx.Client is thread-safe but not process-safe.""" monkeypatch.setenv("LANGFUSE_MEDIA_UPLOAD_ENABLED", "false") @@ -335,11 +322,6 @@ def test_at_fork_reinit_recreates_httpx_client_by_default(monkeypatch): old_score_ingestion_client = rm._score_ingestion_client rm._at_fork_reinit() - # Lazy: heavy init has not run yet. - assert rm.httpx_client is old_httpx_client - - # Trigger lazy initialization (as add_score_task / add_trace_task would). - rm._ensure_post_fork_initialized() assert rm.httpx_client is not old_httpx_client assert rm.api is not old_api @@ -371,7 +353,6 @@ def test_at_fork_reinit_preserves_custom_httpx_client(monkeypatch): assert rm.httpx_client is custom_client rm._at_fork_reinit() - rm._ensure_post_fork_initialized() # Custom client must be preserved — caller owns process-safety for it. assert rm.httpx_client is custom_client @@ -403,7 +384,6 @@ def test_at_fork_reinit_new_httpx_client_uses_configured_timeout_and_headers( assert rm is not None rm._at_fork_reinit() - rm._ensure_post_fork_initialized() assert rm.httpx_client.timeout.connect == 42 assert rm.httpx_client.headers.get("X-Custom") == "value" From ffd4a8643d0a29d7f8ac0bee44e79f59d2d92a81 Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Thu, 2 Jul 2026 14:52:19 +0200 Subject: [PATCH 8/9] fix(resource_manager): reuse client initialization after fork --- langfuse/_client/resource_manager.py | 144 ++++++++++++--------------- langfuse/_utils/prompt_cache.py | 17 +--- tests/unit/test_prompt.py | 37 ------- tests/unit/test_resource_manager.py | 7 ++ 4 files changed, 73 insertions(+), 132 deletions(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index c3ab8cc31..67c44920a 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -18,6 +18,7 @@ import os import sys import threading +import urllib.request import weakref from queue import Full, Queue from typing import Any, Callable, Dict, List, Optional, cast @@ -81,6 +82,10 @@ class LangfuseResourceManager: _instances: Dict[str, "LangfuseResourceManager"] = {} _lock = threading.RLock() + _otel_tracer: Tracer + _media_manager: MediaManager + _media_upload_consumers: List[MediaUploadConsumer] + _ingestion_consumers: List[ScoreIngestionConsumer] @classmethod def get_singleton_httpx_client(cls) -> Optional[httpx.Client]: @@ -219,49 +224,8 @@ def _initialize_instance( self.span_exporter = span_exporter self.tracer_provider: Optional[TracerProvider] = None - # API Clients - - ## API clients must be singletons because the underlying HTTPX clients - ## use connection pools with limited capacity. Creating multiple instances - ## could exhaust the OS's maximum number of available TCP sockets (file descriptors), - ## leading to connection errors. self._custom_httpx_client = httpx_client - if httpx_client is not None: - self.httpx_client = httpx_client - else: - # Create a new httpx client with additional_headers if provided - client_headers = additional_headers if additional_headers else {} - self.httpx_client = httpx.Client(timeout=timeout, headers=client_headers) - - self.api = LangfuseAPI( - base_url=base_url, - username=self.public_key, - password=secret_key, - x_langfuse_sdk_name="python", - x_langfuse_sdk_version=langfuse_version, - x_langfuse_public_key=self.public_key, - httpx_client=self.httpx_client, - timeout=timeout, - ) - self.async_api = AsyncLangfuseAPI( - base_url=base_url, - username=self.public_key, - password=secret_key, - x_langfuse_sdk_name="python", - x_langfuse_sdk_version=langfuse_version, - x_langfuse_public_key=self.public_key, - timeout=timeout, - ) - - # Store as instance variable so _at_fork_reinit can reuse without recreation - self._score_ingestion_client = LangfuseClient( - public_key=self.public_key, - secret_key=secret_key, - base_url=base_url, - version=langfuse_version, - timeout=timeout or 20, - session=self.httpx_client, - ) + self._init_api_clients() # Media self._media_upload_enabled = os.environ.get( @@ -365,6 +329,49 @@ def _init_media_manager(self) -> None: self._media_upload_consumers = [] + def _init_api_clients(self) -> None: + """Initialize HTTP-backed API clients. + + Internally-managed httpx clients are recreated when this method is + called after fork. Caller-provided clients are preserved because their + lifecycle belongs to the caller. + """ + if self._custom_httpx_client is not None: + self.httpx_client = self._custom_httpx_client + else: + client_headers = self.additional_headers if self.additional_headers else {} + self.httpx_client = httpx.Client( + timeout=self.timeout, headers=client_headers + ) + + self.api = LangfuseAPI( + base_url=self.base_url, + username=self.public_key, + password=self.secret_key, + x_langfuse_sdk_name="python", + x_langfuse_sdk_version=langfuse_version, + x_langfuse_public_key=self.public_key, + httpx_client=self.httpx_client, + timeout=self.timeout, + ) + self.async_api = AsyncLangfuseAPI( + base_url=self.base_url, + username=self.public_key, + password=self.secret_key, + x_langfuse_sdk_name="python", + x_langfuse_sdk_version=langfuse_version, + x_langfuse_public_key=self.public_key, + timeout=self.timeout, + ) + self._score_ingestion_client = LangfuseClient( + public_key=self.public_key, + secret_key=self.secret_key, + base_url=self.base_url, + version=langfuse_version, + timeout=self.timeout or 20, + session=self.httpx_client, + ) + def _init_consumer_threads(self) -> None: """Initialize media upload and score ingestion consumer threads.""" if self._media_upload_enabled: @@ -407,22 +414,26 @@ def _at_fork_reinit(self) -> None: Skipped if shutdown() was already called on this instance, to avoid restarting threads on an intentionally torn-down manager. """ + # The class-level lock may have been held by a thread in the parent at fork time. + # That thread does not exist in the child, so the lock can never be released and + # any attempt to acquire it would deadlock. Replace it before the shutdown check: + # the lock is class-level state needed by the child (e.g. to create a new client) + # even if this particular instance was already shut down. + LangfuseResourceManager._lock = threading.RLock() + if self._shutdown: return - if sys.platform == "darwin": - # urllib proxy discovery calls macOS SystemConfiguration APIs that - # are not safe to invoke after fork(). Setting no_proxy="*" makes - # httpx skip getproxies() entirely in this child process. - # See: https://docs.python.org/3/library/urllib.request.html + if sys.platform == "darwin" and not urllib.request.getproxies_environment(): + # urllib proxy discovery falls back to macOS SystemConfiguration APIs that + # are not safe to invoke after fork(). Setting no_proxy="*" makes httpx and + # requests skip that lookup entirely in this child process. Skipped when + # proxies are configured via environment variables: urllib then never touches + # SystemConfiguration (no segfault risk), and overriding no_proxy would + # disable the user's proxy setup process-wide. os.environ["no_proxy"] = "*" os.environ["NO_PROXY"] = "*" - # The class-level lock may have been held by a thread in the parent at fork time. - # That thread does not exist in the child, so the lock can never be released and - # any attempt to acquire it would deadlock. Replace it with a fresh lock first. - LangfuseResourceManager._lock = threading.RLock() - langfuse_logger.debug( f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads." ) @@ -441,32 +452,7 @@ def _at_fork_reinit(self) -> None: # copy is reused as-is, giving the caller the opportunity to handle process-safety # themselves (e.g. by registering their own os.register_at_fork handler). try: - if self._custom_httpx_client is None: - client_headers = ( - self.additional_headers if self.additional_headers else {} - ) - self.httpx_client = httpx.Client( - timeout=self.timeout, headers=client_headers - ) - - self.api = LangfuseAPI( - base_url=self.base_url, - username=self.public_key, - password=self.secret_key, - x_langfuse_sdk_name="python", - x_langfuse_sdk_version=langfuse_version, - x_langfuse_public_key=self.public_key, - httpx_client=self.httpx_client, - timeout=self.timeout, - ) - self._score_ingestion_client = LangfuseClient( - public_key=self.public_key, - secret_key=self.secret_key, - base_url=self.base_url, - version=langfuse_version, - timeout=self.timeout or 20, - session=self.httpx_client, - ) + self._init_api_clients() except Exception as e: langfuse_logger.error( f"[PID {os.getpid()}] Failed to recreate HTTP clients after fork: {e}. " @@ -476,7 +462,7 @@ def _at_fork_reinit(self) -> None: try: self._init_media_manager() self._init_consumer_threads() - self.prompt_cache.reinitialize_after_fork() + self.prompt_cache = PromptCache() except Exception as e: langfuse_logger.error( f"[PID {os.getpid()}] Failed to reinitialize consumer threads after fork: {e}. " diff --git a/langfuse/_utils/prompt_cache.py b/langfuse/_utils/prompt_cache.py index cd0458319..7d9c2298b 100644 --- a/langfuse/_utils/prompt_cache.py +++ b/langfuse/_utils/prompt_cache.py @@ -85,22 +85,12 @@ def __init__(self, threads: int = 1): self._processing_keys = set() self._lock = RLock() - self._start_consumers() - - atexit.register(self.shutdown) - - def _start_consumers(self) -> None: for i in range(self._threads): consumer = PromptCacheRefreshConsumer(self._queue, i) consumer.start() self._consumers.append(consumer) - def reinitialize_after_fork(self) -> None: - self._queue = Queue() - self._consumers = [] - self._processing_keys = set() - self._lock = RLock() - self._start_consumers() + atexit.register(self.shutdown) def add_task(self, key: str, task: Callable[[], None]) -> None: with self._lock: @@ -171,11 +161,6 @@ def __init__( self._task_manager = PromptCacheTaskManager(threads=max_prompt_refresh_workers) logger.debug("Prompt cache initialized.") - def reinitialize_after_fork(self) -> None: - self._cache = {} - self._lock = RLock() - self._task_manager.reinitialize_after_fork() - def get(self, key: str) -> Optional[PromptCacheItem]: with self._lock: return self._cache.get(key, None) diff --git a/tests/unit/test_prompt.py b/tests/unit/test_prompt.py index aee6ac69a..eadfb8221 100644 --- a/tests/unit/test_prompt.py +++ b/tests/unit/test_prompt.py @@ -179,43 +179,6 @@ def put(self, item): ] -def test_prompt_cache_task_manager_reinitializes_after_fork(): - manager = PromptCacheTaskManager(threads=0) - old_queue = manager._queue - old_lock = manager._lock - - manager._processing_keys.add("prompt-label:production") - manager._threads = 1 - - manager.reinitialize_after_fork() - - assert manager._queue is not old_queue - assert manager._lock is not old_lock - assert manager._processing_keys == set() - assert len(manager._consumers) == 1 - assert manager._consumers[0].is_alive() - - manager.shutdown() - - -def test_prompt_cache_reinitializes_after_fork(): - cache = PromptCache(max_prompt_refresh_workers=0) - old_lock = cache._lock - old_task_manager = cache._task_manager - - cache._cache["prompt-label:production"] = object() - cache._task_manager._processing_keys.add("prompt-label:production") - - cache.reinitialize_after_fork() - - assert cache._cache == {} - assert cache._lock is not old_lock - assert cache._task_manager is old_task_manager - assert cache._task_manager._processing_keys == set() - - cache._task_manager.shutdown() - - def test_get_fresh_prompt(langfuse): prompt_name = "test_get_fresh_prompt" prompt = Prompt_Text( diff --git a/tests/unit/test_resource_manager.py b/tests/unit/test_resource_manager.py index c9b2609cc..c1e91307d 100644 --- a/tests/unit/test_resource_manager.py +++ b/tests/unit/test_resource_manager.py @@ -223,11 +223,15 @@ def test_at_fork_reinit_skips_when_shutdown(monkeypatch): assert rm is not None old_score_queue = rm._score_ingestion_queue + old_lock = LangfuseResourceManager._lock rm._shutdown = True rm._at_fork_reinit() assert rm._score_ingestion_queue is old_score_queue # queue must not be replaced + # The class-level lock is shared state the child still needs (e.g. to create + # a new client), so it must be replaced even when this instance is shut down. + assert LangfuseResourceManager._lock is not old_lock client.shutdown() @@ -325,12 +329,14 @@ def test_at_fork_reinit_recreates_httpx_client_by_default(monkeypatch): old_httpx_client = rm.httpx_client old_api = rm.api + old_async_api = rm.async_api old_score_ingestion_client = rm._score_ingestion_client rm._at_fork_reinit() assert rm.httpx_client is not old_httpx_client assert rm.api is not old_api + assert rm.async_api is not old_async_api assert rm._score_ingestion_client is not old_score_ingestion_client client.shutdown() @@ -363,6 +369,7 @@ def test_at_fork_reinit_preserves_custom_httpx_client(monkeypatch): # Custom client must be preserved — caller owns process-safety for it. assert rm.httpx_client is custom_client assert rm.api is not None + assert rm.async_api is not None assert rm._score_ingestion_client is not None custom_client.close() From e9a47c470b5e6e696afff7406bb2fa197038cc74 Mon Sep 17 00:00:00 2001 From: Tobias Wochinger Date: Thu, 2 Jul 2026 15:54:39 +0200 Subject: [PATCH 9/9] fix(client): delegate api access through resources --- langfuse/_client/client.py | 32 +++++++++++++++++++++++++++-- tests/unit/test_resource_manager.py | 6 ++++++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 8c788a5ff..6ce72d27a 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -95,6 +95,7 @@ from langfuse._utils.parse_error import handle_fern_exception from langfuse._utils.prompt_cache import PromptCache from langfuse.api import ( + AsyncLangfuseAPI, CreateChatPromptRequest, CreateChatPromptType, CreateTextPromptRequest, @@ -105,6 +106,7 @@ DatasetStatus, DeleteDatasetRunResponse, Error, + LangfuseAPI, MapValue, NotFoundError, PaginatedDatasetRuns, @@ -416,8 +418,34 @@ def __init__( if self._tracing_enabled and self._resources.tracer is not None else otel_trace_api.NoOpTracer() ) - self.api = self._resources.api - self.async_api = self._resources.async_api + + @property + def api(self) -> LangfuseAPI: + if self._resources is None: + raise AttributeError("Langfuse client is not initialized") + + return self._resources.api + + @api.setter + def api(self, value: LangfuseAPI) -> None: + if self._resources is None: + raise AttributeError("Langfuse client is not initialized") + + self._resources.api = value + + @property + def async_api(self) -> AsyncLangfuseAPI: + if self._resources is None: + raise AttributeError("Langfuse client is not initialized") + + return self._resources.async_api + + @async_api.setter + def async_api(self, value: AsyncLangfuseAPI) -> None: + if self._resources is None: + raise AttributeError("Langfuse client is not initialized") + + self._resources.async_api = value @overload def start_observation( diff --git a/tests/unit/test_resource_manager.py b/tests/unit/test_resource_manager.py index c1e91307d..f66a1e052 100644 --- a/tests/unit/test_resource_manager.py +++ b/tests/unit/test_resource_manager.py @@ -331,12 +331,18 @@ def test_at_fork_reinit_recreates_httpx_client_by_default(monkeypatch): old_api = rm.api old_async_api = rm.async_api old_score_ingestion_client = rm._score_ingestion_client + assert client.api is old_api + assert client.async_api is old_async_api rm._at_fork_reinit() assert rm.httpx_client is not old_httpx_client assert rm.api is not old_api assert rm.async_api is not old_async_api + assert client.api is rm.api + assert client.api is not old_api + assert client.async_api is rm.async_api + assert client.async_api is not old_async_api assert rm._score_ingestion_client is not old_score_ingestion_client client.shutdown()