diff --git a/langfuse/__init__.py b/langfuse/__init__.py index ec441e745..d4d24cd04 100644 --- a/langfuse/__init__.py +++ b/langfuse/__init__.py @@ -29,6 +29,7 @@ LangfuseTool, ) from ._version import __version__ +from .media import LangfuseMedia, LangfuseMediaReference from .span_filter import ( KNOWN_LLM_INSTRUMENTATION_SCOPE_PREFIXES, is_default_export_span, @@ -49,6 +50,8 @@ __all__ = [ "Langfuse", + "LangfuseMedia", + "LangfuseMediaReference", "get_client", "observe", "propagate_attributes", diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 8e70e03b1..00f7d803a 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -8,6 +8,7 @@ import os import re import urllib.parse +import uuid import warnings from datetime import datetime from hashlib import sha256 @@ -19,6 +20,7 @@ List, Literal, Optional, + Tuple, Type, Union, cast, @@ -84,7 +86,7 @@ LangfuseTool, ) from langfuse._client.utils import get_sha256_hash_hex, run_async_safely -from langfuse._utils import _get_timestamp +from langfuse._utils import _get_timestamp, json_path from langfuse._utils.environment import get_common_release_envs from langfuse._utils.parse_error import handle_fern_exception from langfuse._utils.prompt_cache import PromptCache @@ -94,6 +96,7 @@ CreateTextPromptRequest, Dataset, DatasetItem, + DatasetItemMediaReferenceField, DatasetRunWithItems, DatasetStatus, DeleteDatasetRunResponse, @@ -126,7 +129,7 @@ _run_task, ) from langfuse.logger import langfuse_logger -from langfuse.media import LangfuseMedia +from langfuse.media import LangfuseMedia, LangfuseMediaReference from langfuse.model import ( ChatMessageDict, ChatMessageWithPlaceholdersDict, @@ -2326,9 +2329,9 @@ def get_dataset( """Fetch a dataset by its name. Args: - name (str): The name of the dataset to fetch. - fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50. - version (Optional[datetime]): Retrieve dataset items as they existed at this specific point in time (UTC). + name: The name of the dataset to fetch. + fetch_items_page_size: All items of the dataset will be fetched in chunks of this size. Defaults to 50. + version: Retrieve dataset items as they existed at this specific point in time (UTC). If provided, returns the state of items at the specified UTC timestamp. If not provided, returns the latest version. Must be a timezone-aware datetime object in UTC. @@ -2339,7 +2342,7 @@ def get_dataset( langfuse_logger.debug(f"Getting datasets {name}") dataset = self.api.datasets.get(dataset_name=self._url_encode(name)) - dataset_items = [] + dataset_items: List[DatasetItem] = [] page = 1 while True: @@ -2349,7 +2352,10 @@ def get_dataset( limit=fetch_items_page_size, version=version, ) - dataset_items.extend(new_items.data) + dataset_items.extend( + self._hydrate_dataset_item_media_references(item) + for item in new_items.data + ) if new_items.meta.total_pages <= page: break @@ -3355,6 +3361,45 @@ def create_dataset_item( try: langfuse_logger.debug(f"Creating dataset item for dataset {dataset_name}") + # Media uploads must reference the (dataset, item) they belong to, and + # the item need not exist yet — so settle on the item id up front and + # reuse it for the create call below. + item_id = id if id is not None else str(uuid.uuid4()) + + # Single pass per field: swap each LangfuseMedia for its reference + # string (derived from content, not the upload) and collect the media + # still to upload, deduped by media id and tagged with its field. + pending_media: Dict[str, Tuple[LangfuseMedia, str]] = {} + input = self._process_dataset_item_media( + data=input, + pending_media=pending_media, + field=DatasetItemMediaReferenceField.INPUT.value, + ) + expected_output = self._process_dataset_item_media( + data=expected_output, + pending_media=pending_media, + field=DatasetItemMediaReferenceField.EXPECTED_OUTPUT.value, + ) + metadata = self._process_dataset_item_media( + data=metadata, + pending_media=pending_media, + field=DatasetItemMediaReferenceField.METADATA.value, + ) + + # The upload needs the dataset id, but the create API only takes the + # name. Resolve it once, and only when there is actually media to + # upload — a plain item pays no extra datasets.get round-trip. + if pending_media: + assert self._resources is not None + dataset_id = self.api.datasets.get(self._url_encode(dataset_name)).id + for media, field in pending_media.values(): + self._resources._media_manager._upload_media_sync( + media=media, + dataset_id=dataset_id, + dataset_item_id=item_id, + field=field, + ) + result = self.api.dataset_items.create( dataset_name=dataset_name, input=input, @@ -3363,7 +3408,7 @@ def create_dataset_item( source_trace_id=source_trace_id, source_observation_id=source_observation_id, status=status, - id=id, + id=item_id, ) return cast(DatasetItem, result) @@ -3371,6 +3416,130 @@ def create_dataset_item( handle_fern_exception(e) raise e + def _process_dataset_item_media( + self, + *, + data: Any, + pending_media: Dict[str, Tuple[LangfuseMedia, str]], + field: str, + ) -> Any: + """Swap each ``LangfuseMedia`` for its reference string in ``data``. + + Each replaced media is recorded in ``pending_media`` (keyed by media id, + so the same media across fields uploads once) for the caller to upload + after the dataset id has been resolved. + """ + if self._resources is None: + return data + + max_levels = 10 + + def _process_data_recursively( + data: Any, level: int, ancestor_container_ids: set[int] + ) -> Any: + if isinstance(data, LangfuseMedia): + reference_string = data._reference_string + media_id = data._media_id + if reference_string is None or media_id is None: + raise ValueError( + "Cannot create dataset item with invalid LangfuseMedia." + ) + # First field a media appears in wins; later duplicates dedupe. + pending_media.setdefault(media_id, (data, field)) + return reference_string + + if isinstance(data, LangfuseMediaReference): + return data.reference_string if data.reference_string else data + + # Tuples are intentionally excluded: namedtuple subclasses can't be + # rebuilt from an iterable, so media inside them is left untouched. + if not isinstance(data, (list, set, frozenset, dict)): + return data + + # Container ids only protect against recursive cycles. + data_id = id(data) + if data_id in ancestor_container_ids or level > max_levels: + return data + + next_ancestor_container_ids = ancestor_container_ids | {data_id} + + if isinstance(data, (list, set, frozenset)): + processed = ( + _process_data_recursively( + item, level + 1, next_ancestor_container_ids + ) + for item in data + ) + return type(data)(processed) + + return { + key: _process_data_recursively( + value, level + 1, next_ancestor_container_ids + ) + for key, value in data.items() + } + + return _process_data_recursively(data, 1, set()) + + def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetItem: + media_references = item.media_references or [] + if not media_references: + return item + + # Map the API enum member to the snake_case model attribute so this keeps + # working regardless of the enum's wire value (e.g. "expectedOutput"). + attr_by_field = { + DatasetItemMediaReferenceField.INPUT: "input", + DatasetItemMediaReferenceField.EXPECTED_OUTPUT: "expected_output", + DatasetItemMediaReferenceField.METADATA: "metadata", + } + hydrated_fields = { + "input": item.input, + "expected_output": item.expected_output, + "metadata": item.metadata, + } + + for media_reference in media_references: + media = media_reference.media + field = attr_by_field.get(media_reference.field) + if field is None: + continue + + replacement = LangfuseMediaReference( + media_id=media.media_id, + content_type=media.content_type, + url=media.url, + url_expiry=media.url_expiry, + content_length=media.content_length, + reference_string=media_reference.reference_string, + ) + hydrated_fields[field] = self._replace_json_path_value( + value=hydrated_fields[field], + path=media_reference.json_path, + replacement=replacement, + ) + + return item.model_copy( + update={ + "input": hydrated_fields["input"], + "expected_output": hydrated_fields["expected_output"], + "metadata": hydrated_fields["metadata"], + } + ) + + def _replace_json_path_value( + self, *, value: Any, path: str, replacement: LangfuseMediaReference + ) -> Any: + try: + return json_path.set_value_at_path(value, path, replacement) + except Exception as e: + langfuse_logger.warning( + f"Failed to hydrate dataset media reference at JSONPath {path}", + exc_info=e, + ) + + return value + def resolve_media_references( self, *, diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 004566c8f..ab8416dcb 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -79,6 +79,29 @@ class LangfuseResourceManager: _instances: Dict[str, "LangfuseResourceManager"] = {} _lock = threading.RLock() + @classmethod + def get_singleton_httpx_client(cls) -> Optional[httpx.Client]: + with cls._lock: + instances = list(cls._instances.values()) + + if not instances: + return None + + if len(instances) > 1: + # Mirror get_client's safety stance: with multiple clients we + # cannot tell which one produced a given reference, so fall back + # to a default httpx client rather than silently using an + # arbitrary instance's transport config (proxy / CA / mTLS). + langfuse_logger.warning( + "Multiple Langfuse clients are instantiated; falling back to a " + "default httpx client for LangfuseMediaReference fetches. Pass an " + "explicit `client` to fetch_bytes/fetch_base64/fetch_data_uri to " + "honor per-client transport settings." + ) + return None + + return instances[0].httpx_client + def __new__( cls, *, diff --git a/langfuse/_task_manager/media_manager.py b/langfuse/_task_manager/media_manager.py index f3bd7b9cb..e1882fb4c 100644 --- a/langfuse/_task_manager/media_manager.py +++ b/langfuse/_task_manager/media_manager.py @@ -263,6 +263,8 @@ def _process_media( content_sha256_hash=media._content_sha256_hash, trace_id=trace_id, observation_id=observation_id, + dataset_id=None, + dataset_item_id=None, field=field, ) @@ -284,6 +286,43 @@ def _process_media( f"Media processing error: Failed to process media_id={media._media_id} for trace_id={trace_id}. Error: {str(e)}" ) + def _upload_media_sync( + self, + *, + media: LangfuseMedia, + dataset_id: Optional[str] = None, + dataset_item_id: Optional[str] = None, + field: Optional[str] = None, + ) -> None: + if not self._enabled: + raise ValueError("Cannot upload LangfuseMedia while media upload is disabled.") + + if ( + media._content_length is None + or media._content_type is None + or media._content_sha256_hash is None + or media._content_bytes is None + ): + raise ValueError("Cannot upload invalid LangfuseMedia.") + + if media._media_id is None: + raise ValueError("Cannot upload LangfuseMedia without media ID.") + + upload_media_job = UploadMediaJob( + media_id=media._media_id, + content_bytes=media._content_bytes, + content_type=media._content_type, + content_length=media._content_length, + content_sha256_hash=media._content_sha256_hash, + trace_id=None, + observation_id=None, + dataset_id=dataset_id, + dataset_item_id=dataset_item_id, + field=field, + ) + + self._process_upload_media_job(data=upload_media_job) + def _process_upload_media_job( self, *, @@ -294,9 +333,11 @@ def _process_upload_media_job( content_length=data["content_length"], content_type=cast(MediaContentType, data["content_type"]), sha256hash=data["content_sha256_hash"], - field=data["field"], trace_id=data["trace_id"], observation_id=data["observation_id"], + dataset_id=data["dataset_id"], + dataset_item_id=data["dataset_item_id"], + field=data["field"], ) upload_url = upload_url_response.upload_url diff --git a/langfuse/_task_manager/media_upload_queue.py b/langfuse/_task_manager/media_upload_queue.py index e4cd8ebee..a0ef4bdf1 100644 --- a/langfuse/_task_manager/media_upload_queue.py +++ b/langfuse/_task_manager/media_upload_queue.py @@ -7,6 +7,8 @@ class UploadMediaJob(TypedDict): content_length: int content_bytes: bytes content_sha256_hash: str - trace_id: str + trace_id: Optional[str] observation_id: Optional[str] - field: str + dataset_id: Optional[str] + dataset_item_id: Optional[str] + field: Optional[str] diff --git a/langfuse/_utils/json_path.py b/langfuse/_utils/json_path.py new file mode 100644 index 000000000..8a8beba87 --- /dev/null +++ b/langfuse/_utils/json_path.py @@ -0,0 +1,74 @@ +"""Resolve the JSONPaths the Langfuse API attaches to dataset item media references. + +The backend's ``findMediaReferences`` reads ``node.path`` from jsonpath-plus' +``JSONPath(..., resultType="all")``, which returns a bracket-normalized path. So we +only ever see ``$``, ``['']`` (single-quoted, with no escaping — keys may +contain literal quotes, brackets, dots, etc.), and ``[]`` (also emitted for +all-digit object keys). It is always bracket notation — never dot notation like +``$.x.y`` — i.e. an RFC 9535 normalized path. We parse exactly this restricted +grammar rather than depend on a full JSONPath engine. ``findMediaReferences`` is the +format of record; anything outside the grammar raises. +""" + +from typing import Any, List, Union + + +def parse_path(json_path: str) -> List[Union[str, int]]: + """Parse a jsonpath-plus normalized path into ordered segments. + + Object keys become ``str`` segments, array indices become ``int`` segments. + Returns an empty list for the root ``$``. + """ + if not json_path.startswith("$"): + raise ValueError(json_path) + + segments: List[Union[str, int]] = [] + i, n = 1, len(json_path) + while i < n: + if json_path[i] != "[": + raise ValueError(json_path) + i += 1 + + if i < n and json_path[i] == "'": # object key: [''] (single-quoted) + i += 1 + # No escaping, so the key ends at the closing "']". + close = json_path.find("']", i) + if close == -1: + raise ValueError(json_path) + segments.append(json_path[i:close]) + i = close + 2 + else: # array index: [] + start = i + while i < n and json_path[i].isdigit(): + i += 1 + if i == start or i >= n or json_path[i] != "]": + raise ValueError(json_path) + segments.append(int(json_path[start:i])) + i += 1 + + return segments + + +def set_value_at_path(value: Any, json_path: str, replacement: Any) -> Any: + """Replace the node at ``json_path`` within ``value`` with ``replacement``. + + Mutates ``value`` in place and returns it; for the root path ``$`` it returns + ``replacement`` directly. Raises if the path can't be parsed or navigated. + """ + segments = parse_path(json_path) + if not segments: # "$": the whole value is the reference + return replacement + + target = value + for segment in segments[:-1]: + target = target[segment] + + leaf = segments[-1] + # JSON object keys are always strings, so an int leaf on a dict is an + # all-digit key that jsonpath-plus rendered ambiguously as "[0]". We can't + # tell it from a list index, so raise rather than add a bogus int key. + if isinstance(leaf, int) and isinstance(target, dict): + raise KeyError(json_path) + target[leaf] = replacement + + return value diff --git a/langfuse/_utils/serializer.py b/langfuse/_utils/serializer.py index 27294bf80..135d1f625 100644 --- a/langfuse/_utils/serializer.py +++ b/langfuse/_utils/serializer.py @@ -15,7 +15,7 @@ from pydantic import BaseModel -from langfuse.media import LangfuseMedia +from langfuse.media import LangfuseMedia, LangfuseMediaReference # Attempt to import Serializable try: @@ -62,6 +62,12 @@ def _default_inner(self, obj: Any) -> Any: or f"" ) + if ( + isinstance(obj, LangfuseMediaReference) + and obj.reference_string is not None + ): + return obj.reference_string + # Check if numpy is available and if the object is a numpy scalar # If so, convert it to a Python scalar using the item() method if np is not None and isinstance(obj, np.generic): diff --git a/langfuse/media.py b/langfuse/media.py index 53940382c..80e5bcd6a 100644 --- a/langfuse/media.py +++ b/langfuse/media.py @@ -4,6 +4,8 @@ import hashlib import os import re +from dataclasses import dataclass +from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, TypeVar, cast import httpx @@ -18,6 +20,81 @@ T = TypeVar("T") +@dataclass(frozen=True) +class LangfuseMediaReference: + """Resolved reference to media stored in Langfuse.""" + + media_id: str + content_type: str + url: str + url_expiry: Optional[str] = None + content_length: Optional[int] = None + reference_string: Optional[str] = None + + def is_url_expired(self) -> bool: + """Return whether the signed URL is already expired.""" + if self.url_expiry is None: + return False + + expiry = self.url_expiry.replace("Z", "+00:00") + + try: + expiry_datetime = datetime.fromisoformat(expiry) + except ValueError: + return False + + if expiry_datetime.tzinfo is None: + expiry_datetime = expiry_datetime.replace(tzinfo=timezone.utc) + + return expiry_datetime <= datetime.now(timezone.utc) + + def fetch_bytes( + self, *, timeout: float = 30.0, client: Optional[httpx.Client] = None + ) -> bytes: + """Fetch the media content from the signed URL. + + Args: + timeout: Request timeout in seconds. + client: Optional httpx client to use for the request. Pass this to + honor custom transport settings (proxy, CA bundle, mTLS) — in + particular when multiple Langfuse clients are configured, since + the SDK cannot otherwise tell which client produced this + reference. When omitted, the single configured client is used, + falling back to a default httpx client. + """ + from langfuse._client.resource_manager import LangfuseResourceManager + + httpx_client = client or LangfuseResourceManager.get_singleton_httpx_client() + response = ( + httpx_client.get(self.url, timeout=timeout) + if httpx_client is not None + else httpx.get(self.url, timeout=timeout) + ) + response.raise_for_status() + + return response.content + + def fetch_base64( + self, *, timeout: float = 30.0, client: Optional[httpx.Client] = None + ) -> str: + """Fetch media and return raw base64 without a data URI prefix. + + See :meth:`fetch_bytes` for the ``client`` argument. + """ + return base64.b64encode( + self.fetch_bytes(timeout=timeout, client=client) + ).decode() + + def fetch_data_uri( + self, *, timeout: float = 30.0, client: Optional[httpx.Client] = None + ) -> str: + """Fetch media and return it as a data URI. + + See :meth:`fetch_bytes` for the ``client`` argument. + """ + return f"data:{self.content_type};base64,{self.fetch_base64(timeout=timeout, client=client)}" + + class LangfuseMedia: """A class for wrapping media objects for upload to Langfuse. diff --git a/tests/e2e/test_datasets.py b/tests/e2e/test_datasets.py index 8d575180a..8ef61420c 100644 --- a/tests/e2e/test_datasets.py +++ b/tests/e2e/test_datasets.py @@ -3,6 +3,7 @@ from langfuse import Langfuse from langfuse.api import DatasetStatus +from langfuse.media import LangfuseMedia, LangfuseMediaReference from tests.support.utils import create_uuid, wait_for_result @@ -69,6 +70,71 @@ def test_create_dataset_item(): assert dataset.items[0].dataset_name == name +def test_create_and_get_dataset_item_with_media(): + langfuse = Langfuse(debug=False) + name = create_uuid() + langfuse.create_dataset(name=name) + + def media(tag: str) -> LangfuseMedia: + # Distinct bytes -> distinct media id, so each path can be verified to + # resolve to its own media via fetch_bytes. + return LangfuseMedia( + content_bytes=f"media-{tag}".encode(), content_type="image/png" + ) + + image, gallery0, gallery1, matrix, reference, thumbnail = ( + media("image"), + media("gallery0"), + media("gallery1"), + media("matrix"), + media("reference"), + media("thumbnail"), + ) + + # Cover the interesting jsonpath-plus path shapes in one item: a plain key, + # list indices, consecutive indices (nested list), plus expectedOutput and + # metadata fields. + created_item = langfuse.create_dataset_item( + dataset_name=name, + input={ + "question": "compare the images", + "image": image, # $['image'] + "gallery": [gallery0, gallery1], # $['gallery'][0], $['gallery'][1] + "matrix": [[matrix]], # $['matrix'][0][0] + }, + expected_output={"reference": reference}, # $['reference'] + metadata={"thumbnail": thumbnail}, # $['thumbnail'] + ) + + assert created_item.input["image"].startswith("@@@langfuseMedia:") + assert created_item.input["gallery"][0].startswith("@@@langfuseMedia:") + + resolved_dataset = wait_for_result( + lambda: langfuse.get_dataset(name), + is_result_ready=lambda dataset: ( + bool(dataset.items) + and isinstance(dataset.items[0].input["image"], LangfuseMediaReference) + ), + ) + resolved_item = resolved_dataset.items[0] + + resolved_by_path = { + "image": (resolved_item.input["image"], image), + "gallery[0]": (resolved_item.input["gallery"][0], gallery0), + "gallery[1]": (resolved_item.input["gallery"][1], gallery1), + "matrix[0][0]": (resolved_item.input["matrix"][0][0], matrix), + "reference": (resolved_item.expected_output["reference"], reference), + "thumbnail": (resolved_item.metadata["thumbnail"], thumbnail), + } + for path, (resolved, original) in resolved_by_path.items(): + assert isinstance(resolved, LangfuseMediaReference), path + # The reference at each path resolves to that path's own media. + assert resolved.fetch_bytes() == original._content_bytes, path + + # Non-media fields are left untouched. + assert resolved_item.input["question"] == "compare the images" + + def test_get_all_items(): langfuse = Langfuse(debug=False) name = create_uuid() diff --git a/tests/unit/test_datasets.py b/tests/unit/test_datasets.py new file mode 100644 index 000000000..f1ca3fca3 --- /dev/null +++ b/tests/unit/test_datasets.py @@ -0,0 +1,274 @@ +from datetime import datetime, timezone +from types import SimpleNamespace +from unittest.mock import Mock + +import pytest + +from langfuse._client.client import Langfuse +from langfuse.api import ( + DatasetItem, + DatasetItemMediaReference, + DatasetItemMediaReferenceField, + DatasetItemMediaReferenceMedia, + DatasetStatus, +) +from langfuse.media import LangfuseMedia, LangfuseMediaReference + + +@pytest.mark.parametrize( + ("field", "field_value", "json_path", "assert_resolved"), + [ + ( + DatasetItemMediaReferenceField.INPUT, + "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@", + "$", + lambda item: isinstance(item.input, LangfuseMediaReference), + ), + ( + DatasetItemMediaReferenceField.INPUT, + {"image": "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@"}, + "$['image']", + lambda item: isinstance(item.input["image"], LangfuseMediaReference), + ), + ( + DatasetItemMediaReferenceField.EXPECTED_OUTPUT, + ["@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@"], + "$[0]", + lambda item: isinstance(item.expected_output[0], LangfuseMediaReference), + ), + ( + DatasetItemMediaReferenceField.METADATA, + { + "image'key": "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + }, + # jsonpath-plus does not escape the quote: key image'key -> $['image'key']. + "$['image'key']", + lambda item: isinstance(item.metadata["image'key"], LangfuseMediaReference), + ), + ], +) +def test_hydrate_dataset_item_media_references_supports_json_path_cases( + field, + field_value, + json_path, + assert_resolved, +): + reference_string = "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + item = DatasetItem( + id="item-id", + status=DatasetStatus.ACTIVE, + input=field_value if field == DatasetItemMediaReferenceField.INPUT else None, + expected_output=field_value + if field == DatasetItemMediaReferenceField.EXPECTED_OUTPUT + else None, + metadata=field_value + if field == DatasetItemMediaReferenceField.METADATA + else None, + dataset_id="dataset-id", + dataset_name="dataset-name", + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), + media_references=[ + DatasetItemMediaReference( + field=field, + reference_string=reference_string, + json_path=json_path, + media=DatasetItemMediaReferenceMedia( + media_id="media-id", + content_type="image/png", + content_length=7, + url="https://example.com/image.png", + url_expiry="2026-06-15T12:00:00.000Z", + ), + ) + ], + ) + + client = object.__new__(Langfuse) + + hydrated = client._hydrate_dataset_item_media_references(item) + + assert assert_resolved(hydrated) + + +def test_create_dataset_item_processes_media_before_api_call(): + media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") + root_media = LangfuseMedia(content_bytes=b"root", content_type="image/png") + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + datasets_api = Mock() + datasets_api.get.return_value = SimpleNamespace(id="dataset-id") + + client = object.__new__(Langfuse) + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api, datasets=datasets_api) + input_data = {"image": media} + metadata = {"items": [media], "keep": "value"} + + result = client.create_dataset_item( + dataset_name="dataset", + input=input_data, + expected_output=root_media, + metadata=metadata, + id="item-id", + ) + + assert result == "created-item" + assert input_data == {"image": media} + assert metadata == {"items": [media], "keep": "value"} + # Each upload carries the dataset id (resolved from the name) plus the item + # id and the field the media lives in. + media_manager._upload_media_sync.assert_any_call( + media=media, dataset_id="dataset-id", dataset_item_id="item-id", field="input" + ) + media_manager._upload_media_sync.assert_any_call( + media=root_media, + dataset_id="dataset-id", + dataset_item_id="item-id", + field="expectedOutput", + ) + assert media_manager._upload_media_sync.call_count == 2 + # The dataset id is invariant for the call, so it is resolved exactly once + # regardless of how many distinct media the item carries. + datasets_api.get.assert_called_once_with("dataset") + dataset_items_api.create.assert_called_once_with( + dataset_name="dataset", + input={"image": media._reference_string}, + expected_output=root_media._reference_string, + metadata={"items": [media._reference_string], "keep": "value"}, + source_trace_id=None, + source_observation_id=None, + status=None, + id="item-id", + ) + + +def test_create_dataset_item_roundtrips_resolved_media_reference(): + # get_dataset hydrates media reference strings into LangfuseMediaReference + # instances. Feeding such an item back into + # create_dataset_item must re-emit the original reference string, otherwise + # the dataclass is serialized as an opaque dict and the media is orphaned. + reference_string = "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + item = DatasetItem( + id="item-id", + status=DatasetStatus.ACTIVE, + input={"image": reference_string}, + expected_output=None, + metadata=None, + dataset_id="dataset-id", + dataset_name="dataset-name", + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), + media_references=[ + DatasetItemMediaReference( + field=DatasetItemMediaReferenceField.INPUT, + reference_string=reference_string, + json_path="$['image']", + media=DatasetItemMediaReferenceMedia( + media_id="media-id", + content_type="image/png", + content_length=7, + url="https://example.com/image.png", + url_expiry="2026-06-15T12:00:00.000Z", + ), + ) + ], + ) + + client = object.__new__(Langfuse) + hydrated = client._hydrate_dataset_item_media_references(item) + assert isinstance(hydrated.input["image"], LangfuseMediaReference) + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + datasets_api = Mock() + datasets_api.get.return_value = SimpleNamespace(id="dataset-id") + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api, datasets=datasets_api) + + client.create_dataset_item(dataset_name="dataset", input=hydrated.input) + + assert dataset_items_api.create.call_args.kwargs["input"] == { + "image": reference_string + } + media_manager._upload_media_sync.assert_not_called() + + +def test_create_dataset_item_processes_shared_media_subtrees(): + media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") + shared = {"image": media} + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + datasets_api = Mock() + datasets_api.get.return_value = SimpleNamespace(id="dataset-id") + + client = object.__new__(Langfuse) + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api, datasets=datasets_api) + + client.create_dataset_item( + dataset_name="dataset", + input={"a": shared, "b": shared}, + id="item-id", + ) + + assert shared == {"image": media} + media_manager._upload_media_sync.assert_called_once_with( + media=media, dataset_id="dataset-id", dataset_item_id="item-id", field="input" + ) + assert dataset_items_api.create.call_args.kwargs["input"] == { + "a": {"image": media._reference_string}, + "b": {"image": media._reference_string}, + } + + +def test_create_dataset_item_processes_media_in_sets(): + media = LangfuseMedia(content_bytes=b"payload", content_type="image/png") + + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + datasets_api = Mock() + datasets_api.get.return_value = SimpleNamespace(id="dataset-id") + + client = object.__new__(Langfuse) + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api, datasets=datasets_api) + + client.create_dataset_item( + dataset_name="dataset", input={"images": {media}}, id="item-id" + ) + + media_manager._upload_media_sync.assert_called_once_with( + media=media, dataset_id="dataset-id", dataset_item_id="item-id", field="input" + ) + assert dataset_items_api.create.call_args.kwargs["input"] == { + "images": {media._reference_string} + } + + +def test_create_dataset_item_skips_dataset_lookup_without_media(): + media_manager = Mock() + dataset_items_api = Mock() + dataset_items_api.create.return_value = "created-item" + datasets_api = Mock() + + client = object.__new__(Langfuse) + client._resources = SimpleNamespace(_media_manager=media_manager) + client.api = SimpleNamespace(dataset_items=dataset_items_api, datasets=datasets_api) + + client.create_dataset_item( + dataset_name="dataset", + input={"question": "no media here"}, + expected_output="plain text", + metadata={"k": "v"}, + ) + + # No media to upload, so the dataset id is never looked up. + datasets_api.get.assert_not_called() + media_manager._upload_media_sync.assert_not_called() diff --git a/tests/unit/test_json_path.py b/tests/unit/test_json_path.py new file mode 100644 index 000000000..676bde096 --- /dev/null +++ b/tests/unit/test_json_path.py @@ -0,0 +1,97 @@ +import pytest + +from langfuse._utils.json_path import parse_path, set_value_at_path + +_REF = "@@@langfuseMedia:type=image/png|id=abc|source=bytes@@@" + +# (description, value, json_path, expected) generated from jsonpath-plus 10.x — the +# same library the backend uses to produce the jsonPath. `json_path` is what +# jsonpath-plus emits for the reference's location; `expected` is the value after +# jsonpath-plus sets a sentinel there, so this verifies our setter matches the +# library byte-for-byte. +_JSONPATH_PLUS_CASES = [ + ("root", _REF, "$", "__MEDIA__"), + ("simple key", {"image": _REF}, "$['image']", {"image": "__MEDIA__"}), + ("key with space", {"my image": _REF}, "$['my image']", {"my image": "__MEDIA__"}), + ("apostrophe key", {"O'connor": _REF}, "$['O'connor']", {"O'connor": "__MEDIA__"}), + ("double-quote key", {'a"b': _REF}, "$['a\"b']", {'a"b': "__MEDIA__"}), + ("bracket key", {"arr[0]": _REF}, "$['arr[0]']", {"arr[0]": "__MEDIA__"}), + ("dot key", {"a.b": _REF}, "$['a.b']", {"a.b": "__MEDIA__"}), + ("list root", [_REF], "$[0]", ["__MEDIA__"]), + ( + "list element", + {"items": [0, _REF, 2]}, + "$['items'][1]", + {"items": [0, "__MEDIA__", 2]}, + ), + ( + "nested obj", + {"a": {"b": {"c": _REF}}}, + "$['a']['b']['c']", + {"a": {"b": {"c": "__MEDIA__"}}}, + ), + ("obj in list", [{"x": _REF}], "$[0]['x']", [{"x": "__MEDIA__"}]), + ("two indices", [[_REF]], "$[0][0]", [["__MEDIA__"]]), + ("three indices", [[[_REF]]], "$[0][0][0]", [[["__MEDIA__"]]]), + ( + "key then two indices", + {"matrix": [[0, _REF]]}, + "$['matrix'][0][1]", + {"matrix": [[0, "__MEDIA__"]]}, + ), + ( + "index key index", + [{"rows": [_REF]}], + "$[0]['rows'][0]", + [{"rows": ["__MEDIA__"]}], + ), + ( + "deep mixed", + {"messages": [{"content": [{"image_url": _REF}]}]}, + "$['messages'][0]['content'][0]['image_url']", + {"messages": [{"content": [{"image_url": "__MEDIA__"}]}]}, + ), + ( + "sibling untouched", + {"keep": "txt", "img": _REF}, + "$['img']", + {"keep": "txt", "img": "__MEDIA__"}, + ), +] + + +@pytest.mark.parametrize( + ("value", "json_path", "expected"), + [(c[1], c[2], c[3]) for c in _JSONPATH_PLUS_CASES], + ids=[c[0] for c in _JSONPATH_PLUS_CASES], +) +def test_set_value_at_path_matches_jsonpath_plus(value, json_path, expected): + assert set_value_at_path(value, json_path, "__MEDIA__") == expected + + +@pytest.mark.parametrize( + ("value", "json_path"), + [ + # All-digit keys and keys containing "']" are indistinguishable / broken in + # jsonpath-plus' output, so they cannot be resolved and must raise (the + # caller leaves the value unchanged rather than guessing). + ({"0": _REF}, "$[0]"), + ({"a']b": _REF}, "$['a']b']"), + # Malformed paths the API should never emit. + ("x", "image"), + ({"a": _REF}, "$['a'"), + ({"a": _REF}, "$[a]"), + ], +) +def test_set_value_at_path_raises_on_unresolvable(value, json_path): + with pytest.raises(Exception): + set_value_at_path(value, json_path, "__MEDIA__") + + +def test_parse_path_segments(): + assert parse_path("$") == [] + assert parse_path("$['image']") == ["image"] + assert parse_path("$[0]") == [0] + assert parse_path("$['a']['b'][2]") == ["a", "b", 2] + assert parse_path("$[0][1][2]") == [0, 1, 2] + assert parse_path("$['O'connor']") == ["O'connor"] diff --git a/tests/unit/test_media.py b/tests/unit/test_media.py index 63df03920..343939722 100644 --- a/tests/unit/test_media.py +++ b/tests/unit/test_media.py @@ -4,7 +4,8 @@ import pytest -from langfuse.media import LangfuseMedia +from langfuse._client.resource_manager import LangfuseResourceManager +from langfuse.media import LangfuseMedia, LangfuseMediaReference # Test data SAMPLE_JPEG_BYTES = b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01\x00H\x00H\x00\x00" @@ -90,6 +91,29 @@ def test_parse_invalid_reference_string(): ) # Missing fields +@pytest.mark.parametrize( + ("url_expiry", "expected"), + [ + (None, False), + ("not-a-date", False), + # Fixed past/future timestamps so the test ids stay stable across xdist + # workers (a computed datetime.now() would differ per worker collection). + ("2000-01-01T00:00:00+00:00", True), + ("2999-01-01T00:00:00+00:00", False), + ("2000-01-01T00:00:00Z", True), # "Z" suffix, in the past + ], +) +def test_media_reference_is_url_expired(url_expiry, expected): + reference = LangfuseMediaReference( + media_id="media-id", + content_type="image/jpeg", + url="https://example.com/test.jpg", + url_expiry=url_expiry, + ) + + assert reference.is_url_expired() is expected + + def test_file_handling(): file_path = "static/puton.jpg" @@ -107,6 +131,105 @@ def test_nonexistent_file(): assert media._content_type is None +def test_media_reference_fetch_uses_configured_httpx_client(monkeypatch): + response = Mock() + response.content = b"test-bytes" + response.raise_for_status.return_value = None + configured_httpx_client = Mock() + configured_httpx_client.get.return_value = response + httpx_get = Mock() + monkeypatch.setattr("langfuse.media.httpx.get", httpx_get) + monkeypatch.setattr( + LangfuseResourceManager, + "_instances", + {"pk-test": SimpleNamespace(httpx_client=configured_httpx_client)}, + ) + + reference = LangfuseMediaReference( + media_id="media-id", + content_type="image/jpeg", + url="https://example.com/test.jpg", + ) + + assert reference.fetch_bytes(timeout=12.5) == b"test-bytes" + configured_httpx_client.get.assert_called_once_with( + "https://example.com/test.jpg", timeout=12.5 + ) + httpx_get.assert_not_called() + + +def test_media_reference_fetch_uses_explicit_client(monkeypatch): + response = Mock() + response.content = b"explicit-bytes" + response.raise_for_status.return_value = None + explicit_client = Mock() + explicit_client.get.return_value = response + + singleton_client = Mock() + httpx_get = Mock() + monkeypatch.setattr("langfuse.media.httpx.get", httpx_get) + monkeypatch.setattr( + LangfuseResourceManager, + "_instances", + {"pk-test": SimpleNamespace(httpx_client=singleton_client)}, + ) + + reference = LangfuseMediaReference( + media_id="media-id", + content_type="image/jpeg", + url="https://example.com/test.jpg", + ) + + assert ( + reference.fetch_bytes(timeout=5.0, client=explicit_client) == b"explicit-bytes" + ) + explicit_client.get.assert_called_once_with( + "https://example.com/test.jpg", timeout=5.0 + ) + # Explicit client wins over the configured singleton and the default httpx. + singleton_client.get.assert_not_called() + httpx_get.assert_not_called() + + +def test_media_reference_fetch_falls_back_to_default_with_multiple_clients( + monkeypatch, caplog +): + import logging + + response = Mock() + response.content = b"default-bytes" + response.raise_for_status.return_value = None + httpx_get = Mock(return_value=response) + monkeypatch.setattr("langfuse.media.httpx.get", httpx_get) + + client_a = Mock() + client_b = Mock() + monkeypatch.setattr( + LangfuseResourceManager, + "_instances", + { + "pk-a": SimpleNamespace(httpx_client=client_a), + "pk-b": SimpleNamespace(httpx_client=client_b), + }, + ) + + reference = LangfuseMediaReference( + media_id="media-id", + content_type="image/jpeg", + url="https://example.com/test.jpg", + ) + + with caplog.at_level(logging.WARNING, logger="langfuse"): + assert reference.fetch_bytes(timeout=8.0) == b"default-bytes" + + # Ambiguous multi-client setup: warn and fall back to the default httpx + # instead of silently using an arbitrary instance's transport config. + assert "Multiple Langfuse clients" in caplog.text + httpx_get.assert_called_once_with("https://example.com/test.jpg", timeout=8.0) + client_a.get.assert_not_called() + client_b.get.assert_not_called() + + def test_resolve_media_references_uses_configured_httpx_client(): reference_string = "@@@langfuseMedia:type=image/jpeg|id=test-id|source=bytes@@@" fetch_timeout_seconds = 7 diff --git a/tests/unit/test_media_manager.py b/tests/unit/test_media_manager.py index 5e6a8c00a..3ab4e3226 100644 --- a/tests/unit/test_media_manager.py +++ b/tests/unit/test_media_manager.py @@ -23,6 +23,8 @@ def _upload_job() -> dict: "content_sha256_hash": "sha256hash", "trace_id": "trace-id", "observation_id": None, + "dataset_id": None, + "dataset_item_id": None, "field": "input", } @@ -142,6 +144,18 @@ def test_find_and_process_media_valid_base64_uri_is_processed(): assert not queue.empty() +def test_upload_media_sync_rejects_invalid_media(): + manager = MediaManager( + api_client=SimpleNamespace(media=Mock()), + httpx_client=Mock(), + media_upload_queue=Queue(), + ) + media = LangfuseMedia() + + with pytest.raises(ValueError, match="Cannot upload invalid LangfuseMedia"): + manager._upload_media_sync(media=media) + + def test_find_and_process_media_data_uri_without_comma_passes_through(): queue = Queue() manager = MediaManager( diff --git a/tests/unit/test_serializer.py b/tests/unit/test_serializer.py index f4c8dde86..6605485de 100644 --- a/tests/unit/test_serializer.py +++ b/tests/unit/test_serializer.py @@ -12,6 +12,7 @@ from langfuse._utils.serializer import ( EventSerializer, ) +from langfuse.media import LangfuseMediaReference class TestEnum(Enum): @@ -70,6 +71,30 @@ def test_pydantic_model(): assert json.loads(serializer.encode(model)) == {"field": "test"} +def test_langfuse_media_reference_serializes_to_reference_string(): + # Resolved references must round-trip back to their original reference string + # rather than falling through to asdict() and emitting an opaque dict. + reference_string = "@@@langfuseMedia:type=image/png|id=media-id|source=bytes@@@" + ref = LangfuseMediaReference( + media_id="media-id", + content_type="image/png", + url="https://example.com/image.png", + reference_string=reference_string, + ) + serializer = EventSerializer() + assert serializer.encode(ref) == f'"{reference_string}"' + + +def test_langfuse_media_reference_without_reference_string_falls_back_to_dict(): + ref = LangfuseMediaReference( + media_id="media-id", + content_type="image/png", + url="https://example.com/image.png", + ) + serializer = EventSerializer() + assert json.loads(serializer.encode(ref))["media_id"] == "media-id" + + def test_path(): path = Path("/tmp/test.txt") serializer = EventSerializer() diff --git a/uv.lock b/uv.lock index 903db86eb..1402c5ffd 100644 --- a/uv.lock +++ b/uv.lock @@ -3,7 +3,7 @@ revision = 3 requires-python = ">=3.10, <4.0" [options] -exclude-newer = "2026-06-12T09:30:40.624403737Z" +exclude-newer = "0001-01-01T00:00:00Z" # This has no effect and is included for backwards compatibility when using relative exclude-newer values. exclude-newer-span = "P7D" [[package]]