| title | Specification |
|---|---|
| icon | book |
The following concepts define the lifecycle and data units of an evaluation. These match the semantics used by the @evaluation_test decorator in the Python SDK.
- Invocation: A single execution of a test function. One invocation can generate one or more experiments.
- Experiment: A group of runs for a specific combination of parameters (e.g., model x dataset x generation params). Each new execution of the test function produces a new experiment.
- Run: A group of rollouts produced when repeating the same experiment multiple times. When num_runs > 1, each repetition has a unique run_id.
- Rollout: The process that produces a trajectory for a single row. Each rollout has a unique rollout_id.
- Trajectory: The sequence of chat messages (and optional tool calls) produced during a rollout.
- Row: The atomic evaluation unit. A row contains the conversation messages, optional ground_truth, and the evaluator's evaluation_result. Every row is uniquely identified by its row_id. If not provided by the dataset, a stable hash is generated based on the row's content.
- Dataset: A collection (list) of rows. When stored, it is a JSONL file where each line is an EvaluationRow.
- Evaluator: The rubric implemented in the body of an @evaluation_test-decorated function. It computes a score in [0, 1] and writes it to the row's evaluation_result; a sketch follows this list.
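For example, a minimal pointwise evaluator can compare the assistant's final message to the row's ground_truth and write the result back onto the row. This is a sketch only: the import path is an assumption, and the @evaluation_test configuration that would wrap it is omitted (see the EvaluationTest example further down this page).

```python
# Sketch of an evaluator body; the import path is an assumption, and the
# @evaluation_test configuration is omitted here.
from eval_protocol.models import EvaluateResult, EvaluationRow, MetricResult  # assumed path


def exact_match_evaluator(row: EvaluationRow) -> EvaluationRow:
    # In a single-turn rollout the last message is the assistant's answer
    # (assumed to be plain string content here).
    answer = (row.messages[-1].content or "") if row.messages else ""
    matched = str(answer).strip() == str(row.ground_truth).strip()
    # The score must fall in [0, 1] and is written to the row's evaluation_result.
    row.evaluation_result = EvaluateResult(
        score=1.0 if matched else 0.0,
        reason="exact match" if matched else "answer differs from ground truth",
        metrics={
            "exact_match": MetricResult(score=1.0 if matched else 0.0, reason="string comparison"),
        },
    )
    return row
```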
JSON-compatible values used for fields such as ground_truth:

JSONType = Union[Dict[str, Any], List[Any], str, int, float, bool, None]

Represents a chat message with trajectory evaluation support. content supports either a string or OpenAI content parts.
class ChatCompletionContentPartTextParam(BaseModel):
    text: str
    type: Literal["text"] = "text"

class Message(BaseModel):
    role: str  # assistant, user, system, tool
    content: Optional[Union[str, List[ChatCompletionContentPartTextParam]]] = ""
    reasoning_content: Optional[str] = None
    name: Optional[str] = None
    tool_call_id: Optional[str] = None
    tool_calls: Optional[List[ChatCompletionMessageToolCall]] = None
    function_call: Optional[FunctionCall] = None
    control_plane_step: Optional[Dict[str, Any]] = None

CompletionParams = Dict[str, Any]
"""
Provider-agnostic completion parameters.
Required:
- model: str
Common fields:
- temperature: Optional[float]
- max_tokens: Optional[int]
- top_p: Optional[float]
Extra provider-specific fields are allowed and passed through (e.g., max_tool_calls).
"""class InputMetadata(BaseModel):
# Accepts additional keys for future extensibility
# (model_config = ConfigDict(extra="allow") in implementation)
row_id: Optional[str] # defaulted to a generated ID
completion_params: CompletionParams = Field(default_factory=dict)
dataset_info: Optional[Dict[str, Any]] # seed, system_prompt, environment_context, etc.
session_data: Optional[Dict[str, Any]]Structured error detail used inside Status.details per Google's AIP-193.
class ErrorInfo(BaseModel):
    reason: str
    domain: str
    metadata: Dict[str, Any] = {}

class Status(BaseModel):
    class Code(int, Enum):
        OK = 0
        CANCELLED = 1
        UNKNOWN = 2
        INVALID_ARGUMENT = 3
        DEADLINE_EXCEEDED = 4
        NOT_FOUND = 5
        ALREADY_EXISTS = 6
        PERMISSION_DENIED = 7
        RESOURCE_EXHAUSTED = 8
        FAILED_PRECONDITION = 9
        ABORTED = 10
        OUT_OF_RANGE = 11
        UNIMPLEMENTED = 12
        INTERNAL = 13
        UNAVAILABLE = 14
        DATA_LOSS = 15
        UNAUTHENTICATED = 16
        # Custom codes used by Eval Protocol
        FINISHED = 100
        RUNNING = 101
        SCORE_INVALID = 102

    code: Code
    message: str
    details: List[Dict[str, Any]] = []
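For illustration, an error status can carry AIP-193 ErrorInfo entries in its details list. A minimal sketch using only the fields defined above (the reason, domain, and metadata values are made up):

```python
# Sketch: an error Status with a structured ErrorInfo detail (illustrative values).
status = Status(
    code=Status.Code.INTERNAL,
    message="Rollout failed while calling the model provider",
    details=[
        ErrorInfo(
            reason="PROVIDER_TIMEOUT",      # hypothetical machine-readable reason
            domain="example.dev",           # hypothetical domain
            metadata={"provider": "openai", "timeout_s": 60},
        ).model_dump()                      # Status.details stores plain dicts
    ],
)
```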
Enumerates the reasons a rollout can terminate:

class TerminationReason(str, Enum):
    MAX_STEPS = "max_steps"
    CONTROL_PLANE_SIGNAL = "control_plane_signal"
    USER_STOP = "user_stop"
    SKIPPABLE_ERROR = "skippable_error"
    NON_SKIPPABLE_ERROR = "non_skippable_error"
    STOP = "stop"
    LENGTH = "length"
    TOOL_CALLS = "tool_calls"

Result of a single metric evaluation:
class MetricResult(BaseModel):
    is_score_valid: bool = True
    score: float  # Between 0.0 and 1.0
    reason: str  # Explanation for the score
    data: Dict[str, Any] = Field(default_factory=dict)  # Optional extra metric data

Defines the base reward and other metrics for a single conceptual step within a rollout:
class StepOutput(BaseModel):
    step_index: Union[int, str]  # User-defined index for the step
    base_reward: float  # Base reward calculated by the user's reward function
    terminated: bool = False  # Whether the environment signaled termination
    control_plane_info: Optional[Dict[str, Any]]  # Structured info from environment
    metrics: Dict[str, Any] = Field(default_factory=dict)  # Optional custom metrics
    reason: Optional[str]  # Optional explanation for the step's base reward

class EvaluationThreshold(BaseModel):
    success: float  # Minimum success rate threshold (0.0 to 1.0)
    standard_error: Optional[float]  # Optional maximum standard error threshold

class EvalMetadata(BaseModel):
    name: str
    description: Optional[str]
    version: str  # PEP 440 version string (auto-populated)
    status: Optional[Status]
    num_runs: int
    aggregation_method: str
    passed_threshold: Optional[EvaluationThreshold]
    passed: Optional[bool]

class CostMetrics(BaseModel):
    input_cost: Optional[float]
    output_cost: Optional[float]
    total_cost_dollar: Optional[float]

class ExecutionMetadata(BaseModel):
    invocation_id: Optional[str]
    experiment_id: Optional[str]
    rollout_id: Optional[str]
    run_id: Optional[str]
    usage: Optional[CompletionUsage]
    cost_metrics: Optional[CostMetrics]
    duration_seconds: Optional[float]
    experiment_duration_seconds: Optional[float]

The EvaluateResult represents the complete result of an evaluator, providing an overall score and component metrics.
class EvaluateResult(BaseModel):
    # Core evaluation data
    score: float  # Overall evaluation score (0.0 to 1.0)
    is_score_valid: bool  # Whether the overall score is valid (defaults to True)
    reason: Optional[str]  # Optional explanation for the overall score

    # Component metrics
    metrics: Dict[str, MetricResult]  # Dictionary of component metrics

    # RL-specific fields
    step_outputs: Optional[List[StepOutput]]  # Per-step base rewards for RL

    # Error handling
    error: Optional[str]  # Optional error message if evaluation failed

    # Trajectory information
    trajectory_info: Optional[Dict[str, Any]]  # Additional trajectory-level information
    final_control_plane_info: Optional[Dict[str, Any]]  # Final control plane state

    # Aggregation across runs
    agg_score: Optional[float]  # Aggregated score across runs
    standard_error: Optional[float]  # Standard error across runs

Key Features:
- Unified Model: Serves both per-turn and per-trajectory evaluation scenarios
- Component Metrics: Detailed breakdown through MetricResult objects
- RL Support: Per-step base rewards via step_outputs for reinforcement learning
- Error Handling: Graceful error reporting and validation
- Trajectory Info: Additional metadata for trajectory-based evaluations
- Aggregation: Optional agg_score and standard_error for multi-run summaries
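To make the fields concrete, here is a sketch of a populated EvaluateResult that combines a component MetricResult with per-step StepOutput entries. The values are illustrative, and it assumes the optional fields default to None as in the SDK implementation:

```python
# Illustrative values only; field names follow the models defined above.
result = EvaluateResult(
    score=0.75,
    reason="3 of 4 steps reached the subgoal",
    metrics={
        "subgoal_rate": MetricResult(score=0.75, reason="3/4 steps succeeded"),
    },
    step_outputs=[
        StepOutput(step_index=0, base_reward=1.0, reason="reached subgoal"),
        StepOutput(step_index=1, base_reward=0.0, terminated=True,
                   control_plane_info={"reason": "max_steps"},
                   reason="episode ended at the step limit"),
    ],
    agg_score=0.75,
    standard_error=0.0,
)
```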
The EvaluationRow is the canonical JSON-serializable unit of data used for both single-turn and trajectory evaluations. It contains the conversation, tool context, evaluation results, and metadata needed for reproducibility and analysis.
class EvaluationRow(BaseModel):
    # Core conversation (trajectory) data
    messages: List[Message]

    # Tool and function call information
    tools: Optional[List[Dict[str, Any]]] = None

    # Input-related metadata
    input_metadata: InputMetadata = Field(default_factory=InputMetadata)

    # Rollout status (AIP-193)
    rollout_status: Status = Field(default_factory=Status.rollout_running)

    # Optional ground truth reference
    ground_truth: Optional[JSONType] = None

    # Unified evaluation result
    evaluation_result: Optional[EvaluateResult] = None

    # Correlation identifiers grouped under execution metadata
    execution_metadata: ExecutionMetadata = Field(default_factory=lambda: ExecutionMetadata(run_id=None))

    # Timestamps and evaluation metadata
    created_at: datetime = Field(default_factory=datetime.now)
    eval_metadata: Optional[EvalMetadata] = None

    # Process info for watchdogs
    pid: Optional[int] = None

Key Features:
- Unified Format: Canonical row format for both pointwise and trajectory evaluations
- Explicit Status: rollout_status captures running/finished/error
- Reproducibility: input_metadata, seeds, and identifiers support traceability
- Usage Tracking: Captures token usage statistics from LLM calls
A list of EvaluationRows. When saved to a file, it is a JSONL file where each line is a JSON-encoded EvaluationRow.
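Because each row is a Pydantic model, a dataset can be written and read as JSONL with the standard Pydantic serializers. A small sketch (the import path is an assumption):

```python
# Sketch: persisting and loading a dataset as JSONL, one EvaluationRow per line.
from typing import List

from eval_protocol.models import EvaluationRow  # assumed import path


def save_dataset(rows: List[EvaluationRow], path: str) -> None:
    with open(path, "w", encoding="utf-8") as f:
        for row in rows:
            f.write(row.model_dump_json() + "\n")


def load_dataset(path: str) -> List[EvaluationRow]:
    with open(path, encoding="utf-8") as f:
        return [EvaluationRow.model_validate_json(line) for line in f if line.strip()]
```

The example below shows one row in its serialized JSON form.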
{
"messages": [
{ "role": "system", "content": "You are a helpful assistant." },
{ "role": "user", "content": "Add 2 and 3." },
{ "role": "assistant", "content": "5" }
],
"tools": null,
"input_metadata": {
"row_id": "row_123",
"completion_params": {
"model": "openai/gpt-4o",
"temperature": 0.0,
"max_tokens": 256,
"max_tool_calls": 0
},
"dataset_info": {
"seed": 42,
"system_prompt": "You are a helpful assistant.",
"environment_context": {}
},
"session_data": {
"mode": "pointwise"
}
},
"rollout_status": {
"code": 100,
"message": "Rollout finished",
"details": []
},
"ground_truth": "5",
"evaluation_result": {
"score": 1.0,
"is_score_valid": true,
"reason": "Exact match",
"metrics": {
"exact_match": {
"is_score_valid": true,
"score": 1.0,
"reason": "assistant output matches ground truth"
}
},
"step_outputs": null,
"error": null,
"trajectory_info": null,
"final_control_plane_info": null,
"agg_score": 1.0,
"standard_error": 0.0
},
"execution_metadata": {
"invocation_id": "ivk_abcd",
"experiment_id": "exp_efgh",
"rollout_id": "rll_ijkl",
"run_id": null,
"usage": {
"prompt_tokens": 10,
"completion_tokens": 1,
"total_tokens": 11
},
"cost_metrics": { "total_cost_dollar": 0.0002 },
"duration_seconds": 0.012,
"experiment_duration_seconds": 0.045
},
"created_at": "2025-01-01T12:00:00",
"eval_metadata": {
"name": "basic_addition",
"description": "Verify simple arithmetic",
"version": "0.1.0",
"status": { "code": 100, "message": "Evaluation finished", "details": [] },
"num_runs": 1,
"aggregation_method": "mean",
"passed_threshold": { "success": 0.95 },
"passed": true
},
"pid": 12345
}

The EvaluationTest represents a test configuration for evaluating models.
While not explicitly defined as a separate class in the current implementation,
evaluation tests are configured through the evaluation_test decorator. The decorator
can be used to configure the following:
- Dataset Configuration: JSONL files containing test cases or hard-coded input_messages
- Model Configuration: Completion parameters (must include model) and generation settings via completion_params
- Evaluation Criteria: Success thresholds (via passed_threshold), with an optional standard error constraint
- Environment Configuration: MCP config, rollout steps, server path, and concurrency
- Rollout Processor: Class to execute rollouts (e.g., SingleTurnRolloutProcessor())
- Number of Runs: Number of times to repeat the rollout (e.g., num_runs=1)
- Mode: Evaluation mode (pointwise, groupwise, or all)
- Aggregation: Aggregation method (e.g., mean) and optional env overrides for summaries
from eval_protocol.pytest import evaluation_test, SingleTurnRolloutProcessor

@evaluation_test(
    input_dataset=["tests/pytest/data/markdown_dataset.jsonl"],
    dataset_adapter=markdown_dataset_to_evaluation_row,
    completion_params=[{
        "model": "fireworks_ai/accounts/fireworks/models/llama-v3p1-8b-instruct",
        "temperature": 0.0,
        "max_tokens": 4096,
    }],
    passed_threshold={"success": 0.5},
    rollout_processor=SingleTurnRolloutProcessor(),
    num_runs=1,
    mode="pointwise",
)
def test_markdown_highlighting_evaluation(row: EvaluationRow) -> EvaluationRow:
    ...

McpGym is the base class for building environments that an LLM can interact with via MCP tool calls (data plane) while exposing rewards and episode status via HTTP control-plane endpoints. This enables reproducible RL-style rollouts with clean separation of concerns.
Key concepts:
- Data plane: Tool calls and JSON responses used by the model to act and observe state
- Control plane: Session-scoped endpoints for rewards, termination, and info
- Multi-session: Stable session_id keys route control-plane queries to the right episode

Core API surface:
- control_plane_endpoint(path): Decorator to register a session-aware endpoint
- _register_tools(): Register domain tools with self.mcp.tool()
- format_observation(obs, env) -> Dict[str, Any]: Return JSON-serializable observation payloads
- run(transport="streamable-http"): Start the FastMCP server with high-concurrency settings
- Standard control-plane endpoints on subclasses: /control/reward, /control/status, /control/info, /control/initial_state
Example stub:
class McpGym(ABC):
    def __init__(self, server_name: str, adapter: EnvironmentAdapter, seed: Optional[int] = None, max_workers: Optional[int] = None):
        ...

    @abstractmethod
    def _register_tools(self):
        ...

    def format_observation(self, obs: Any, env: Any) -> Dict[str, Any]:
        ...

    def run(self, transport: str = "streamable-http", **kwargs):
        ...

See python-sdk/eval_protocol/mcp/mcpgym.py for the full implementation including the control_plane_endpoint decorator and session handling.
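As a purely illustrative sketch of how the pieces fit together (method and decorator signatures here are assumptions, not the SDK's actual API), a subclass registers its domain tools on the data plane and answers control-plane queries per session:

```python
# Illustrative only: signatures are assumptions based on the API surface above;
# see python-sdk/eval_protocol/mcp/mcpgym.py for the real implementation.
class CounterGym(McpGym):
    def _register_tools(self):
        @self.mcp.tool(description="Increment the session's counter by `amount`.")
        def increment(amount: int) -> dict:
            # Data plane: apply the action and return a JSON-serializable observation.
            obs = {"count": amount}  # placeholder; a real tool would step the adapter's env
            return self.format_observation(obs, env=None)

    @control_plane_endpoint("/control/reward")
    def reward(self, session_id: str) -> dict:
        # Control plane: report reward/termination for the episode bound to session_id.
        return {"reward": 0.0, "terminated": False}
```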
The EnvironmentAdapter class provides the interface for connecting environments to the MCP framework.
class EnvironmentAdapter:
    """
    Environment adapter with default implementations.

    Users can either use this class directly by providing an env_class,
    or inherit from it to customize specific methods for their environment.

    This provides a clean separation between the MCP protocol layer
    and the environment implementation.
    """

Key Features:
- Default Implementations: Works with most gymnasium-style and complex environments
- Flexible Configuration: Supports custom configuration dictionaries
- Seed Support: Reproducible environments through seed-based initialization
- Clean Interface: Separates MCP protocol layer from environment implementation
Core Methods:
- create_environment(): Create and return a new environment instance
- create_environment_with_seed(): Create environment with specific seed for reproducibility
- reset_environment(): Reset environment to initial state
- step_environment(): Execute one step in the environment
- close_environment(): Clean up environment resources
- parse_action(): Parse action string to environment-specific format
- format_observation(): Format observation for MCP transmission
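A sketch of a custom adapter for a gymnasium-style environment is shown below; the method signatures are assumptions and should be matched to the EnvironmentAdapter base class in the SDK:

```python
# Sketch: adapting a gymnasium environment. Method signatures are assumptions.
from typing import Any, Dict, Optional, Tuple

import gymnasium as gym


class FrozenLakeAdapter(EnvironmentAdapter):
    def create_environment(self, config: Optional[Dict[str, Any]] = None):
        return gym.make("FrozenLake-v1", is_slippery=False)

    def create_environment_with_seed(self, config: Optional[Dict[str, Any]] = None,
                                     seed: Optional[int] = None):
        env = self.create_environment(config)
        env.reset(seed=seed)
        return env

    def reset_environment(self, env) -> Tuple[Any, Dict[str, Any]]:
        return env.reset()

    def step_environment(self, env, action):
        return env.step(action)  # (obs, reward, terminated, truncated, info)

    def close_environment(self, env) -> None:
        env.close()

    def parse_action(self, action_str: str) -> int:
        # FrozenLake actions: 0=LEFT, 1=DOWN, 2=RIGHT, 3=UP
        return ["LEFT", "DOWN", "RIGHT", "UP"].index(action_str.upper())

    def format_observation(self, obs) -> Dict[str, Any]:
        return {"position": int(obs)}
```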
A policy is a model such as gpt-4o or llama-3.1-8b. In more advanced scenarios, a policy can be your own custom fine-tuned model.
The LiteLLMPolicy class provides a unified implementation that works with ANY MCP environment via tool calling:
class LiteLLMPolicy(LLMBasePolicy):
    """
    Unified LiteLLM policy implementation that works with ANY MCP environment via tool calling.

    Supports OpenAI, Anthropic, Fireworks AI.
    Includes built-in retry logic and caching.

    NO environment-specific logic - everything comes from MCP tools and dataset prompts.
    """

Key Features:
- Provider Agnostic: Supports OpenAI, Anthropic, Fireworks AI, and other providers
- Built-in Caching: Multiple cache types (memory, Redis, dual, S3, disk)
- Retry Logic: Robust retry strategies with exponential backoff
- Tool Calling: Native support for MCP tool calling
- Environment Agnostic: No environment-specific logic - everything from MCP tools
Specialized Implementations:
- OpenAIPolicy: OpenAI-specific policy implementation
- AnthropicPolicy: Anthropic Claude-specific policy implementation
- FireworksPolicy: Fireworks AI-specific policy implementation
- LocalPolicy: Local model policy implementation
Core Capabilities:
- Multi-Tool Support: Handle multiple tool calls per turn
- Conversation History: Maintain context across interactions
- Error Handling: Graceful handling of API failures and retries
- Caching: Response caching for improved performance and cost reduction
- Logging: Comprehensive logging for debugging and analysis
Represents a single MCP session with an environment:
@dataclass
class MCPSession:
    session_id: str
    base_url: str
    seed: Optional[int]
    model_id: str
    dataset_row: Optional[DatasetRow] = None
    terminated: bool = False
    last_observation: Any = None
    _exit_stack: Optional[AsyncExitStack] = None  # persistent connection resources
    _mcp_session: Optional[ClientSession] = None  # persistent MCP client session

Represents a complete rollout trajectory:
@dataclass
class Trajectory:
    session: MCPSession
    observations: List[Any]
    actions: List[str]
    rewards: List[float]
    terminated: bool
    total_reward: float
    steps: int
    duration: float
    control_plane_steps: List[Dict[str, Any]]
    control_plane_summary: Dict[str, Any]
    termination_reason: str
    conversation_history: List[Dict[str, Any]]
    usage: Dict[str, int] = field(default_factory=dict)