Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions eval_protocol/adapters/fireworks_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ def __init__(
self.project_id = project_id
self.base_url = base_url.rstrip("/")
self.timeout = timeout
# Reuse a single session for connection pooling and to avoid leaking FDs.
self._session = requests.Session()

def search_logs(self, tags: List[str], limit: int = 100, hours_back: int = 24) -> List[Dict[str, Any]]:
"""Fetch logs from Fireworks tracing gateway /logs endpoint.
Expand All @@ -287,14 +289,14 @@ def search_logs(self, tags: List[str], limit: int = 100, hours_back: int = 24) -
last_error: Optional[str] = None
for url in urls_to_try:
try:
response = requests.get(url, params=params, timeout=self.timeout, headers=headers)
if response.status_code == 404:
# Try next variant
last_error = f"404 for {url}"
continue
response.raise_for_status()
data = response.json() or {}
break
with self._session.get(url, params=params, timeout=self.timeout, headers=headers) as response:
if response.status_code == 404:
# Try next variant (must close response to release connection)
last_error = f"404 for {url}"
continue
response.raise_for_status()
data = response.json() or {}
break
except requests.exceptions.RequestException as e:
last_error = str(e)
continue
Expand Down Expand Up @@ -412,9 +414,9 @@ def get_evaluation_rows(

result = None
try:
response = requests.get(url, params=params, timeout=self.timeout, headers=headers)
response.raise_for_status()
result = response.json()
with self._session.get(url, params=params, timeout=self.timeout, headers=headers) as response:
response.raise_for_status()
result = response.json()
Comment thread
xzrderek marked this conversation as resolved.
except requests.exceptions.HTTPError as e:
error_msg = str(e)

Expand Down Expand Up @@ -451,3 +453,10 @@ def get_evaluation_rows(

logger.info("Successfully converted %d traces to evaluation rows", len(eval_rows))
return eval_rows

def close(self) -> None:
"""Close underlying HTTP resources."""
try:
self._session.close()
except Exception:
pass
16 changes: 11 additions & 5 deletions eval_protocol/pytest/remote_rollout_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def __init__(
self._output_data_loader = output_data_loader or default_fireworks_output_data_loader
self._tracing_adapter = FireworksTracingAdapter(base_url=self._model_base_url)

self._session = requests.Session()

def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> List[asyncio.Task[EvaluationRow]]:
tasks: List[asyncio.Task[EvaluationRow]] = []

Expand Down Expand Up @@ -94,8 +96,8 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow:
def _post_init() -> None:
url = f"{remote_base_url}/init"
try:
r = requests.post(url, json=init_payload.model_dump(), timeout=300)
r.raise_for_status()
with self._session.post(url, json=init_payload.model_dump(), timeout=300) as r:
r.raise_for_status()
except requests.exceptions.Timeout:
raise TimeoutError(
f"The /init endpoint tried {url} with {init_payload.model_dump()} but timed out after 300 seconds."
Expand All @@ -108,9 +110,9 @@ def _post_init() -> None:

def _get_status() -> Dict[str, Any]:
url = f"{remote_base_url}/status"
r = requests.get(url, params={"rollout_id": row.execution_metadata.rollout_id}, timeout=15)
r.raise_for_status()
return r.json()
with self._session.get(url, params={"rollout_id": row.execution_metadata.rollout_id}, timeout=15) as r:
r.raise_for_status()
return r.json()

continue_polling_status = True
while time.time() < deadline:
Expand Down Expand Up @@ -204,4 +206,8 @@ async def _sem_wrapper(r: EvaluationRow) -> EvaluationRow:
return tasks

def cleanup(self) -> None:
try:
self._session.close()
except Exception:
pass
Comment thread
xzrderek marked this conversation as resolved.
return None
Loading