Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
from langfuse._utils.parse_error import handle_fern_exception
from langfuse._utils.prompt_cache import PromptCache
from langfuse.api import (
AsyncLangfuseAPI,
CreateChatPromptRequest,
CreateChatPromptType,
CreateTextPromptRequest,
Expand All @@ -105,6 +106,7 @@
DatasetStatus,
DeleteDatasetRunResponse,
Error,
LangfuseAPI,
MapValue,
NotFoundError,
PaginatedDatasetRuns,
Expand Down Expand Up @@ -176,6 +178,13 @@ class Langfuse:
host (Optional[str]): Deprecated. Use base_url instead. The Langfuse API host URL. Defaults to "https://cloud.langfuse.com".
timeout (Optional[int]): Timeout in seconds for API requests. Defaults to 5 seconds.
httpx_client (Optional[httpx.Client]): Custom httpx client for making non-tracing HTTP requests. If not provided, a default client will be created.
**Fork safety**: ``httpx.Client`` is thread-safe but not process-safe. When using
``fork()``-based servers (e.g. Gunicorn with ``--preload``), the SDK automatically
recreates its internally-managed HTTP client in child processes after fork. A custom
``httpx_client`` is intentionally left as-is (the fork-inherited copy is reused), so
you retain the opportunity to handle process-safety yourself — for example by
registering your own ``os.register_at_fork(after_in_child=...)`` handler to close and
reopen connections on the custom client.
debug (bool): Enable debug logging. Defaults to False. Can also be set via LANGFUSE_DEBUG environment variable.
tracing_enabled (Optional[bool]): Enable or disable tracing. Defaults to True. Can also be set via LANGFUSE_TRACING_ENABLED environment variable.
flush_at (Optional[int]): Number of spans to batch before sending to the API. Defaults to 512. Can also be set via LANGFUSE_FLUSH_AT environment variable.
Expand Down Expand Up @@ -409,8 +418,34 @@ def __init__(
if self._tracing_enabled and self._resources.tracer is not None
else otel_trace_api.NoOpTracer()
)
self.api = self._resources.api
self.async_api = self._resources.async_api

@property
def api(self) -> LangfuseAPI:
    """Return the synchronous API client stored on the resource manager.

    Raises:
        AttributeError: If ``self._resources`` is ``None``.
    """
    resources = self._resources
    if resources is not None:
        return resources.api

    raise AttributeError("Langfuse client is not initialized")

@api.setter
def api(self, value: LangfuseAPI) -> None:
    """Store ``value`` as the resource manager's synchronous API client.

    Raises:
        AttributeError: If ``self._resources`` is ``None``.
    """
    resources = self._resources
    if resources is None:
        raise AttributeError("Langfuse client is not initialized")

    # Write through to the resource manager so it remains the single owner.
    resources.api = value

@property
def async_api(self) -> AsyncLangfuseAPI:
    """Return the asynchronous API client stored on the resource manager.

    Raises:
        AttributeError: If ``self._resources`` is ``None``.
    """
    resources = self._resources
    if resources is not None:
        return resources.async_api

    raise AttributeError("Langfuse client is not initialized")

@async_api.setter
def async_api(self, value: AsyncLangfuseAPI) -> None:
    """Store ``value`` as the resource manager's asynchronous API client.

    Raises:
        AttributeError: If ``self._resources`` is ``None``.
    """
    resources = self._resources
    if resources is None:
        raise AttributeError("Langfuse client is not initialized")

    # Write through to the resource manager so it remains the single owner.
    resources.async_api = value

@overload
def start_observation(
Expand Down
258 changes: 193 additions & 65 deletions langfuse/_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

import atexit
import os
import sys
import threading
import urllib.request
import weakref
from queue import Full, Queue
from typing import Any, Callable, Dict, List, Optional, cast

Expand Down Expand Up @@ -79,6 +82,10 @@

_instances: Dict[str, "LangfuseResourceManager"] = {}
_lock = threading.RLock()
_otel_tracer: Tracer
_media_manager: MediaManager
_media_upload_consumers: List[MediaUploadConsumer]
_ingestion_consumers: List[ScoreIngestionConsumer]

@classmethod
def get_singleton_httpx_client(cls) -> Optional[httpx.Client]:
Expand Down Expand Up @@ -201,6 +208,7 @@
self.mask = mask
self.mask_otel_spans = mask_otel_spans
self.environment = environment
self._shutdown = False

# Store additional client settings for get_client() to use
self.timeout = timeout
Expand All @@ -216,60 +224,19 @@
self.span_exporter = span_exporter
self.tracer_provider: Optional[TracerProvider] = None

# API Clients

## API clients must be singletons because the underlying HTTPX clients
## use connection pools with limited capacity. Creating multiple instances
## could exhaust the OS's maximum number of available TCP sockets (file descriptors),
## leading to connection errors.
if httpx_client is not None:
self.httpx_client = httpx_client
else:
# Create a new httpx client with additional_headers if provided
client_headers = additional_headers if additional_headers else {}
self.httpx_client = httpx.Client(timeout=timeout, headers=client_headers)

self.api = LangfuseAPI(
base_url=base_url,
username=self.public_key,
password=secret_key,
x_langfuse_sdk_name="python",
x_langfuse_sdk_version=langfuse_version,
x_langfuse_public_key=self.public_key,
httpx_client=self.httpx_client,
timeout=timeout,
)
self.async_api = AsyncLangfuseAPI(
base_url=base_url,
username=self.public_key,
password=secret_key,
x_langfuse_sdk_name="python",
x_langfuse_sdk_version=langfuse_version,
x_langfuse_public_key=self.public_key,
timeout=timeout,
)
score_ingestion_client = LangfuseClient(
public_key=self.public_key,
secret_key=secret_key,
base_url=base_url,
version=langfuse_version,
timeout=timeout or 20,
session=self.httpx_client,
)
self._custom_httpx_client = httpx_client
self._init_api_clients()

# Media
self._media_upload_enabled = os.environ.get(
LANGFUSE_MEDIA_UPLOAD_ENABLED, "True"
).lower() not in ("false", "0")

self._media_upload_queue: Queue[Any] = Queue(100_000)
self._media_manager = MediaManager(
api_client=self.api,
httpx_client=self.httpx_client,
media_upload_queue=self._media_upload_queue,
max_retries=3,
self._media_upload_thread_count = media_upload_thread_count or max(
int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1
)
self._media_upload_consumers = []

self._init_media_manager()

# OTEL Tracer
if tracing_enabled:
Expand Down Expand Up @@ -303,48 +270,207 @@
attributes={"public_key": self.public_key},
)

media_upload_thread_count = media_upload_thread_count or max(
int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1
self._init_consumer_threads()

# Prompt cache
self.prompt_cache = PromptCache()

# Register shutdown handler
atexit.register(self.shutdown)

# Register fork handler to reinitialize consumer threads in child process.
# When using Gunicorn with --preload, os.fork() copies memory but not threads
# (POSIX.1: https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html).
# Without this, media upload and score ingestion threads are lost after fork,
# causing silent data loss.
#
# Note: LangfuseSpanProcessor (BatchSpanProcessor) already handles fork-safety
# for span export via its own os.register_at_fork. This handler covers the
# remaining background threads managed by LangfuseResourceManager.
#
# weakref.WeakMethod prevents os.register_at_fork from holding a permanent strong
# reference to this instance, which would block garbage collection.
# See: https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py
if hasattr(os, "register_at_fork"):
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
os.register_at_fork(
# Walrus operator resolves the weak reference once and stores it in
# a temporary variable before calling it. This avoids a TOCTOU window
# where GC could collect the referent between checking for None and
# invoking the method.
after_in_child=lambda: (m := weak_reinit()) and m()
)

langfuse_logger.info(
f"Startup: Langfuse tracer successfully initialized | "
f"public_key={self.public_key} | "
f"base_url={base_url} | "
f"environment={environment or 'default'} | "
f"sample_rate={sample_rate if sample_rate is not None else 1.0} | "
f"media_threads={self._media_upload_thread_count}"
)

def _init_media_manager(self) -> None:
    """Create a fresh media upload queue and attach it to the media manager.

    If a ``_media_manager`` attribute already exists on this instance, it is
    re-pointed at the new queue and the current clients through its
    ``reinitialize`` method, so the manager object itself is kept. Otherwise a
    new ``MediaManager`` is constructed. In both cases the list of media
    upload consumers is cleared.
    """
    self._media_upload_queue: Queue[Any] = Queue(100_000)

    # Arguments shared by both the construction and the reinitialize path.
    wiring: Dict[str, Any] = {
        "api_client": self.api,
        "httpx_client": self.httpx_client,
        "media_upload_queue": self._media_upload_queue,
    }

    if not hasattr(self, "_media_manager"):
        # First initialization: no manager exists on this instance yet.
        self._media_manager = MediaManager(max_retries=3, **wiring)
    else:
        # Keep the existing manager object; only swap what it points at.
        self._media_manager.reinitialize(**wiring)

    self._media_upload_consumers = []

def _init_api_clients(self) -> None:
    """Build the httpx client and the API clients that depend on it.

    A caller-provided httpx client (``self._custom_httpx_client``) is reused
    unchanged because its lifecycle belongs to the caller. When none was
    provided, a new ``httpx.Client`` is created on every call, so calling this
    method again after fork gives the child its own internally-managed client.
    """
    caller_client = self._custom_httpx_client
    if caller_client is None:
        self.httpx_client = httpx.Client(
            timeout=self.timeout, headers=self.additional_headers or {}
        )
    else:
        self.httpx_client = caller_client

    # Settings common to the synchronous and asynchronous API clients.
    shared_settings: Dict[str, Any] = {
        "base_url": self.base_url,
        "username": self.public_key,
        "password": self.secret_key,
        "x_langfuse_sdk_name": "python",
        "x_langfuse_sdk_version": langfuse_version,
        "x_langfuse_public_key": self.public_key,
        "timeout": self.timeout,
    }

    # Only the synchronous client is handed the httpx client.
    self.api = LangfuseAPI(httpx_client=self.httpx_client, **shared_settings)
    self.async_api = AsyncLangfuseAPI(**shared_settings)

    # The score ingestion client falls back to a timeout of 20 when
    # self.timeout is falsy.
    score_timeout = self.timeout or 20
    self._score_ingestion_client = LangfuseClient(
        public_key=self.public_key,
        secret_key=self.secret_key,
        base_url=self.base_url,
        version=langfuse_version,
        timeout=score_timeout,
        session=self.httpx_client,
    )

def _init_consumer_threads(self) -> None:
    """Initialize media upload and score ingestion consumer threads.

    Score-ingestion state (queue and consumer list) is replaced before any
    thread is started. This method is also called when reinitializing after
    fork, where a failure is caught and logged rather than propagated. If a
    media consumer failed to start and the reset had not happened yet, this
    instance would keep a score queue inherited from the parent process that
    no thread in the child consumes.

    Threads are still started in the same order as before: media upload
    consumers first (only when media upload is enabled), then the single
    score ingestion consumer.
    """
    # Reset score-ingestion state up front so a failure further down never
    # leaves a stale queue or stale consumer list on this instance.
    self._score_ingestion_queue: Queue[Any] = Queue(100_000)
    self._ingestion_consumers = []

    if self._media_upload_enabled:
        for i in range(self._media_upload_thread_count):
            media_upload_consumer = MediaUploadConsumer(
                identifier=i,
                media_manager=self._media_manager,
            )
            media_upload_consumer.start()
            self._media_upload_consumers.append(media_upload_consumer)

    # Score ingestion
    ingestion_consumer = ScoreIngestionConsumer(
        ingestion_queue=self._score_ingestion_queue,
        identifier=0,
        client=self._score_ingestion_client,
        flush_at=self.flush_at,
        flush_interval=self.flush_interval,
        max_retries=3,
        public_key=self.public_key,
    )
    ingestion_consumer.start()
    self._ingestion_consumers.append(ingestion_consumer)

Check warning on line 400 in langfuse/_client/resource_manager.py

View check run for this annotation

Claude / Claude Code Review

Score queue not reset before media consumer starts in _init_consumer_threads

In `_init_consumer_threads` (resource_manager.py:375-400), the media consumer `.start()` loop runs before `self._score_ingestion_queue` and `self._ingestion_consumers` are reset. If a `media_upload_consumer.start()` raises (e.g. `OSError: can't start new thread`), the outer try/except at line 462-470 swallows the exception and the child keeps running with the parent's fork-inherited score queue — so a later `flush()` → `_score_ingestion_queue.join()` blocks forever, the exact Gunicorn worker tim
Comment thread
wochinge marked this conversation as resolved.

# Register shutdown handler
atexit.register(self.shutdown)
def _at_fork_reinit(self) -> None:
    """Reinitialize consumer threads after fork in child process.

    Called automatically via os.register_at_fork() after fork().
    Necessary for Gunicorn --preload deployments where os.fork() is used:
    threads are not copied to child processes (POSIX standard), so without
    reinitialization, the child process has no consumer threads and all
    media upload and score ingestion events are silently lost.

    Note: LangfuseSpanProcessor (BatchSpanProcessor) handles span export
    fork-safety separately via its own os.register_at_fork handler.

    Skipped if shutdown() was already called on this instance, to avoid
    restarting threads on an intentionally torn-down manager.

    Failures are logged and never raised. The closing "reinitialized" debug
    message is emitted only when the consumer-thread reinitialization
    actually succeeded.
    """
    # The class-level lock may have been held by a thread in the parent at fork time.
    # That thread does not exist in the child, so the lock can never be released and
    # any attempt to acquire it would deadlock. Replace it before the shutdown check:
    # the lock is class-level state needed by the child (e.g. to create a new client)
    # even if this particular instance was already shut down.
    LangfuseResourceManager._lock = threading.RLock()

    if self._shutdown:
        return

    if sys.platform == "darwin" and not urllib.request.getproxies_environment():
        # urllib proxy discovery falls back to macOS SystemConfiguration APIs that
        # are not safe to invoke after fork(). Setting no_proxy="*" makes httpx and
        # requests skip that lookup entirely in this child process. Skipped when
        # proxies are configured via environment variables: urllib then never touches
        # SystemConfiguration (no segfault risk), and overriding no_proxy would
        # disable the user's proxy setup process-wide.
        os.environ["no_proxy"] = "*"
        os.environ["NO_PROXY"] = "*"

    langfuse_logger.debug(
        f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads."
    )

    # Queues are intentionally recreated after fork. Items enqueued before fork
    # belong to the preloaded parent process and must not be processed by every
    # worker — otherwise uploads/scores would be duplicated across workers.
    #
    # Internally-managed httpx clients must also be recreated: fork() duplicates the
    # parent's connection pool (TCP socket file descriptors) into the child. Both
    # processes then share the same underlying sockets, causing data corruption and
    # SSL/TLS state mismatch under concurrent use. Fresh clients start with an empty
    # pool owned solely by this child process.
    #
    # Custom httpx clients provided by the caller are NOT recreated. The fork-inherited
    # copy is reused as-is, giving the caller the opportunity to handle process-safety
    # themselves (e.g. by registering their own os.register_at_fork handler).
    try:
        self._init_api_clients()
    except Exception as e:
        langfuse_logger.error(
            f"[PID {os.getpid()}] Failed to recreate HTTP clients after fork: {e}. "
            f"Network requests may fail in this worker."
        )

    try:
        self._init_media_manager()
        self._init_consumer_threads()
        self.prompt_cache = PromptCache()
    except Exception as e:
        langfuse_logger.error(
            f"[PID {os.getpid()}] Failed to reinitialize consumer threads after fork: {e}. "
            f"Media upload, score ingestion, and prompt cache refresh will be unavailable in this worker."
        )
    else:
        # Report success only when nothing above raised; otherwise the error
        # entry would be followed by a contradictory "reinitialized" entry.
        langfuse_logger.debug(
            f"[PID {os.getpid()}] Langfuse consumer threads and prompt cache reinitialized after fork"
        )

@classmethod
Expand Down Expand Up @@ -486,6 +612,8 @@
langfuse_logger.debug("Successfully flushed media upload queue")

def shutdown(self) -> None:
self._shutdown = True

# Unregister the atexit handler first
atexit.unregister(self.shutdown)

Expand Down
Loading
Loading