From bbd8d24e3866c3e87556446f1d0b58271ba18e33 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 13 Mar 2026 13:49:29 -0700 Subject: [PATCH 1/6] retry on connection errors --- eval_protocol/pytest/exception_config.py | 4 ++++ eval_protocol/pytest/remote_rollout_processor.py | 14 ++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/eval_protocol/pytest/exception_config.py b/eval_protocol/pytest/exception_config.py index 6511252b..abb9bb58 100644 --- a/eval_protocol/pytest/exception_config.py +++ b/eval_protocol/pytest/exception_config.py @@ -23,6 +23,7 @@ def get_default_retryable_exceptions() -> Set[Type[Exception]]: return _default_retryable_exceptions # Lazy imports (these are expensive) + import aiohttp import httpx import litellm import requests @@ -32,6 +33,9 @@ def get_default_retryable_exceptions() -> Set[Type[Exception]]: ConnectionError, # type: ignore[assignment] TimeoutError, # type: ignore[assignment] OSError, # type: ignore[assignment] # Covers network-related OS errors + # aiohttp library exceptions + aiohttp.ClientConnectionError, + aiohttp.ServerDisconnectedError, # Requests library exceptions requests.exceptions.ConnectionError, requests.exceptions.Timeout, diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index f2abca78..709a8bc8 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -104,11 +104,25 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: try: session = self._get_or_create_session() async with session.post(init_url, json=init_payload.model_dump(), timeout=timeout_init) as resp: + if resp.status >= 500: + body = await resp.text() + raise ConnectionError(f"Remote /init returned server error (HTTP {resp.status}): {body}") if resp.status >= 400: body = await resp.text() raise RuntimeError(f"Remote /init failed (HTTP {resp.status}): {body}") resp.raise_for_status() await resp.read() # Drain the response body and release the connection back to the pool + except asyncio.CancelledError: + # Distinguish intentional cancellation (Ctrl+C, test teardown) from + # aiohttp-internal cancellation caused by a poisoned DNS resolver + # after a server disconnect. + current = asyncio.current_task() + if current is not None and current.cancelled(): + raise # Intentional cancellation — propagate immediately + # Network-level failure; discard the session so retries get a + # fresh connection pool. + self._session = None + raise ConnectionError("Remote server connection lost (request cancelled)") except asyncio.TimeoutError: raise TimeoutError( f"The /init endpoint tried {init_url} with {init_payload.model_dump()} but timed out after 300 seconds." From 4240f6485c181aa75de95cd53955962813d73e4e Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 13 Mar 2026 13:55:09 -0700 Subject: [PATCH 2/6] try --- eval_protocol/pytest/remote_rollout_processor.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index 709a8bc8..1837ac44 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -115,9 +115,11 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: except asyncio.CancelledError: # Distinguish intentional cancellation (Ctrl+C, test teardown) from # aiohttp-internal cancellation caused by a poisoned DNS resolver - # after a server disconnect. + # after a server disconnect. Task.cancelling() returns the number + # of pending cancel() calls; > 0 means someone explicitly cancelled + # this task. current = asyncio.current_task() - if current is not None and current.cancelled(): + if current is not None and current.cancelling() > 0: # pyright: ignore[reportAttributeAccessIssue] raise # Intentional cancellation — propagate immediately # Network-level failure; discard the session so retries get a # fresh connection pool. From 521e7561c95b8db0a57ddeaaf30d6890f1d03089 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 13 Mar 2026 15:17:49 -0700 Subject: [PATCH 3/6] fix --- .../pytest/remote_rollout_processor.py | 29 ++++++++----------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index 1837ac44..6a4d5f48 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -49,6 +49,7 @@ def __init__( self._timeout_seconds = timeout_seconds self._tracing_adapter = FireworksTracingAdapter(base_url=self._model_base_url) self._session: Optional[aiohttp.ClientSession] = None + self._active_runs = 0 def _get_or_create_session(self) -> aiohttp.ClientSession: if self._session is None or self._session.closed: @@ -56,6 +57,7 @@ def _get_or_create_session(self) -> aiohttp.ClientSession: return self._session def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> List[asyncio.Task[EvaluationRow]]: + self._active_runs += 1 tasks: List[asyncio.Task[EvaluationRow]] = [] # Start with constructor values @@ -112,19 +114,6 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: raise RuntimeError(f"Remote /init failed (HTTP {resp.status}): {body}") resp.raise_for_status() await resp.read() # Drain the response body and release the connection back to the pool - except asyncio.CancelledError: - # Distinguish intentional cancellation (Ctrl+C, test teardown) from - # aiohttp-internal cancellation caused by a poisoned DNS resolver - # after a server disconnect. Task.cancelling() returns the number - # of pending cancel() calls; > 0 means someone explicitly cancelled - # this task. - current = asyncio.current_task() - if current is not None and current.cancelling() > 0: # pyright: ignore[reportAttributeAccessIssue] - raise # Intentional cancellation — propagate immediately - # Network-level failure; discard the session so retries get a - # fresh connection pool. - self._session = None - raise ConnectionError("Remote server connection lost (request cancelled)") except asyncio.TimeoutError: raise TimeoutError( f"The /init endpoint tried {init_url} with {init_payload.model_dump()} but timed out after 300 seconds." @@ -220,19 +209,25 @@ async def _sem_wrapper(r: EvaluationRow) -> EvaluationRow: return tasks async def acleanup(self) -> None: - """Async cleanup - preferred when you can await.""" - if self._session and not self._session.closed: + """Async cleanup - only closes the session when the last run finishes. + + rollout_processor_with_retry calls acleanup() per-run, but the session + is shared across parallel runs. Closing it early would cancel in-flight + requests in other runs. + """ + self._active_runs = max(0, self._active_runs - 1) + if self._active_runs == 0 and self._session and not self._session.closed: await self._session.close() def cleanup(self) -> None: """Sync cleanup - best-effort, schedules close if event loop is running.""" + if self._active_runs > 0: + return if self._session and not self._session.closed: try: loop = asyncio.get_running_loop() loop.create_task(self._session.close()) except RuntimeError: - # No running event loop - can't safely close the session. - # The session will be garbage collected eventually, but warn about it. logger.warning( "RemoteRolloutProcessor.cleanup() called outside of async context. " "Session may not be properly closed. Use `await processor.acleanup()` when possible." From 0155e776c045a259a388aa47959e751c8d57fcb0 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 13 Mar 2026 15:19:17 -0700 Subject: [PATCH 4/6] update --- .../pytest/remote_rollout_processor.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index 6a4d5f48..b9a558d9 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -208,25 +208,26 @@ async def _sem_wrapper(r: EvaluationRow) -> EvaluationRow: tasks = [asyncio.create_task(_sem_wrapper(row)) for row in rows] return tasks + def _should_close_session(self) -> bool: + self._active_runs = max(0, self._active_runs - 1) + return self._active_runs == 0 and self._session is not None and not self._session.closed + async def acleanup(self) -> None: - """Async cleanup - only closes the session when the last run finishes. + """Async cleanup — only closes the session when the last run finishes. rollout_processor_with_retry calls acleanup() per-run, but the session is shared across parallel runs. Closing it early would cancel in-flight requests in other runs. """ - self._active_runs = max(0, self._active_runs - 1) - if self._active_runs == 0 and self._session and not self._session.closed: - await self._session.close() + if self._should_close_session(): + await self._session.close() # type: ignore[union-attr] def cleanup(self) -> None: - """Sync cleanup - best-effort, schedules close if event loop is running.""" - if self._active_runs > 0: - return - if self._session and not self._session.closed: + """Sync cleanup — best-effort fallback when not in an async context.""" + if self._should_close_session(): try: loop = asyncio.get_running_loop() - loop.create_task(self._session.close()) + loop.create_task(self._session.close()) # type: ignore[union-attr] except RuntimeError: logger.warning( "RemoteRolloutProcessor.cleanup() called outside of async context. " From c9e35905019b9e01612c31f26f2d22f705076057 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 13 Mar 2026 15:30:32 -0700 Subject: [PATCH 5/6] attempted fix --- eval_protocol/pytest/evaluation_test.py | 24 ++++++++++++------- eval_protocol/pytest/evaluation_test_utils.py | 8 +++++-- .../pytest/remote_rollout_processor.py | 23 +++++------------- 3 files changed, 27 insertions(+), 28 deletions(-) diff --git a/eval_protocol/pytest/evaluation_test.py b/eval_protocol/pytest/evaluation_test.py index 2256b35f..d2a44fde 100644 --- a/eval_protocol/pytest/evaluation_test.py +++ b/eval_protocol/pytest/evaluation_test.py @@ -449,6 +449,8 @@ def _log_eval_error(status: Status, rows: list[EvaluationRow] | None, passed: bo finally: if output_buffer: await output_buffer.close() + await rollout_processor.acleanup() + rollout_processor.cleanup() for res in priority_results: run_idx = (res.execution_metadata.extra or {}).get("run_index", 0) @@ -697,15 +699,19 @@ async def _collect_result(config, lst): # Lazy import (cached after first import above) from eval_protocol.pytest.default_mcp_gym_rollout_processor import MCPGymRolloutProcessor - if isinstance(rollout_processor, MCPGymRolloutProcessor): - # For MCPGymRolloutProcessor, create and execute tasks one at a time to avoid port conflicts - for run_idx in range(num_runs): - task = asyncio.create_task(execute_run(run_idx, config)) - await task - else: - # For other processors, create all tasks at once and run in parallel - # Concurrency is now controlled by the shared semaphore in each rollout processor - await run_tasks_with_run_progress(execute_run, num_runs, config) + try: + if isinstance(rollout_processor, MCPGymRolloutProcessor): + # For MCPGymRolloutProcessor, create and execute tasks one at a time to avoid port conflicts + for run_idx in range(num_runs): + task = asyncio.create_task(execute_run(run_idx, config)) + await task + else: + # For other processors, create all tasks at once and run in parallel + # Concurrency is now controlled by the shared semaphore in each rollout processor + await run_tasks_with_run_progress(execute_run, num_runs, config) + finally: + await rollout_processor.acleanup() + rollout_processor.cleanup() experiment_duration_seconds = time.perf_counter() - experiment_start_time diff --git a/eval_protocol/pytest/evaluation_test_utils.py b/eval_protocol/pytest/evaluation_test_utils.py index 5ab80364..fafce6b5 100644 --- a/eval_protocol/pytest/evaluation_test_utils.py +++ b/eval_protocol/pytest/evaluation_test_utils.py @@ -476,8 +476,12 @@ async def execute_row_with_backoff_and_log( yield result finally: - await rollout_processor.acleanup() - rollout_processor.cleanup() + # Cleanup is intentionally NOT called here. rollout_processor_with_retry + # is invoked per-run, but the processor (and its session) is shared + # across parallel runs. Closing per-run would kill in-flight requests + # in other runs. Cleanup is called once after all runs complete in + # evaluation_test.py. + pass def sanitize_filename(text: str) -> str: diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index b9a558d9..bcd37685 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -49,7 +49,6 @@ def __init__( self._timeout_seconds = timeout_seconds self._tracing_adapter = FireworksTracingAdapter(base_url=self._model_base_url) self._session: Optional[aiohttp.ClientSession] = None - self._active_runs = 0 def _get_or_create_session(self) -> aiohttp.ClientSession: if self._session is None or self._session.closed: @@ -57,7 +56,6 @@ def _get_or_create_session(self) -> aiohttp.ClientSession: return self._session def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> List[asyncio.Task[EvaluationRow]]: - self._active_runs += 1 tasks: List[asyncio.Task[EvaluationRow]] = [] # Start with constructor values @@ -208,26 +206,17 @@ async def _sem_wrapper(r: EvaluationRow) -> EvaluationRow: tasks = [asyncio.create_task(_sem_wrapper(row)) for row in rows] return tasks - def _should_close_session(self) -> bool: - self._active_runs = max(0, self._active_runs - 1) - return self._active_runs == 0 and self._session is not None and not self._session.closed - async def acleanup(self) -> None: - """Async cleanup — only closes the session when the last run finishes. - - rollout_processor_with_retry calls acleanup() per-run, but the session - is shared across parallel runs. Closing it early would cancel in-flight - requests in other runs. - """ - if self._should_close_session(): - await self._session.close() # type: ignore[union-attr] + """Async cleanup - preferred when you can await.""" + if self._session and not self._session.closed: + await self._session.close() def cleanup(self) -> None: - """Sync cleanup — best-effort fallback when not in an async context.""" - if self._should_close_session(): + """Sync cleanup - best-effort, schedules close if event loop is running.""" + if self._session and not self._session.closed: try: loop = asyncio.get_running_loop() - loop.create_task(self._session.close()) # type: ignore[union-attr] + loop.create_task(self._session.close()) except RuntimeError: logger.warning( "RemoteRolloutProcessor.cleanup() called outside of async context. " From a11c7ddb0da193ffc01d42b2fcd3a6de6cee67f7 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 13 Mar 2026 15:49:33 -0700 Subject: [PATCH 6/6] update --- tests/pytest/test_utils.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/tests/pytest/test_utils.py b/tests/pytest/test_utils.py index 09378fb7..7e2a0da4 100644 --- a/tests/pytest/test_utils.py +++ b/tests/pytest/test_utils.py @@ -72,9 +72,6 @@ async def mock_task(): assert mock_config.logger.log.call_count == 1 mock_config.logger.log.assert_called_once_with(results[0]) - # Verify async cleanup was called (aclose is preferred over cleanup) - mock_rollout_processor.acleanup.assert_awaited_once() - @pytest.mark.asyncio async def test_logger_called_on_failed_execution(self, mock_rollout_processor, mock_config, sample_dataset): """Test that the logger is called when execution fails.""" @@ -98,9 +95,6 @@ async def failing_task(): assert results[0].rollout_status.code == 13 # INTERNAL error code assert "Test error" in results[0].rollout_status.message - # Verify async cleanup was called (aclose is preferred over cleanup) - mock_rollout_processor.acleanup.assert_awaited_once() - @pytest.mark.asyncio async def test_logger_called_on_retry_execution(self, mock_rollout_processor, mock_config, sample_dataset): """Test that the logger is called when execution succeeds after retry.""" @@ -135,9 +129,6 @@ async def flaky_task(): assert mock_config.logger.log.call_count == 1 mock_config.logger.log.assert_called_once_with(results[0]) - # Verify async cleanup was called (aclose is preferred over cleanup) - mock_rollout_processor.acleanup.assert_awaited_once() - @pytest.mark.asyncio async def test_logger_called_for_multiple_rows(self, mock_rollout_processor, mock_config): """Test that the logger is called for each row in a multi-row dataset.""" @@ -183,9 +174,6 @@ async def mock_task(): assert mock_config.logger.log.call_count == 2 assert len(results) == 2 - # Verify async cleanup was called (aclose is preferred over cleanup) - mock_rollout_processor.acleanup.assert_awaited_once() - @pytest.mark.asyncio async def test_logger_called_even_when_processor_fails_to_initialize( self, mock_rollout_processor, mock_config, sample_dataset @@ -198,6 +186,3 @@ async def test_logger_called_even_when_processor_fails_to_initialize( with pytest.raises(RuntimeError, match="Processor failed to initialize"): async for result in rollout_processor_with_retry(mock_rollout_processor, sample_dataset, mock_config): pass - - # Verify async cleanup was called even though the function failed - mock_rollout_processor.acleanup.assert_awaited_once()