Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eval_protocol/adapters/langfuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def __init__(self):
if not LANGFUSE_AVAILABLE:
raise ImportError("Langfuse not installed. Install with: pip install 'eval-protocol[langfuse]'")

self.client = get_client()
self.client = get_client() # pyright: ignore[reportCallIssue]

def get_evaluation_rows(
self,
Expand Down
4 changes: 2 additions & 2 deletions eval_protocol/adapters/langsmith.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ class LangSmithAdapter(BaseAdapter):
- outputs: { messages: [...] } | { content } | { result } | { answer } | { output } | str | list[dict]
"""

def __init__(self, client: Optional[Client] = None) -> None:
def __init__(self, client: Optional[Any] = None) -> None:
if not LANGSMITH_AVAILABLE:
raise ImportError("LangSmith not installed. Install with: pip install 'eval-protocol[langsmith]'")
self.client = client or Client()
self.client = client or Client() # pyright: ignore[reportCallIssue]

def get_evaluation_rows(
self,
Expand Down
10 changes: 10 additions & 0 deletions eval_protocol/pytest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
from .exception_config import ExceptionHandlerConfig, BackoffConfig, get_default_exception_handler_config
from .rollout_processor import RolloutProcessor
from .types import RolloutProcessorConfig
from .data_loaders import (
EvaluationDataLoader,
InlineDataLoader,
LangfuseAdapterLoader,
LangfuseLoaderConfig,
)

# Conditional import for optional dependencies
try:
Expand Down Expand Up @@ -38,6 +44,10 @@
"ExceptionHandlerConfig",
"BackoffConfig",
"get_default_exception_handler_config",
"EvaluationDataLoader",
"InlineDataLoader",
"LangfuseAdapterLoader",
"LangfuseLoaderConfig",
]

# Only add to __all__ if available
Expand Down
173 changes: 173 additions & 0 deletions eval_protocol/pytest/data_loaders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""Data loader abstractions for evaluation tests."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable, Protocol, Sequence

from eval_protocol.adapters.base import BaseAdapter
from eval_protocol.models import EvaluationRow, Message
from eval_protocol.pytest.types import EvaluationTestMode, InputMessagesParam
from eval_protocol.dataset_logger.dataset_logger import DatasetLogger


@dataclass(slots=True)
class DataLoaderContext:
    """Context provided to loader variants when materializing data."""

    # Cap on the number of rows a variant should return; None means unlimited.
    max_rows: int | None
    # Optional hook that transforms loaded rows. NOTE(review): the loaders in
    # this module never call it themselves (they return ``preprocessed=False``)
    # — presumably the evaluation harness applies it; confirm against caller.
    preprocess_fn: Callable[[list[EvaluationRow]], list[EvaluationRow]] | None
    # Logger the harness uses to record the materialized dataset.
    logger: DatasetLogger
    # Identifier of the test invocation this load belongs to.
    invocation_id: str
    # Identifier of the experiment this load belongs to.
    experiment_id: str
    # Evaluation mode the rows are being loaded for.
    mode: EvaluationTestMode


@dataclass(slots=True)
class DataLoaderResult:
    """Rows and metadata returned by a loader variant."""

    # Materialized evaluation rows (deep-copied by the loaders in this module).
    rows: list[EvaluationRow]
    # Identifier of the variant that produced these rows.
    source_id: str
    # Free-form details about the load (variant id, loader type, row count, ...).
    source_metadata: dict[str, Any] = field(default_factory=dict)
    # Original payload the rows were derived from, when the loader retains it.
    raw_payload: Any | None = None
    # True once ``DataLoaderContext.preprocess_fn`` has been applied to ``rows``.
    preprocessed: bool = False


@dataclass(slots=True)
class DataLoaderVariant:
"""Single parameterizable variant from a data loader."""

id: str
description: str
loader: Callable[[DataLoaderContext], DataLoaderResult]
metadata: dict[str, Any] = field(default_factory=dict)

def load(self, ctx: DataLoaderContext) -> DataLoaderResult:
"""Load a dataset for this variant using the provided context."""

return self.loader(ctx)


class EvaluationDataLoader(Protocol):
    """Protocol for data loaders that can be consumed by ``evaluation_test``.

    Implementations expose one or more :class:`DataLoaderVariant` objects,
    each of which the evaluation harness can turn into a test parameter.
    """

    def variants(self) -> Sequence[DataLoaderVariant]:
        """Return parameterizable variants emitted by this loader."""

        ...


@dataclass(slots=True)
class InlineDataLoader(EvaluationDataLoader):
    """Data loader for inline ``EvaluationRow`` or message payloads."""

    rows: Sequence[EvaluationRow] | None = None
    messages: Sequence[InputMessagesParam] | None = None
    variant_id: str = "inline"
    description: str | None = None

    def __post_init__(self) -> None:
        # At least one payload source must be supplied.
        if self.rows is None and self.messages is None:
            raise ValueError("InlineDataLoader requires rows or messages to be provided")

    def variants(self) -> Sequence[DataLoaderVariant]:
        """Return a single variant that materializes the inline payload."""

        def _coerce_message(msg: Any) -> Message:
            # Deep-copy Message instances so the caller's objects are never
            # mutated downstream; validate raw payloads into Message models.
            if isinstance(msg, Message):
                return msg.model_copy(deep=True)
            return Message.model_validate(msg)

        def _materialize(ctx: DataLoaderContext) -> DataLoaderResult:
            collected: list[EvaluationRow] = []
            if self.rows is not None:
                collected.extend(row.model_copy(deep=True) for row in self.rows)
            for dataset_messages in self.messages or []:
                row_messages = [_coerce_message(msg) for msg in dataset_messages]
                collected.append(EvaluationRow(messages=row_messages))

            # Honor the harness-imposed row cap, if any.
            if ctx.max_rows is not None:
                collected = collected[: ctx.max_rows]

            return DataLoaderResult(
                rows=collected,
                source_id=self.variant_id,
                source_metadata={
                    "data_loader_variant_id": self.variant_id,
                    "data_loader_type": "inline",
                    "row_count": len(collected),
                },
            )

        return [
            DataLoaderVariant(
                id=self.variant_id,
                description=self.description or self.variant_id,
                loader=_materialize,
                metadata={"type": "inline"},
            )
        ]


@dataclass(slots=True)
class LangfuseLoaderConfig:
    """Configuration for a single Langfuse adapter variant."""

    # Unique identifier for the variant (used as its parameter id).
    id: str
    # Keyword arguments forwarded verbatim to ``adapter.get_evaluation_rows``.
    kwargs: dict[str, Any] = field(default_factory=dict)
    # Optional human-readable description; falls back to ``id`` when omitted.
    description: str | None = None


@dataclass(slots=True)
class LangfuseAdapterLoader(EvaluationDataLoader):
    """Wrap a ``LangfuseAdapter`` (or compatible adapter) as a data loader.

    Each :class:`LangfuseLoaderConfig` becomes one :class:`DataLoaderVariant`
    whose loader fetches rows via ``adapter.get_evaluation_rows`` with the
    configured keyword arguments.
    """

    # Adapter used to fetch rows. Any ``BaseAdapter`` with a compatible
    # ``get_evaluation_rows`` works, not only the Langfuse one.
    adapter: BaseAdapter
    # One entry per variant to expose.
    variants_config: Sequence[LangfuseLoaderConfig]

    def variants(self) -> Sequence[DataLoaderVariant]:
        """Return one variant per configured adapter invocation."""
        loader_variants: list[DataLoaderVariant] = []

        for config in self.variants_config:
            # Bind the current config as a keyword-only default so each
            # closure captures its own config instead of the loop's last value.
            def _load(ctx: DataLoaderContext, *, _config: LangfuseLoaderConfig = config) -> DataLoaderResult:
                rows = self.adapter.get_evaluation_rows(**_config.kwargs)
                if ctx.max_rows is not None:
                    rows = rows[: ctx.max_rows]

                metadata = {
                    "data_loader_variant_id": _config.id,
                    "data_loader_type": "langfuse",
                    # Copy the kwargs: aliasing the config's mutable dict would
                    # let later mutation rewrite already-recorded metadata.
                    "adapter_kwargs": dict(_config.kwargs),
                }

                # Deep-copy rows so downstream mutation cannot leak back into
                # anything the adapter may cache internally.
                return DataLoaderResult(
                    rows=[row.model_copy(deep=True) for row in rows],
                    source_id=_config.id,
                    source_metadata=metadata,
                )

            loader_variants.append(
                DataLoaderVariant(
                    id=config.id,
                    description=config.description or config.id,
                    loader=_load,
                    # Same defensive copy for the variant-level metadata.
                    metadata={"type": "langfuse", "adapter_kwargs": dict(config.kwargs)},
                )
            )

        return loader_variants


# Public API of this module, re-exported by ``eval_protocol.pytest``.
__all__ = [
    "DataLoaderContext",
    "DataLoaderResult",
    "DataLoaderVariant",
    "EvaluationDataLoader",
    "InlineDataLoader",
    "LangfuseAdapterLoader",
    "LangfuseLoaderConfig",
]
Loading
Loading