@@ -208,25 +208,26 @@ async def _sem_wrapper(r: EvaluationRow) -> EvaluationRow:
208208 tasks = [asyncio .create_task (_sem_wrapper (row )) for row in rows ]
209209 return tasks
210210
211+ def _should_close_session (self ) -> bool :
212+ self ._active_runs = max (0 , self ._active_runs - 1 )
213+ return self ._active_runs == 0 and self ._session is not None and not self ._session .closed
214+
211215 async def acleanup (self ) -> None :
212- """Async cleanup - only closes the session when the last run finishes.
216+ """Async cleanup — only closes the session when the last run finishes.
213217
214218 rollout_processor_with_retry calls acleanup() per-run, but the session
215219 is shared across parallel runs. Closing it early would cancel in-flight
216220 requests in other runs.
217221 """
218- self ._active_runs = max (0 , self ._active_runs - 1 )
219- if self ._active_runs == 0 and self ._session and not self ._session .closed :
220- await self ._session .close ()
222+ if self ._should_close_session ():
223+ await self ._session .close () # type: ignore[union-attr]
221224
222225 def cleanup (self ) -> None :
223- """Sync cleanup - best-effort, schedules close if event loop is running."""
224- if self ._active_runs > 0 :
225- return
226- if self ._session and not self ._session .closed :
226+ """Sync cleanup — best-effort fallback when not in an async context."""
227+ if self ._should_close_session ():
227228 try :
228229 loop = asyncio .get_running_loop ()
229- loop .create_task (self ._session .close ())
230+ loop .create_task (self ._session .close ()) # type: ignore[union-attr]
230231 except RuntimeError :
231232 logger .warning (
232233 "RemoteRolloutProcessor.cleanup() called outside of async context. "
0 commit comments