Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ed19fe4
feat(api): update generated dataset media API
wochinge Jun 15, 2026
cd791b6
feat(datasets): support media references
wochinge Jun 15, 2026
a9a4a96
fix(datasets): address media review feedback
wochinge Jun 15, 2026
2c6718a
fix(datasets): refine media review fixes
wochinge Jun 16, 2026
592f9de
Merge remote-tracking branch 'origin/main' into feature/lfe-10289-pyt…
wochinge Jun 16, 2026
3832a4e
fix(datasets): clean up media review follow-ups
wochinge Jun 16, 2026
88905ce
fix(datasets): round-trip resolved media references
wochinge Jun 16, 2026
d7eb3ed
fix(datasets): process media inside tuples and sets
wochinge Jun 16, 2026
7db0fe7
fix(datasets): align media reference field with expectedOutput API re…
wochinge Jun 17, 2026
a2cba12
fix(datasets): drop tuple media support to avoid namedtuple breakage
wochinge Jun 17, 2026
0e2ea09
fix(media): honor per-client httpx config for media reference fetches
wochinge Jun 17, 2026
310f692
feat(datasets): scope media uploads to their dataset item
wochinge Jun 19, 2026
dcb3fc7
Merge remote-tracking branch 'origin/main' into feature/lfe-10289-pyt…
wochinge Jun 19, 2026
6c0d12c
fix(datasets): resolve dataset id once per create_dataset_item
wochinge Jun 19, 2026
d4e08f2
fix(datasets): url-encode dataset name and fix media upload job fixture
wochinge Jun 19, 2026
e40e8ea
refactor(datasets): collect media in one pass, resolve dataset id lazily
wochinge Jun 19, 2026
4e0a12d
refactor(media): rename url_is_expired to is_url_expired
wochinge Jun 22, 2026
378f767
feat(datasets): always resolve dataset item media references
wochinge Jun 22, 2026
47ca159
refactor(datasets): hand-roll media reference JSONPath, drop jsonpath-ng
wochinge Jun 22, 2026
6ad46e5
test(media): use fixed timestamps so is_url_expired ids are xdist-stable
wochinge Jun 22, 2026
5ad0c3d
refactor(datasets): namespace json_path import, drop stale -OO comment
wochinge Jun 22, 2026
b21b83a
Merge branch 'main' into feature/lfe-10289-python-sdk-changes
wochinge Jun 23, 2026
ac89d9a
chore(api): take dataset/session generated code from main
wochinge Jun 23, 2026
752a718
Merge remote-tracking branch 'origin/main' into feature/lfe-10289-pyt…
wochinge Jun 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions langfuse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
LangfuseTool,
)
from ._version import __version__
from .media import LangfuseMedia, LangfuseMediaReference
from .span_filter import (
KNOWN_LLM_INSTRUMENTATION_SCOPE_PREFIXES,
is_default_export_span,
Expand All @@ -49,6 +50,8 @@

__all__ = [
"Langfuse",
"LangfuseMedia",
"LangfuseMediaReference",
"get_client",
"observe",
"propagate_attributes",
Expand Down
185 changes: 177 additions & 8 deletions langfuse/_client/client.py
Comment thread
wochinge marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import os
import re
import urllib.parse
import uuid
import warnings
from datetime import datetime
from hashlib import sha256
Expand All @@ -19,6 +20,7 @@
List,
Literal,
Optional,
Tuple,
Type,
Union,
cast,
Expand Down Expand Up @@ -84,7 +86,7 @@
LangfuseTool,
)
from langfuse._client.utils import get_sha256_hash_hex, run_async_safely
from langfuse._utils import _get_timestamp
from langfuse._utils import _get_timestamp, json_path
from langfuse._utils.environment import get_common_release_envs
from langfuse._utils.parse_error import handle_fern_exception
from langfuse._utils.prompt_cache import PromptCache
Expand All @@ -94,6 +96,7 @@
CreateTextPromptRequest,
Dataset,
DatasetItem,
DatasetItemMediaReferenceField,
DatasetRunWithItems,
DatasetStatus,
DeleteDatasetRunResponse,
Expand Down Expand Up @@ -126,7 +129,7 @@
_run_task,
)
from langfuse.logger import langfuse_logger
from langfuse.media import LangfuseMedia
from langfuse.media import LangfuseMedia, LangfuseMediaReference
from langfuse.model import (
ChatMessageDict,
ChatMessageWithPlaceholdersDict,
Expand Down Expand Up @@ -2326,9 +2329,9 @@ def get_dataset(
"""Fetch a dataset by its name.

Args:
name (str): The name of the dataset to fetch.
fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50.
version (Optional[datetime]): Retrieve dataset items as they existed at this specific point in time (UTC).
name: The name of the dataset to fetch.
fetch_items_page_size: All items of the dataset will be fetched in chunks of this size. Defaults to 50.
version: Retrieve dataset items as they existed at this specific point in time (UTC).
If provided, returns the state of items at the specified UTC timestamp.
If not provided, returns the latest version. Must be a timezone-aware datetime object in UTC.

Expand All @@ -2339,7 +2342,7 @@ def get_dataset(
langfuse_logger.debug(f"Getting datasets {name}")
dataset = self.api.datasets.get(dataset_name=self._url_encode(name))

dataset_items = []
dataset_items: List[DatasetItem] = []
page = 1

while True:
Expand All @@ -2349,7 +2352,10 @@ def get_dataset(
limit=fetch_items_page_size,
version=version,
)
dataset_items.extend(new_items.data)
dataset_items.extend(
self._hydrate_dataset_item_media_references(item)
for item in new_items.data
)

if new_items.meta.total_pages <= page:
break
Expand Down Expand Up @@ -3355,6 +3361,45 @@ def create_dataset_item(
try:
langfuse_logger.debug(f"Creating dataset item for dataset {dataset_name}")

# Media uploads must reference the (dataset, item) they belong to, and
# the item need not exist yet — so settle on the item id up front and
# reuse it for the create call below.
item_id = id if id is not None else str(uuid.uuid4())

# Single pass per field: swap each LangfuseMedia for its reference
# string (derived from content, not the upload) and collect the media
# still to upload, deduped by media id and tagged with its field.
pending_media: Dict[str, Tuple[LangfuseMedia, str]] = {}
input = self._process_dataset_item_media(
data=input,
pending_media=pending_media,
field=DatasetItemMediaReferenceField.INPUT.value,
)
expected_output = self._process_dataset_item_media(
data=expected_output,
pending_media=pending_media,
field=DatasetItemMediaReferenceField.EXPECTED_OUTPUT.value,
)
metadata = self._process_dataset_item_media(
data=metadata,
pending_media=pending_media,
field=DatasetItemMediaReferenceField.METADATA.value,
)

# The upload needs the dataset id, but the create API only takes the
# name. Resolve it once, and only when there is actually media to
# upload — a plain item pays no extra datasets.get round-trip.
if pending_media:
assert self._resources is not None
dataset_id = self.api.datasets.get(self._url_encode(dataset_name)).id
for media, field in pending_media.values():
self._resources._media_manager._upload_media_sync(
media=media,
dataset_id=dataset_id,
dataset_item_id=item_id,
field=field,
)

result = self.api.dataset_items.create(
dataset_name=dataset_name,
input=input,
Expand All @@ -3363,14 +3408,138 @@ def create_dataset_item(
source_trace_id=source_trace_id,
source_observation_id=source_observation_id,
status=status,
id=id,
id=item_id,
)

return cast(DatasetItem, result)
except Error as e:
handle_fern_exception(e)
raise e

def _process_dataset_item_media(
self,
*,
data: Any,
pending_media: Dict[str, Tuple[LangfuseMedia, str]],
field: str,
) -> Any:
"""Swap each ``LangfuseMedia`` for its reference string in ``data``.

Each replaced media is recorded in ``pending_media`` (keyed by media id,
so the same media across fields uploads once) for the caller to upload
after the dataset id has been resolved.
"""
if self._resources is None:
return data

max_levels = 10

def _process_data_recursively(
data: Any, level: int, ancestor_container_ids: set[int]
) -> Any:
if isinstance(data, LangfuseMedia):
reference_string = data._reference_string
media_id = data._media_id
if reference_string is None or media_id is None:
raise ValueError(
"Cannot create dataset item with invalid LangfuseMedia."
)
# First field a media appears in wins; later duplicates dedupe.
pending_media.setdefault(media_id, (data, field))
return reference_string

if isinstance(data, LangfuseMediaReference):
return data.reference_string if data.reference_string else data

# Tuples are intentionally excluded: namedtuple subclasses can't be
# rebuilt from an iterable, so media inside them is left untouched.
if not isinstance(data, (list, set, frozenset, dict)):
return data
Comment thread
wochinge marked this conversation as resolved.

# Container ids only protect against recursive cycles.
data_id = id(data)
if data_id in ancestor_container_ids or level > max_levels:
return data

next_ancestor_container_ids = ancestor_container_ids | {data_id}

if isinstance(data, (list, set, frozenset)):
processed = (
_process_data_recursively(
item, level + 1, next_ancestor_container_ids
)
for item in data
)
return type(data)(processed)

return {
key: _process_data_recursively(
value, level + 1, next_ancestor_container_ids
Comment thread
wochinge marked this conversation as resolved.
)
for key, value in data.items()
}

return _process_data_recursively(data, 1, set())

def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetItem:
media_references = item.media_references or []
if not media_references:
return item

# Map the API enum member to the snake_case model attribute so this keeps
# working regardless of the enum's wire value (e.g. "expectedOutput").
attr_by_field = {
DatasetItemMediaReferenceField.INPUT: "input",
DatasetItemMediaReferenceField.EXPECTED_OUTPUT: "expected_output",
DatasetItemMediaReferenceField.METADATA: "metadata",
}
hydrated_fields = {
"input": item.input,
"expected_output": item.expected_output,
"metadata": item.metadata,
}

for media_reference in media_references:
media = media_reference.media
field = attr_by_field.get(media_reference.field)
if field is None:
continue

replacement = LangfuseMediaReference(
media_id=media.media_id,
content_type=media.content_type,
url=media.url,
url_expiry=media.url_expiry,
content_length=media.content_length,
reference_string=media_reference.reference_string,
)
Comment thread
claude[bot] marked this conversation as resolved.
hydrated_fields[field] = self._replace_json_path_value(
value=hydrated_fields[field],
path=media_reference.json_path,
replacement=replacement,
)

return item.model_copy(
update={
"input": hydrated_fields["input"],
"expected_output": hydrated_fields["expected_output"],
"metadata": hydrated_fields["metadata"],
}
)

def _replace_json_path_value(
self, *, value: Any, path: str, replacement: LangfuseMediaReference
) -> Any:
try:
return json_path.set_value_at_path(value, path, replacement)
except Exception as e:
langfuse_logger.warning(
f"Failed to hydrate dataset media reference at JSONPath {path}",
exc_info=e,
)

return value

def resolve_media_references(
self,
*,
Comment thread
claude[bot] marked this conversation as resolved.
Expand Down
23 changes: 23 additions & 0 deletions langfuse/_client/resource_manager.py
Comment thread
wochinge marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,29 @@ class LangfuseResourceManager:
_instances: Dict[str, "LangfuseResourceManager"] = {}
_lock = threading.RLock()

@classmethod
def get_singleton_httpx_client(cls) -> Optional[httpx.Client]:
with cls._lock:
instances = list(cls._instances.values())

if not instances:
return None

if len(instances) > 1:
# Mirror get_client's safety stance: with multiple clients we
# cannot tell which one produced a given reference, so fall back
# to a default httpx client rather than silently using an
# arbitrary instance's transport config (proxy / CA / mTLS).
langfuse_logger.warning(
"Multiple Langfuse clients are instantiated; falling back to a "
"default httpx client for LangfuseMediaReference fetches. Pass an "
"explicit `client` to fetch_bytes/fetch_base64/fetch_data_uri to "
"honor per-client transport settings."
)
return None

return instances[0].httpx_client
Comment thread
claude[bot] marked this conversation as resolved.

def __new__(
cls,
*,
Expand Down
43 changes: 42 additions & 1 deletion langfuse/_task_manager/media_manager.py
Comment thread
wochinge marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ def _process_media(
content_sha256_hash=media._content_sha256_hash,
trace_id=trace_id,
observation_id=observation_id,
dataset_id=None,
dataset_item_id=None,
field=field,
)

Expand All @@ -284,6 +286,43 @@ def _process_media(
f"Media processing error: Failed to process media_id={media._media_id} for trace_id={trace_id}. Error: {str(e)}"
)

def _upload_media_sync(
self,
*,
media: LangfuseMedia,
dataset_id: Optional[str] = None,
dataset_item_id: Optional[str] = None,
field: Optional[str] = None,
) -> None:
if not self._enabled:
raise ValueError("Cannot upload LangfuseMedia while media upload is disabled.")

if (
media._content_length is None
or media._content_type is None
or media._content_sha256_hash is None
or media._content_bytes is None
):
raise ValueError("Cannot upload invalid LangfuseMedia.")

if media._media_id is None:
raise ValueError("Cannot upload LangfuseMedia without media ID.")

upload_media_job = UploadMediaJob(
media_id=media._media_id,
content_bytes=media._content_bytes,
content_type=media._content_type,
content_length=media._content_length,
content_sha256_hash=media._content_sha256_hash,
trace_id=None,
observation_id=None,
dataset_id=dataset_id,
dataset_item_id=dataset_item_id,
field=field,
)

self._process_upload_media_job(data=upload_media_job)

def _process_upload_media_job(
self,
*,
Expand All @@ -294,9 +333,11 @@ def _process_upload_media_job(
content_length=data["content_length"],
content_type=cast(MediaContentType, data["content_type"]),
sha256hash=data["content_sha256_hash"],
field=data["field"],
trace_id=data["trace_id"],
observation_id=data["observation_id"],
dataset_id=data["dataset_id"],
dataset_item_id=data["dataset_item_id"],
field=data["field"],
)

upload_url = upload_url_response.upload_url
Expand Down
6 changes: 4 additions & 2 deletions langfuse/_task_manager/media_upload_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class UploadMediaJob(TypedDict):
content_length: int
content_bytes: bytes
content_sha256_hash: str
trace_id: str
trace_id: Optional[str]
observation_id: Optional[str]
field: str
dataset_id: Optional[str]
dataset_item_id: Optional[str]
field: Optional[str]
Comment thread
claude[bot] marked this conversation as resolved.
Loading