Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion development/notes/pytest_integration_proposal.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def frozen_lake_rollout_processor(row: EvaluationRow, model: str, input_params:
"""
env_url = env_urls[0] if env_urls else None
# ep.rollout handles the core interaction loop with the game environment.
trajectories = ep.rollout(row, model, input_params, env_url)
trajectories = await ep.rollout(row, model, input_params, env_url)
return [t.to_evaluation_row() for t in trajectories]

@evaluation_test(
Expand Down
11 changes: 9 additions & 2 deletions eval_protocol/benchmarks/test_tau_bench_airline.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def test_tau_bench_airline_evaluation(row: EvaluationRow) -> EvaluationRow:
id=tool_call.id,
name=tool_call.function.name,
arguments=arguments,
requestor="assistant",
)
tau2_tool_calls.append(tau2_tool_call)

Expand All @@ -181,22 +182,28 @@ def test_tau_bench_airline_evaluation(row: EvaluationRow) -> EvaluationRow:
trajectory_objects.append(UserMessage(role=role, content=text_content))
elif role == "tool":
tool_id = msg.tool_call_id
trajectory_objects.append(ToolMessage(id=tool_id, role=role, content=text_content))
trajectory_objects.append(ToolMessage(id=tool_id, role=role, content=text_content, requestor="assistant"))

reward = 1.0

evaluation_criteria = EvaluationCriteria(
nl_assertions=nl_assertions,
communicate_info=communicate_info,
actions=actions,
env_assertions=None,
reward_basis=[ # Use this to adjust how to calculate reward. Tau2-bench uses DB and COMMUNICATE by default for airline tasks.
RewardType.DB,
RewardType.COMMUNICATE,
],
)

task = Task(
id="Filler", evaluation_criteria=evaluation_criteria, user_scenario=UserScenario(instructions="Filler")
id="Filler",
description=None,
user_scenario=UserScenario(instructions="Filler", persona=None),
ticket=None,
initial_state=None,
evaluation_criteria=evaluation_criteria,
) # id and user_scenario are required for the Task type but not used in calculating reward
assert task.evaluation_criteria is not None, "Task evaluation criteria is None"

Expand Down
11 changes: 9 additions & 2 deletions eval_protocol/benchmarks/test_tau_bench_retail.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ def test_tau_bench_retail_evaluation(row: EvaluationRow) -> EvaluationRow:
id=tool_call.id,
name=tool_call.function.name,
arguments=arguments,
requestor="assistant",
)
tau2_tool_calls.append(tau2_tool_call)

Expand All @@ -171,22 +172,28 @@ def test_tau_bench_retail_evaluation(row: EvaluationRow) -> EvaluationRow:
trajectory_objects.append(UserMessage(role=role, content=text_content))
elif role == "tool":
tool_id = msg.tool_call_id
trajectory_objects.append(ToolMessage(id=tool_id, role=role, content=text_content))
trajectory_objects.append(ToolMessage(id=tool_id, role=role, content=text_content, requestor="assistant"))

reward = 1.0

evaluation_criteria = EvaluationCriteria(
nl_assertions=nl_assertions,
communicate_info=communicate_info,
actions=actions,
env_assertions=None,
reward_basis=[ # Use this to adjust how to calculate reward. Tau2-bench uses DB and COMMUNICATE by default for retail tasks.
RewardType.DB,
RewardType.COMMUNICATE,
],
)

task = Task(
id="Filler", evaluation_criteria=evaluation_criteria, user_scenario=UserScenario(instructions="Filler")
id="Filler",
description=None,
user_scenario=UserScenario(instructions="Filler", persona=None),
ticket=None,
initial_state=None,
evaluation_criteria=evaluation_criteria,
) # id and user_scenario are required for the Task type but not used in calculating reward
assert task.evaluation_criteria is not None, "Task evaluation criteria is None"

Expand Down
9 changes: 7 additions & 2 deletions eval_protocol/execution/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ async def _execute_mcp_agent_rollout(
f"Sample {sample_id}: Agent Rollout Turn {turn_num + 1}/{max_rollout_turns}. History size: {len(current_messages_for_rollout)}"
)

# model_client is initialized when generation is enabled; assert for type-checker
assert self.model_client is not None
generation_output_turn = await self.model_client.generate(
messages=current_messages_for_rollout,
session=http_session,
Expand Down Expand Up @@ -845,7 +847,9 @@ async def process_with_semaphore_wrapper(sample_idx: int, sample_data: Dict[str,

for i_outer in range(0, len(tasks), batch_size_for_logging):
batch_tasks = tasks[i_outer : i_outer + batch_size_for_logging]
batch_results_values = await asyncio.gather(*batch_tasks, return_exceptions=True)
batch_results_values: List[
Union[Exception, Dict[str, Any], List[Dict[str, Any]]]
] = await asyncio.gather(*batch_tasks, return_exceptions=True)
for res_idx, res_or_exc in enumerate(batch_results_values):
if isinstance(res_or_exc, Exception):
logger.error(
Expand All @@ -863,7 +867,8 @@ async def process_with_semaphore_wrapper(sample_idx: int, sample_data: Dict[str,
if isinstance(res_or_exc, list):
all_results.extend(res_or_exc)
else:
all_results.append(res_or_exc)
# res_or_exc is a Dict[str, Any] here
all_results.append(res_or_exc) # type: ignore[arg-type]
logger.info(
f"Completed batch up to sample {i_outer + len(batch_tasks)}. Total results/errors: {len(all_results)}"
)
Expand Down
16 changes: 15 additions & 1 deletion eval_protocol/mcp/simulation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ async def list_resources():
description = resource_func.__doc__ or f"Resource {resource_name}"

# Some callables may not have the attribute; guard for type checkers
# MyPy/Pyright: Resource expects AnyUrl; convert string to str, letting pydantic coerce it
uri_value = getattr(resource_func, "_resource_uri", f"/{resource_name}")
resources.append(
Resource(
Expand All @@ -346,7 +347,7 @@ def _register_session_handlers(self):
"""Register session initialization and cleanup handlers."""

@self.app.set_logging_level()
async def set_logging_level(level):
async def set_logging_level(level: str):
"""Handle logging level requests."""
logger.setLevel(getattr(logging, level.upper()))
return {}
Expand Down Expand Up @@ -392,6 +393,19 @@ def get_default_config(self) -> Dict[str, Any]:
"""Get default environment configuration."""
pass

# Optional hook: some environments need seed at creation time
def create_environment_with_seed(
self, config: Dict[str, Any], *, seed: Optional[int] = None
) -> Tuple[Any, Any, Dict[str, Any]]:
"""Create environment with a seed when required; default falls back to create+reset.

Subclasses can override when the environment requires the seed at construction time.
Returns a tuple of (env, initial_observation, info).
"""
env = self.create_environment(config)
obs, info = self.reset_environment(env, seed=seed)
return env, obs, info

def run(self, port: int = 8000, host: str = "127.0.0.1", **kwargs):
"""
Run the simulation server using StreamableHTTPSessionManager.
Expand Down
17 changes: 10 additions & 7 deletions eval_protocol/mcp_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def make(
return mcp_envs


def rollout(
async def rollout(
envs: GeneralMCPVectorEnv,
policy: Union[FireworksPolicy, LLMBasePolicy, Callable],
*,
Expand All @@ -250,7 +250,7 @@ def rollout(
steps: int = 512,
openai_format_log_file: Optional[str] = None,
max_concurrent_rollouts: int = 8,
) -> List[asyncio.Task[EvaluationRow]]:
) -> List[EvaluationRow]:
"""
Execute general rollouts using tool calling interface with automatic record/playback.

Expand Down Expand Up @@ -282,10 +282,10 @@ def rollout(

Example:
# Live mode
tasks = ep.rollout(envs, policy)
results = await ep.rollout(envs, policy)

# Create environments automatically
tasks = ep.rollout(
results = await ep.rollout(
"http://localhost:8000/mcp/",
policy,
evaluation_rows=my_evaluation_rows,
Expand All @@ -294,10 +294,10 @@ def rollout(

# Recording mode
os.environ["EP_PLAYBACK_FILE"] = "record.jsonl"
tasks = ep.rollout(envs, policy, openai_format_log_file="sft_data.jsonl")
results = await ep.rollout(envs, policy, openai_format_log_file="sft_data.jsonl")

# Playback mode (after recording file exists)
tasks = ep.rollout(envs, policy)
results = await ep.rollout(envs, policy)
"""
# Automatically create environments if a base URL is provided
if isinstance(envs, str):
Expand All @@ -313,7 +313,10 @@ def rollout(
tasks = execution_manager.execute_rollouts(
envs, policy, steps, openai_format_log_file, max_concurrent_rollouts, evaluation_rows
)
return tasks

# Await all tasks and return concrete EvaluationRows
results: List[EvaluationRow] = await asyncio.gather(*tasks)
return results


async def test_mcp(base_url: str, seeds: List[int]) -> Dict[str, Any]:
Expand Down
10 changes: 8 additions & 2 deletions eval_protocol/mcp_servers/tau2/tests/test_tau2_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ def tau2_airline_eval(
id=tool_call.id,
name=tool_call.function.name,
arguments=arguments,
requestor="assistant",
)
tau2_tool_calls.append(tau2_tool_call)

Expand All @@ -747,7 +748,7 @@ def tau2_airline_eval(
trajectory_objects.append(UserMessage(role=role, content=content))
elif role == "tool":
tool_id = msg.tool_call_id
trajectory_objects.append(ToolMessage(id=tool_id, role=role, content=content))
trajectory_objects.append(ToolMessage(id=tool_id, role=role, content=content, requestor="assistant"))

reward = 1.0

Expand All @@ -764,7 +765,12 @@ def tau2_airline_eval(
)

task = Task(
id="Filler", evaluation_criteria=evaluation_criteria, user_scenario=UserScenario(instructions="Filler")
id="Filler",
description=None,
user_scenario=UserScenario(instructions="Filler", persona=None),
ticket=None,
initial_state=None,
evaluation_criteria=evaluation_criteria,
) # id and user_scenario are required for the Task type but not used in calculating reward, filler values

env_reward_info = EnvironmentEvaluator.calculate_reward(
Expand Down
7 changes: 5 additions & 2 deletions eval_protocol/pytest/default_langchain_rollout_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,12 @@ async def _invoke_direct(payload):

invoke_fn = _invoke_direct
elif callable(target):

# If target is a normal callable, call it directly; if it returns an awaitable, await it
async def _invoke_wrapper(payload):
return await target(payload)
result = target(payload)
if asyncio.iscoroutine(result):
return await result
return result

invoke_fn = _invoke_wrapper
else:
Expand Down
21 changes: 14 additions & 7 deletions eval_protocol/pytest/default_mcp_gym_rollout_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,20 @@ def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) ->
)

# Get rollout tasks from ep.rollout
tasks = ep.rollout(
envs,
policy=self.policy,
evaluation_rows=rows,
steps=config.steps,
max_concurrent_rollouts=config.max_concurrent_rollouts,
)
async def _run_rollout_and_wrap(row_index: int) -> EvaluationRow:
# ep.rollout now returns concrete results
results = await ep.rollout(
envs,
policy=self.policy,
evaluation_rows=rows,
steps=config.steps,
max_concurrent_rollouts=config.max_concurrent_rollouts,
)
return results[row_index]

tasks: List[asyncio.Task[EvaluationRow]] = [
asyncio.create_task(_run_rollout_and_wrap(i)) for i in range(len(rows))
]
return tasks

def cleanup(self) -> None:
Expand Down
7 changes: 3 additions & 4 deletions examples/taxi_mcp_complete/local_testing/test_north_star.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ async def test_north_star_interface():
start_time = time.time()
evaluation_rows = await ep.rollout(
envs,
policy=policy,
steps=25, # Taxi typically needs more steps than FrozenLake
openai_format_log_file=("clean_openai_format.jsonl" if recording_mode else None),
)
policy,
steps=20,
) # Keep short for testing
duration = time.time() - start_time
print(f"✅ Completed {len(evaluation_rows)} evaluation rows in {duration:.2f}s")

Expand Down
7 changes: 6 additions & 1 deletion tests/pytest/test_tau_bench_airline.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,12 @@ def test_tau_bench_airline_evaluation(row: EvaluationRow) -> EvaluationRow:
)

task = Task(
id="Filler", evaluation_criteria=evaluation_criteria, user_scenario=UserScenario(instructions="Filler")
id="Filler",
description=None,
user_scenario=UserScenario(instructions="Filler", persona=None),
ticket=None,
initial_state=None,
evaluation_criteria=evaluation_criteria,
) # id and user_scenario are required for the Task type but not used in calculating reward

if RewardType.DB in task.evaluation_criteria.reward_basis:
Expand Down
6 changes: 2 additions & 4 deletions tests/test_rollout_control_plane_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,16 +522,14 @@ def mock_execute_rollouts(*args, **kwargs):
manager_instance.execute_rollouts = mock_execute_rollouts

result = []
tasks = ep.rollout(
tasks = await ep.rollout(
"http://localhost:1234/mcp/",
policy,
dataset=dataset,
model_id="test_model",
steps=5,
)
for task in tasks:
row = await task
result.append(row)
result.extend(tasks)

mock_make.assert_called_once_with(
"http://localhost:1234/mcp/",
Expand Down