diff --git a/eval_protocol/adapters/fireworks_tracing.py b/eval_protocol/adapters/fireworks_tracing.py index 791f83e2..af0bf30c 100644 --- a/eval_protocol/adapters/fireworks_tracing.py +++ b/eval_protocol/adapters/fireworks_tracing.py @@ -281,9 +281,8 @@ def get_evaluation_rows( from_timestamp: Optional[datetime] = None, to_timestamp: Optional[datetime] = None, include_tool_calls: bool = True, - backend_sleep_between_gets: float = 0.1, - backend_max_retries: int = 3, - proxy_max_retries: int = 3, + sleep_between_gets: float = 0.1, + max_retries: int = 3, span_name: Optional[str] = None, converter: Optional[TraceDictConverter] = None, ) -> List[EvaluationRow]: @@ -305,10 +304,9 @@ def get_evaluation_rows( from_timestamp: Explicit start time (ISO format) to_timestamp: Explicit end time (ISO format) include_tool_calls: Whether to include tool calling traces - backend_sleep_between_gets: Sleep time between backend trace fetches (passed to proxy) - backend_max_retries: Maximum retries for backend operations (passed to proxy) - proxy_max_retries: Maximum retries when proxy returns 404 (client-side retries with exponential backoff) - span_name: If provided, extract messages from generations within this named span + sleep_between_gets: Sleep time between polling attempts (default: 0.1s) + max_retries: Max retry attempts used by proxy (default: 3) + span_name: If provided, extract messages from generations within this named span converter: Optional custom converter implementing TraceDictConverter protocol. If provided, this will be used instead of the default conversion logic. 
@@ -318,9 +315,9 @@ def get_evaluation_rows( Raises: ValueError: If tags list is empty """ - # Validate that tags are provided (security requirement) + # Validate that tags are provided if not tags or len(tags) == 0: - raise ValueError("At least one tag is required to fetch traces (security: prevents fetching all traces)") + raise ValueError("At least one tag is required to fetch traces") eval_rows = [] @@ -339,58 +336,40 @@ def get_evaluation_rows( "hours_back": hours_back, "from_timestamp": from_timestamp.isoformat() if from_timestamp else None, "to_timestamp": to_timestamp.isoformat() if to_timestamp else None, - "sleep_between_gets": backend_sleep_between_gets, - "max_retries": backend_max_retries, + "sleep_between_gets": sleep_between_gets, + "max_retries": max_retries, } # Remove None values params = {k: v for k, v in params.items() if v is not None} - # Make request to proxy with retry logic + # Make request to proxy if self.project_id: url = f"{self.base_url}/v1/project_id/{self.project_id}/traces" else: url = f"{self.base_url}/v1/traces" - # Retry loop for handling backend indexing delays (proxy returns 404) result = None - for attempt in range(proxy_max_retries): - try: - response = requests.get(url, params=params, timeout=self.timeout) - response.raise_for_status() - result = response.json() - break # Success, exit retry loop - except requests.exceptions.HTTPError as e: - error_msg = str(e) - should_retry = False - - # Try to extract detail message from response - if e.response is not None: - try: - error_detail = e.response.json().get("detail", "") - error_msg = error_detail or e.response.text - - # Retry on 404 if it's due to incomplete/missing traces (backend still indexing) - if e.response.status_code == 404: - should_retry = True - except Exception: - error_msg = e.response.text - - if should_retry and attempt < proxy_max_retries - 1: - sleep_time = 2 ** (attempt + 1) - logger.warning(error_msg) - time.sleep(sleep_time) - else: - # Final retry or 
non-retryable error - logger.error("Failed to fetch traces from proxy: %s", error_msg) - return eval_rows - except requests.exceptions.RequestException as e: - # Non-HTTP errors (network issues, timeouts, etc.) - logger.error("Failed to fetch traces from proxy: %s", str(e)) - return eval_rows - - if result is None: - logger.error("Failed to fetch traces after %d retries", proxy_max_retries) + try: + response = requests.get(url, params=params, timeout=self.timeout) + response.raise_for_status() + result = response.json() + except requests.exceptions.HTTPError as e: + error_msg = str(e) + + # Try to extract detail message from response + if e.response is not None: + try: + error_detail = e.response.json().get("detail", {}) + error_msg = error_detail or e.response.text + except Exception: # In case e.response.json() fails + error_msg = f"Proxy error: {e.response.text}" + + logger.error("Failed to fetch traces from proxy: %s", error_msg) + return eval_rows + except requests.exceptions.RequestException as e: + # Non-HTTP errors (network issues, timeouts, etc.) 
+ logger.error("Failed to fetch traces from proxy: %s", str(e)) return eval_rows # Extract traces from response diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index 5031f6fe..2c67e307 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -70,7 +70,7 @@ def _default_output_data_loader(config: DataLoaderConfig) -> DynamicDataLoader: def fetch_traces() -> List[EvaluationRow]: base_url = config.model_base_url or "https://tracing.fireworks.ai" adapter = FireworksTracingAdapter(base_url=base_url) - return adapter.get_evaluation_rows(tags=[f"rollout_id:{config.rollout_id}"], proxy_max_retries=5) + return adapter.get_evaluation_rows(tags=[f"rollout_id:{config.rollout_id}"], max_retries=5) return DynamicDataLoader(generators=[fetch_traces], preprocess_fn=filter_longest_conversation) diff --git a/tests/remote_server/test_remote_fireworks.py b/tests/remote_server/test_remote_fireworks.py index a6a66ba6..3050b1f5 100644 --- a/tests/remote_server/test_remote_fireworks.py +++ b/tests/remote_server/test_remote_fireworks.py @@ -43,7 +43,7 @@ def fetch_fireworks_traces(config: DataLoaderConfig) -> List[EvaluationRow]: base_url = config.model_base_url or "https://tracing.fireworks.ai" adapter = FireworksTracingAdapter(base_url=base_url) - return adapter.get_evaluation_rows(tags=[f"rollout_id:{config.rollout_id}"], proxy_max_retries=5) + return adapter.get_evaluation_rows(tags=[f"rollout_id:{config.rollout_id}"], max_retries=5) def fireworks_output_data_loader(config: DataLoaderConfig) -> DynamicDataLoader: