Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 25 additions & 27 deletions eval_protocol/pytest/default_mcp_gym_rollout_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
import signal
import socket
import subprocess
import threading
import time
from pathlib import Path
from typing import List, Optional

import eval_protocol as ep
from eval_protocol.mcp.execution.manager import ExecutionManager
from eval_protocol.models import EvaluationRow
from eval_protocol.pytest.rollout_processor import RolloutProcessor
from eval_protocol.pytest.types import RolloutProcessorConfig
from eval_protocol.mcp.execution.manager import ExecutionManager


class MCPServerManager:
Expand Down Expand Up @@ -181,8 +182,9 @@ def _signal_handler(cls, signum, frame):
def _register_cleanup_handlers(cls):
"""Register cleanup handlers - called only once"""
atexit.register(cls._cleanup_all_servers)
signal.signal(signal.SIGINT, cls._signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, cls._signal_handler) # Termination signal
if threading.current_thread() is threading.main_thread():
signal.signal(signal.SIGINT, cls._signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, cls._signal_handler) # Termination signal

def __enter__(self):
"""Context manager entry"""
Expand Down Expand Up @@ -223,28 +225,6 @@ def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) ->
try:
self.server.start()

model_id = str(
(config.completion_params.get("model") if config.completion_params else None) or "gpt-4o-mini"
)
temperature = config.completion_params.get("temperature", 0.0)
max_tokens = config.completion_params.get("max_tokens", 4096)

# Pass all other completion_params (e.g. stream=True) via kwargs
other_params = {
k: v
for k, v in (config.completion_params or {}).items()
if k not in ["model", "temperature", "max_tokens", "extra_body"]
}
extra_body = config.completion_params.get("extra_body", {}) or {}

self.policy = ep.LiteLLMPolicy(
model_id=model_id,
temperature=temperature,
max_tokens=max_tokens,
**extra_body,
**other_params,
)

except Exception as e:
if self.server:
self.server.stop()
Expand All @@ -254,13 +234,31 @@ def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) ->

else:
# Reuse existing MCP environments for retry
if not self.server or not self.policy:
if not self.server:
raise RuntimeError(
"Cannot retry without existing server/environments. Call with start_server=True first."
)

model_id = str((config.completion_params.get("model") if config.completion_params else None) or "gpt-4o-mini")
temperature = config.completion_params.get("temperature", 0.0)
max_tokens = config.completion_params.get("max_tokens", 4096)

# Pass all other completion_params (e.g. stream=True) via kwargs
other_params = {
k: v
for k, v in (config.completion_params or {}).items()
if k not in ["model", "temperature", "max_tokens", "extra_body"]
}
extra_body = config.completion_params.get("extra_body", {}) or {}

self.policy = ep.LiteLLMPolicy(
model_id=model_id,
temperature=temperature,
max_tokens=max_tokens,
**extra_body,
**other_params,
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Policy Reinitialization Breaks Retry Semantics

The LiteLLMPolicy is unexpectedly re-initialized on every call to __call__, even during retries (start_server=False). This breaks the intended retry semantics of reusing the policy, potentially causing inconsistent behavior or state issues. The related policy existence check for retries was also removed.

Fix in Cursor Fix in Web

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Server Cleanup Fails on Policy Creation Error

Policy initialization code was moved outside the exception handling block. When start_server=False (retry scenario), if policy creation fails (lines 242-260), there is no exception handler to clean up the server that was started in a previous call. This can leave the MCP server running indefinitely without cleanup, causing resource leaks. The exception handler at lines 228-233 only catches exceptions during server.start() when start_server=True, not during policy creation when start_server=False.

Fix in Cursor Fix in Web

# Create MCP environments directly from evaluation_rows
assert self.policy is not None, "Policy must be initialized before rollout"
envs = ep.make(
"http://localhost:9700/mcp/",
evaluation_rows=rows,
Expand Down
Loading