79 changes: 29 additions & 50 deletions eval_protocol/adapters/fireworks_tracing.py
@@ -281,9 +281,8 @@ def get_evaluation_rows(
from_timestamp: Optional[datetime] = None,
to_timestamp: Optional[datetime] = None,
include_tool_calls: bool = True,
backend_sleep_between_gets: float = 0.1,
backend_max_retries: int = 3,
proxy_max_retries: int = 3,
sleep_between_gets: float = 0.1,
max_retries: int = 3,
span_name: Optional[str] = None,
converter: Optional[TraceDictConverter] = None,
) -> List[EvaluationRow]:
@@ -305,10 +304,8 @@ def get_evaluation_rows(
from_timestamp: Explicit start time (ISO format)
to_timestamp: Explicit end time (ISO format)
include_tool_calls: Whether to include tool calling traces
backend_sleep_between_gets: Sleep time between backend trace fetches (passed to proxy)
backend_max_retries: Maximum retries for backend operations (passed to proxy)
proxy_max_retries: Maximum retries when proxy returns 404 (client-side retries with exponential backoff)
span_name: If provided, extract messages from generations within this named span
sleep_between_gets: Sleep time between polling attempts, forwarded to the proxy (default: 0.1)
max_retries: Maximum retry attempts used by the proxy (default: 3)
converter: Optional custom converter implementing TraceDictConverter protocol.
If provided, this will be used instead of the default conversion logic.

@@ -318,9 +315,9 @@ def get_evaluation_rows(
Raises:
ValueError: If tags list is empty
"""
# Validate that tags are provided (security requirement)
# Validate that tags are provided
if not tags or len(tags) == 0:
raise ValueError("At least one tag is required to fetch traces (security: prevents fetching all traces)")
raise ValueError("At least one tag is required to fetch traces")

eval_rows = []

@@ -339,58 +336,40 @@ def get_evaluation_rows(
"hours_back": hours_back,
"from_timestamp": from_timestamp.isoformat() if from_timestamp else None,
"to_timestamp": to_timestamp.isoformat() if to_timestamp else None,
"sleep_between_gets": backend_sleep_between_gets,
"max_retries": backend_max_retries,
"sleep_between_gets": sleep_between_gets,
"max_retries": max_retries,
}

# Remove None values
params = {k: v for k, v in params.items() if v is not None}

# Make request to proxy with retry logic
# Make request to proxy
if self.project_id:
url = f"{self.base_url}/v1/project_id/{self.project_id}/traces"
else:
url = f"{self.base_url}/v1/traces"

# Retry loop for handling backend indexing delays (proxy returns 404)
result = None
for attempt in range(proxy_max_retries):
try:
response = requests.get(url, params=params, timeout=self.timeout)
response.raise_for_status()
result = response.json()
break # Success, exit retry loop
except requests.exceptions.HTTPError as e:
error_msg = str(e)
should_retry = False

# Try to extract detail message from response
if e.response is not None:
try:
error_detail = e.response.json().get("detail", "")
error_msg = error_detail or e.response.text

# Retry on 404 if it's due to incomplete/missing traces (backend still indexing)
if e.response.status_code == 404:
should_retry = True
except Exception:
error_msg = e.response.text

if should_retry and attempt < proxy_max_retries - 1:
sleep_time = 2 ** (attempt + 1)
logger.warning(error_msg)
time.sleep(sleep_time)
else:
# Final retry or non-retryable error
logger.error("Failed to fetch traces from proxy: %s", error_msg)
return eval_rows
except requests.exceptions.RequestException as e:
# Non-HTTP errors (network issues, timeouts, etc.)
logger.error("Failed to fetch traces from proxy: %s", str(e))
return eval_rows

if result is None:
logger.error("Failed to fetch traces after %d retries", proxy_max_retries)
try:
response = requests.get(url, params=params, timeout=self.timeout)
response.raise_for_status()
result = response.json()
except requests.exceptions.HTTPError as e:
error_msg = str(e)

# Try to extract detail message from response
if e.response is not None:
try:
error_detail = e.response.json().get("detail", {})
error_msg = error_detail or e.response.text
except Exception: # In case e.response.json() fails
error_msg = f"Proxy error: {e.response.text}"

logger.error("Failed to fetch traces from proxy: %s", error_msg)
return eval_rows
except requests.exceptions.RequestException as e:
# Non-HTTP errors (network issues, timeouts, etc.)
logger.error("Failed to fetch traces from proxy: %s", str(e))
return eval_rows

# Extract traces from response
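For context, a minimal caller sketch under the renamed keyword arguments as they appear in the updated signature; the tag value, time window, and final print are illustrative assumptions rather than code from this PR:

```python
from datetime import datetime, timedelta, timezone

from eval_protocol.adapters.fireworks_tracing import FireworksTracingAdapter

# Illustrative values only: the tag and 24-hour window are assumptions,
# not taken from this PR. At least one tag is still required by the adapter.
adapter = FireworksTracingAdapter(base_url="https://tracing.fireworks.ai")
rows = adapter.get_evaluation_rows(
    tags=["rollout_id:example-run"],
    from_timestamp=datetime.now(timezone.utc) - timedelta(hours=24),
    sleep_between_gets=0.1,  # forwarded to the proxy unchanged
    max_retries=3,           # retries now live in the proxy, not in this client
)
print(f"Fetched {len(rows)} evaluation rows")
```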
2 changes: 1 addition & 1 deletion eval_protocol/pytest/remote_rollout_processor.py
@@ -70,7 +70,7 @@ def _default_output_data_loader(config: DataLoaderConfig) -> DynamicDataLoader:
def fetch_traces() -> List[EvaluationRow]:
base_url = config.model_base_url or "https://tracing.fireworks.ai"
adapter = FireworksTracingAdapter(base_url=base_url)
return adapter.get_evaluation_rows(tags=[f"rollout_id:{config.rollout_id}"], proxy_max_retries=5)
return adapter.get_evaluation_rows(tags=[f"rollout_id:{config.rollout_id}"], max_retries=5)

return DynamicDataLoader(generators=[fetch_traces], preprocess_fn=filter_longest_conversation)

2 changes: 1 addition & 1 deletion tests/remote_server/test_remote_fireworks.py
@@ -43,7 +43,7 @@ def fetch_fireworks_traces(config: DataLoaderConfig) -> List[EvaluationRow]:

base_url = config.model_base_url or "https://tracing.fireworks.ai"
adapter = FireworksTracingAdapter(base_url=base_url)
return adapter.get_evaluation_rows(tags=[f"rollout_id:{config.rollout_id}"], proxy_max_retries=5)
return adapter.get_evaluation_rows(tags=[f"rollout_id:{config.rollout_id}"], max_retries=5)


def fireworks_output_data_loader(config: DataLoaderConfig) -> DynamicDataLoader:
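The same pattern generalizes to project-specific loaders. The sketch below mirrors `_default_output_data_loader` and `fireworks_output_data_loader` from the two files above; the import locations for `DataLoaderConfig`, `DynamicDataLoader`, and `EvaluationRow` are assumptions and may need adjusting to the actual package layout:

```python
from typing import List

from eval_protocol.adapters.fireworks_tracing import FireworksTracingAdapter
# Assumed import paths -- adjust to wherever these actually live in eval_protocol.
from eval_protocol.pytest.remote_rollout_processor import DataLoaderConfig, DynamicDataLoader
from eval_protocol.models import EvaluationRow


def custom_output_data_loader(config: DataLoaderConfig) -> DynamicDataLoader:
    """Build a data loader that pulls traces tagged with the current rollout id."""

    def fetch_traces() -> List[EvaluationRow]:
        base_url = config.model_base_url or "https://tracing.fireworks.ai"
        adapter = FireworksTracingAdapter(base_url=base_url)
        # `max_retries` replaces the `proxy_max_retries` keyword removed in this PR.
        return adapter.get_evaluation_rows(
            tags=[f"rollout_id:{config.rollout_id}"], max_retries=5
        )

    return DynamicDataLoader(generators=[fetch_traces])
```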