Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions eval_protocol/pytest/exception_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def get_default_retryable_exceptions() -> Set[Type[Exception]]:
return _default_retryable_exceptions

# Lazy imports (these are expensive)
import aiohttp
import httpx
import litellm
import requests
Expand All @@ -32,6 +33,9 @@ def get_default_retryable_exceptions() -> Set[Type[Exception]]:
ConnectionError, # type: ignore[assignment]
TimeoutError, # type: ignore[assignment]
OSError, # type: ignore[assignment] # Covers network-related OS errors
# aiohttp library exceptions
aiohttp.ClientConnectionError,
aiohttp.ServerDisconnectedError,
# Requests library exceptions
requests.exceptions.ConnectionError,
requests.exceptions.Timeout,
Expand Down
28 changes: 20 additions & 8 deletions eval_protocol/pytest/remote_rollout_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@ def __init__(
self._timeout_seconds = timeout_seconds
self._tracing_adapter = FireworksTracingAdapter(base_url=self._model_base_url)
self._session: Optional[aiohttp.ClientSession] = None
self._active_runs = 0

def _get_or_create_session(self) -> aiohttp.ClientSession:
    """Return the processor's HTTP session, creating a new one if needed.

    The session stored on ``self._session`` is reused while it exists and
    is still open. When it is missing or already closed, a fresh
    ``aiohttp.ClientSession`` is created, stored, and returned.
    """
    current = self._session
    if current is not None and not current.closed:
        # Existing session is still usable; hand it back untouched.
        return current
    self._session = aiohttp.ClientSession()
    return self._session

def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> List[asyncio.Task[EvaluationRow]]:
self._active_runs += 1
Comment thread
xzrderek marked this conversation as resolved.
Outdated
tasks: List[asyncio.Task[EvaluationRow]] = []

# Start with constructor values
Expand Down Expand Up @@ -104,6 +106,9 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow:
try:
session = self._get_or_create_session()
async with session.post(init_url, json=init_payload.model_dump(), timeout=timeout_init) as resp:
if resp.status >= 500:
body = await resp.text()
raise ConnectionError(f"Remote /init returned server error (HTTP {resp.status}): {body}")
if resp.status >= 400:
body = await resp.text()
raise RuntimeError(f"Remote /init failed (HTTP {resp.status}): {body}")
Expand Down Expand Up @@ -203,20 +208,27 @@ async def _sem_wrapper(r: EvaluationRow) -> EvaluationRow:
tasks = [asyncio.create_task(_sem_wrapper(row)) for row in rows]
return tasks

def _should_close_session(self) -> bool:
    """Release one run and report whether the session may be closed now.

    Side effect: ``self._active_runs`` is decremented by one and clamped
    so it never drops below zero.

    Returns:
        True only when no runs remain after the decrement and
        ``self._session`` exists and is not already closed.
    """
    remaining = self._active_runs - 1
    if remaining < 0:
        # Extra cleanup calls must not push the counter negative.
        remaining = 0
    self._active_runs = remaining

    if remaining != 0:
        return False
    session = self._session
    if session is None:
        return False
    return not session.closed
Comment thread
xzrderek marked this conversation as resolved.
Outdated

async def acleanup(self) -> None:
    """Async cleanup - only closes the session when the last run finishes.

    rollout_processor_with_retry calls acleanup() per-run, but the session
    is shared across parallel runs. Closing it early would cancel in-flight
    requests in other runs.

    Each call releases one run via ``_should_close_session()``; the session
    is closed only when that call reports that no runs remain and the
    session is still open.
    """
    # The run-count guard must be the only path to close(): an
    # unconditional close here would defeat the shared-session protection.
    if not self._should_close_session():
        return
    session = self._session
    if session is not None:
        # The guard already checked this; re-checking on a local lets type
        # checkers narrow the Optional without a `type: ignore`.
        await session.close()

def cleanup(self) -> None:
"""Sync cleanup - best-effort, schedules close if event loop is running."""
if self._session and not self._session.closed:
"""Sync cleanup best-effort fallback when not in an async context."""
if self._should_close_session():
try:
loop = asyncio.get_running_loop()
loop.create_task(self._session.close())
loop.create_task(self._session.close()) # type: ignore[union-attr]
except RuntimeError:
# No running event loop - can't safely close the session.
# The session will be garbage collected eventually, but warn about it.
logger.warning(
"RemoteRolloutProcessor.cleanup() called outside of async context. "
"Session may not be properly closed. Use `await processor.acleanup()` when possible."
Expand Down
Loading