From 26cb51e5b729a2b414a77b2e9e108824df3b8eef Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Tue, 21 Oct 2025 00:56:24 -0700 Subject: [PATCH 1/7] Deprecate Elastic search logs for Fireworks Tracing logs --- eval_protocol/__init__.py | 2 - eval_protocol/adapters/fireworks_tracing.py | 1 + eval_protocol/cli_commands/logs.py | 43 --------- .../fireworks_tracing_http_handler.py | 48 ++++++---- .../pytest/remote_rollout_processor.py | 94 ++++++------------- eval_protocol/pytest/tracing_utils.py | 2 - tests/remote_server/remote_server.py | 13 +-- .../remote_server/remote_server_multi_turn.py | 13 +-- 8 files changed, 71 insertions(+), 145 deletions(-) diff --git a/eval_protocol/__init__.py b/eval_protocol/__init__.py index 768c7e9b..e0ca05cb 100644 --- a/eval_protocol/__init__.py +++ b/eval_protocol/__init__.py @@ -36,7 +36,6 @@ filter_longest_conversation, ) from .pytest import evaluation_test, SingleTurnRolloutProcessor, RemoteRolloutProcessor, GithubActionRolloutProcessor -from .pytest.remote_rollout_processor import create_elasticsearch_config_from_env from .pytest.parameterize import DefaultParameterIdGenerator from .log_utils.elasticsearch_direct_http_handler import ElasticsearchDirectHttpHandler from .log_utils.rollout_id_filter import RolloutIdFilter @@ -90,7 +89,6 @@ warnings.filterwarnings("default", category=DeprecationWarning, module="eval_protocol") __all__ = [ - "create_elasticsearch_config_from_env", "ElasticsearchConfig", "ElasticsearchDirectHttpHandler", "RolloutIdFilter", diff --git a/eval_protocol/adapters/fireworks_tracing.py b/eval_protocol/adapters/fireworks_tracing.py index 7a2bddcb..233371cc 100644 --- a/eval_protocol/adapters/fireworks_tracing.py +++ b/eval_protocol/adapters/fireworks_tracing.py @@ -309,6 +309,7 @@ def search_logs(self, tags: List[str], limit: int = 100, hours_back: int = 24) - "message": e.get("message"), "severity": e.get("severity", "INFO"), "tags": e.get("tags", []), + "status": e.get("status"), } ) return results diff --git a/eval_protocol/cli_commands/logs.py b/eval_protocol/cli_commands/logs.py index 43261d94..89929a82 100644 --- a/eval_protocol/cli_commands/logs.py +++ b/eval_protocol/cli_commands/logs.py @@ -39,49 +39,6 @@ def logs_command(args): or os.environ.get("GATEWAY_URL") or "https://tracing.fireworks.ai" ) - try: - if not use_fireworks: - if getattr(args, "use_env_elasticsearch_config", False): - # Use environment variables for configuration - print("âš™ī¸ Using environment variables for Elasticsearch config") - from eval_protocol.pytest.remote_rollout_processor import ( - create_elasticsearch_config_from_env, - ) - - elasticsearch_config = create_elasticsearch_config_from_env() - # Ensure index exists with correct mapping, mirroring Docker setup path - try: - from eval_protocol.log_utils.elasticsearch_index_manager import ( - ElasticsearchIndexManager, - ) - - index_manager = ElasticsearchIndexManager( - elasticsearch_config.url, - elasticsearch_config.index_name, - elasticsearch_config.api_key, - ) - created = index_manager.create_logging_index_mapping() - if created: - print( - f"🧭 Verified Elasticsearch index '{elasticsearch_config.index_name}' mapping (created or already correct)" - ) - else: - print( - f"âš ī¸ Could not verify/create mapping for index '{elasticsearch_config.index_name}'. Searches may behave unexpectedly." - ) - except Exception as e: - print(f"âš ī¸ Failed to ensure index mapping via IndexManager: {e}") - elif not getattr(args, "disable_elasticsearch_setup", False): - # Default behavior: start or connect to local Elasticsearch via Docker helper - from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup - - print("🧰 Auto-configuring local Elasticsearch (Docker)") - elasticsearch_config = ElasticsearchSetup().setup_elasticsearch() - else: - print("đŸšĢ Elasticsearch setup disabled; running without Elasticsearch integration") - except Exception as e: - print(f"❌ Failed to configure Elasticsearch: {e}") - return 1 try: serve_logs( diff --git a/eval_protocol/log_utils/fireworks_tracing_http_handler.py b/eval_protocol/log_utils/fireworks_tracing_http_handler.py index b553a4bd..b04b2692 100644 --- a/eval_protocol/log_utils/fireworks_tracing_http_handler.py +++ b/eval_protocol/log_utils/fireworks_tracing_http_handler.py @@ -79,6 +79,34 @@ def _get_rollout_id(self, record: logging.LogRecord) -> Optional[str]: return str(cast(Any, getattr(record, "rollout_id"))) return os.getenv(self.rollout_id_env) + def _get_status_info(self, record: logging.LogRecord) -> Optional[Dict[str, Any]]: + """Extract status information from the log record's extra data.""" + # Check if 'status' is in the extra data (passed via extra parameter) + if hasattr(record, "status") and record.status is not None: # type: ignore + status = record.status # type: ignore + + # Handle Status class instances (Pydantic BaseModel) + if hasattr(status, "code") and hasattr(status, "message"): + # Status object - extract code and message + status_code = status.code + # Handle both enum values and direct integer values + if hasattr(status_code, "value"): + status_code = status_code.value + + return { + "status_code": status_code, + "status_message": status.message, + "status_details": getattr(status, "details", []), + } + elif isinstance(status, dict): + # Dictionary representation of status + return { + "status_code": status.get("code"), + "status_message": status.get("message"), + "status_details": status.get("details", []), + } + return None + def _build_payload(self, record: logging.LogRecord, rollout_id: str) -> Dict[str, Any]: timestamp = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ") message = record.getMessage() @@ -96,28 +124,12 @@ def _build_payload(self, record: logging.LogRecord, rollout_id: str) -> Dict[str except Exception: pass program = cast(Optional[str], getattr(record, "program", None)) or "eval_protocol" - status_val = cast(Any, getattr(record, "status", None)) - status = status_val if isinstance(status_val, str) else None - # Capture optional structured status fields if present - metadata: Dict[str, Any] = {} - status_code = cast(Any, getattr(record, "status_code", None)) - if isinstance(status_code, int): - metadata["status_code"] = status_code - status_message = cast(Any, getattr(record, "status_message", None)) - if isinstance(status_message, str): - metadata["status_message"] = status_message - status_details = getattr(record, "status_details", None) - if status_details is not None: - metadata["status_details"] = status_details - extra_metadata = cast(Any, getattr(record, "metadata", None)) - if isinstance(extra_metadata, dict): - metadata.update(extra_metadata) + return { "program": program, - "status": status, + "status": self._get_status_info(record), "message": message, "tags": tags, - "metadata": metadata or None, "extras": { "logger_name": record.name, "level": record.levelname, diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index 0db806c7..4b2653a3 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -4,16 +4,15 @@ import requests -from eval_protocol.log_utils.elasticsearch_client import ElasticsearchClient from eval_protocol.models import EvaluationRow, Status from eval_protocol.data_loader.dynamic_data_loader import DynamicDataLoader from eval_protocol.types.remote_rollout_processor import ( DataLoaderConfig, - ElasticsearchConfig, ) +from eval_protocol.adapters.fireworks_tracing import FireworksTracingAdapter + from .rollout_processor import RolloutProcessor from .types import RolloutProcessorConfig -from .elasticsearch_setup import ElasticsearchSetup from .tracing_utils import default_fireworks_output_data_loader, build_init_request, update_row_with_remote_trace import logging @@ -22,25 +21,6 @@ logger = logging.getLogger(__name__) -def create_elasticsearch_config_from_env() -> ElasticsearchConfig: - """Setup Elasticsearch config from environment variables.""" - url = os.getenv("ELASTICSEARCH_URL") - api_key = os.getenv("ELASTICSEARCH_API_KEY") - index_name = os.getenv("ELASTICSEARCH_INDEX_NAME") - - if url is None: - raise ValueError("ELASTICSEARCH_URL must be set") - if api_key is None: - raise ValueError("ELASTICSEARCH_API_KEY must be set") - if index_name is None: - raise ValueError("ELASTICSEARCH_INDEX_NAME must be set") - return ElasticsearchConfig( - url=url, - api_key=api_key, - index_name=index_name, - ) - - class RemoteRolloutProcessor(RolloutProcessor): """ Rollout processor that triggers a remote HTTP server to perform the rollout. @@ -59,8 +39,6 @@ def __init__( poll_interval: float = 1.0, timeout_seconds: float = 120.0, output_data_loader: Optional[Callable[[DataLoaderConfig], DynamicDataLoader]] = None, - disable_elastic_search_setup: bool = False, - elastic_search_config: Optional[ElasticsearchConfig] = None, ): # Prefer constructor-provided configuration. These can be overridden via # config.kwargs at call time for backward compatibility. @@ -74,21 +52,7 @@ def __init__( self._poll_interval = poll_interval self._timeout_seconds = timeout_seconds self._output_data_loader = output_data_loader or default_fireworks_output_data_loader - self._disable_elastic_search_setup = disable_elastic_search_setup - self._elastic_search_config = elastic_search_config - - def setup(self) -> None: - if self._disable_elastic_search_setup: - logger.info("Elasticsearch is disabled, skipping setup") - return - logger.info("Setting up Elasticsearch") - self._elastic_search_config = self._setup_elastic_search() - logger.info("Elasticsearch setup complete") - - def _setup_elastic_search(self) -> ElasticsearchConfig: - """Set up Elasticsearch using the dedicated setup module.""" - setup = ElasticsearchSetup() - return setup.setup_elasticsearch() + self._tracing_adapter = FireworksTracingAdapter(base_url=self._model_base_url) def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> List[asyncio.Task[EvaluationRow]]: tasks: List[asyncio.Task[EvaluationRow]] = [] @@ -123,7 +87,7 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: if row.input_metadata.row_id is None: raise ValueError("Row ID is required in RemoteRolloutProcessor") - init_payload = build_init_request(row, config, model_base_url, self._elastic_search_config) + init_payload = build_init_request(row, config, model_base_url) # Fire-and-poll def _post_init() -> None: @@ -153,10 +117,6 @@ def _get_status() -> Dict[str, Any]: r.raise_for_status() return r.json() - elasticsearch_client = ( - ElasticsearchClient(self._elastic_search_config) if self._elastic_search_config else None - ) - continue_polling_status = True while time.time() < deadline: try: @@ -178,30 +138,36 @@ def _get_status() -> Dict[str, Any]: # For all other exceptions, raise them raise - if not elasticsearch_client: - continue - - search_results = elasticsearch_client.search_by_status_code_not_in( - row.execution_metadata.rollout_id, [Status.Code.RUNNING] + # Search Fireworks tracing logs for completion + completed_logs = self._tracing_adapter.search_logs( + tags=[f"rollout_id:{row.execution_metadata.rollout_id}"] ) - hits = search_results["hits"]["hits"] if search_results else [] + if completed_logs: + latest_log = completed_logs[0] + + logger.info( + f"Found completion log for rollout {row.execution_metadata.rollout_id}: {latest_log.get('message', '')}" + ) + + # Look for structured status dictionary in status field + status_dict = latest_log.get("status") + if status_dict and isinstance(status_dict, dict) and "status_code" in status_dict: + status_code = status_dict.get("status_code") + status_message = status_dict.get("status_message", "") + status_details = status_dict.get("status_details", []) - if hits: - # log all statuses found and update rollout status from the last hit - for hit in hits: - document = hit["_source"] logger.info( - f"Found log for rollout {row.execution_metadata.rollout_id} with status code {document['status_code']}" + f"Found Fireworks log for rollout {row.execution_metadata.rollout_id} with status code {status_code}" ) - # Update rollout status from the document - if "status_code" in document: - row.rollout_status = Status( - code=Status.Code(document["status_code"]), - message=document.get("status_message", ""), - details=document.get("status_details", []), - ) - logger.info("Stopping status polling for rollout %s", row.execution_metadata.rollout_id) - break + + row.rollout_status = Status( + code=Status.Code(status_code), + message=status_message, + details=status_details, + ) + + logger.info("Stopping polling for rollout %s", row.execution_metadata.rollout_id) + break await asyncio.sleep(poll_interval) else: diff --git a/eval_protocol/pytest/tracing_utils.py b/eval_protocol/pytest/tracing_utils.py index 14ac03cf..de05ff79 100644 --- a/eval_protocol/pytest/tracing_utils.py +++ b/eval_protocol/pytest/tracing_utils.py @@ -56,7 +56,6 @@ def build_init_request( row: EvaluationRow, config: RolloutProcessorConfig, model_base_url: str, - elastic_search_config: Optional[Any] = None, ) -> InitRequest: """Build an InitRequest from an EvaluationRow and config (shared logic).""" # Validation @@ -129,7 +128,6 @@ def build_init_request( tools=row.tools, metadata=meta, model_base_url=final_model_base_url, - elastic_search_config=elastic_search_config, ) diff --git a/tests/remote_server/remote_server.py b/tests/remote_server/remote_server.py index ed1ee08d..85d69ed5 100644 --- a/tests/remote_server/remote_server.py +++ b/tests/remote_server/remote_server.py @@ -8,14 +8,14 @@ from openai import OpenAI import logging -from eval_protocol import Status, InitRequest, ElasticsearchDirectHttpHandler, RolloutIdFilter +from eval_protocol import Status, InitRequest, FireworksTracingHttpHandler, RolloutIdFilter app = FastAPI() -# attach handler to root logger -handler = ElasticsearchDirectHttpHandler() -logging.getLogger().addHandler(handler) +# Attach Fireworks tracing handler to root logger +fireworks_handler = FireworksTracingHttpHandler() +logging.getLogger().addHandler(fireworks_handler) force_early_error_message = None @@ -23,10 +23,7 @@ @app.post("/init") def init(req: InitRequest): - if req.elastic_search_config: - handler.configure(req.elastic_search_config) - - # attach rollout_id filter to logger + # Attach rollout_id filter to logger logger = logging.getLogger(f"{__name__}.{req.metadata.rollout_id}") logger.addFilter(RolloutIdFilter(req.metadata.rollout_id)) diff --git a/tests/remote_server/remote_server_multi_turn.py b/tests/remote_server/remote_server_multi_turn.py index 69dabc15..3a9dc60d 100644 --- a/tests/remote_server/remote_server_multi_turn.py +++ b/tests/remote_server/remote_server_multi_turn.py @@ -7,22 +7,19 @@ from openai import OpenAI import logging -from eval_protocol import Status, InitRequest, ElasticsearchDirectHttpHandler, RolloutIdFilter +from eval_protocol import Status, InitRequest, FireworksTracingHttpHandler, RolloutIdFilter app = FastAPI() -# attach handler to root logger -handler = ElasticsearchDirectHttpHandler() -logging.getLogger().addHandler(handler) +# Attach Fireworks tracing handler to root logger +fireworks_handler = FireworksTracingHttpHandler() +logging.getLogger().addHandler(fireworks_handler) @app.post("/init") def init(req: InitRequest): - if req.elastic_search_config: - handler.configure(req.elastic_search_config) - - # attach rollout_id filter to logger + # Attach rollout_id filter to logger logger = logging.getLogger(f"{__name__}.{req.metadata.rollout_id}") logger.addFilter(RolloutIdFilter(req.metadata.rollout_id)) From 38f733b09b9fedf5747a77376890af206d0844e1 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Tue, 21 Oct 2025 01:40:53 -0700 Subject: [PATCH 2/7] fix tests --- .github/workflows/elasticsearch-tests.yml | 53 +- .github/workflows/fireworks-tracing-tests.yml | 99 +++ .../test_elasticsearch_direct_http_handler.py | 20 + .../test_fireworks_tracing_integration.py | 624 ++++++++++++++++++ 4 files changed, 778 insertions(+), 18 deletions(-) create mode 100644 .github/workflows/fireworks-tracing-tests.yml create mode 100644 tests/logging/test_fireworks_tracing_integration.py diff --git a/.github/workflows/elasticsearch-tests.yml b/.github/workflows/elasticsearch-tests.yml index 9eba33a8..333bc174 100644 --- a/.github/workflows/elasticsearch-tests.yml +++ b/.github/workflows/elasticsearch-tests.yml @@ -1,20 +1,25 @@ -name: Elasticsearch Tests +# DEPRECATED: This workflow is deprecated in favor of fireworks-tracing-tests.yml +# The Elasticsearch integration tests have been replaced with Fireworks tracing tests +# This workflow is kept for backward compatibility but should be removed in the future + +name: Elasticsearch Tests (Deprecated) on: - push: - branches: [main] - paths-ignore: - - "docs/**" - - "*.md" - pull_request: # Run on all pull requests - paths-ignore: - - "docs/**" - - "*.md" - workflow_dispatch: # Allow manual triggering + # Disabled automatic triggers - only manual for backward compatibility + # push: + # branches: [main] + # paths-ignore: + # - "docs/**" + # - "*.md" + # pull_request: # Run on all pull requests + # paths-ignore: + # - "docs/**" + # - "*.md" + workflow_dispatch: # Allow manual triggering only jobs: elasticsearch-tests: - name: Elasticsearch Integration Tests + name: Elasticsearch Integration Tests (Deprecated) runs-on: ubuntu-latest steps: @@ -36,14 +41,26 @@ jobs: - name: Install the project run: uv sync --locked --all-extras --dev - - name: Run Elasticsearch Tests + - name: Show Deprecation Notice + run: | + echo "âš ī¸ DEPRECATION NOTICE âš ī¸" + echo "This workflow is deprecated. Use fireworks-tracing-tests.yml instead." + echo "Elasticsearch integration tests have been replaced with Fireworks tracing tests." + echo "" + echo "📋 New test file: tests/logging/test_fireworks_tracing_integration.py" + echo "🔄 New workflow: .github/workflows/fireworks-tracing-tests.yml" + + - name: Run Deprecated Elasticsearch Tests env: FIREWORKS_API_KEY: ${{ secrets.FIREWORKS_API_KEY }} PYTHONWARNINGS: "ignore::DeprecationWarning,ignore::RuntimeWarning" run: | - # Run Elasticsearch direct HTTP handler tests - uv run pytest tests/logging/test_elasticsearch_direct_http_handler.py -v --tb=short + echo "Running deprecated Elasticsearch tests (these are skipped by default)..." + + # Try to run the tests, but they should be skipped + uv run pytest tests/logging/test_elasticsearch_direct_http_handler.py -v --tb=short || true - # Run RemoteRolloutProcessor Propagate Status Smoke Test (also uses Elasticsearch) - uv run pytest tests/remote_server/test_remote_fireworks_propagate_status.py::test_remote_rollout_and_fetch_fireworks_propagate_status \ - -v --tb=short + echo "" + echo "â„šī¸ Note: These tests are deprecated and skipped by default." + echo "Use the new Fireworks tracing tests instead:" + echo "uv run pytest tests/logging/test_fireworks_tracing_integration.py -v" diff --git a/.github/workflows/fireworks-tracing-tests.yml b/.github/workflows/fireworks-tracing-tests.yml new file mode 100644 index 00000000..31bf15d7 --- /dev/null +++ b/.github/workflows/fireworks-tracing-tests.yml @@ -0,0 +1,99 @@ +name: Fireworks Tracing Tests + +on: + push: + branches: [main] + paths-ignore: + - "docs/**" + - "*.md" + pull_request: # Run on all pull requests + paths-ignore: + - "docs/**" + - "*.md" + workflow_dispatch: # Allow manual triggering + +jobs: + fireworks-tracing-tests: + name: Fireworks Tracing Integration Tests + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Python 3.10 + uses: actions/setup-python@v5 + with: + python-version: "3.10" + + - name: Install uv + uses: astral-sh/setup-uv@v6 + with: + enable-cache: true + + - name: Install the project + run: uv sync --locked --all-extras --dev + + - name: Run Fireworks Tracing Integration Tests + env: + FIREWORKS_API_KEY: ${{ secrets.FIREWORKS_API_KEY }} + PYTHONWARNINGS: "ignore::DeprecationWarning,ignore::RuntimeWarning" + run: | + # Run Fireworks tracing integration tests + uv run pytest tests/logging/test_fireworks_tracing_integration.py -v --tb=short + + # Run RemoteRolloutProcessor with Fireworks tracing (if exists) + if [ -f "tests/remote_server/test_remote_fireworks.py" ]; then + echo "Running RemoteRolloutProcessor Fireworks integration test..." + # Note: This requires manual server startup, so may need to be run separately + # uv run pytest tests/remote_server/test_remote_fireworks.py -v --tb=short + fi + + - name: Run Status Propagation Tests + env: + FIREWORKS_API_KEY: ${{ secrets.FIREWORKS_API_KEY }} + PYTHONWARNINGS: "ignore::DeprecationWarning,ignore::RuntimeWarning" + run: | + # Run any status propagation tests that use Fireworks tracing + if [ -f "tests/remote_server/test_remote_fireworks_propagate_status.py" ]; then + echo "Running status propagation tests with Fireworks tracing..." + uv run pytest tests/remote_server/test_remote_fireworks_propagate_status.py -v --tb=short || true + fi + + fireworks-tracing-smoke-test: + name: Fireworks Tracing Smoke Test + runs-on: ubuntu-latest + needs: fireworks-tracing-tests + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python 3.10 + uses: actions/setup-python@v5 + with: + python-version: "3.10" + + - name: Install uv + uses: astral-sh/setup-uv@v6 + with: + enable-cache: true + + - name: Install the project + run: uv sync --locked --all-extras --dev + + - name: Smoke Test - Basic Log Sending + env: + FIREWORKS_API_KEY: ${{ secrets.FIREWORKS_API_KEY }} + run: | + # Run just the basic log sending test as a smoke test + uv run pytest tests/logging/test_fireworks_tracing_integration.py::test_fireworks_tracing_handler_sends_logs -v + + - name: Smoke Test - Status Logging + env: + FIREWORKS_API_KEY: ${{ secrets.FIREWORKS_API_KEY }} + run: | + # Run just the status logging test as a smoke test + uv run pytest tests/logging/test_fireworks_tracing_integration.py::test_fireworks_tracing_handler_logs_status_info -v || true diff --git a/tests/logging/test_elasticsearch_direct_http_handler.py b/tests/logging/test_elasticsearch_direct_http_handler.py index df59f9de..9098b088 100644 --- a/tests/logging/test_elasticsearch_direct_http_handler.py +++ b/tests/logging/test_elasticsearch_direct_http_handler.py @@ -2,6 +2,7 @@ import logging import time import pytest +import warnings from datetime import datetime, timezone from eval_protocol.log_utils.elasticsearch_direct_http_handler import ElasticsearchDirectHttpHandler @@ -9,6 +10,17 @@ from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup from eval_protocol.types.remote_rollout_processor import ElasticsearchConfig +# DEPRECATION WARNING: These Elasticsearch integration tests are deprecated +# in favor of Fireworks tracing integration tests. See: +# tests/logging/test_fireworks_tracing_integration.py +warnings.warn( + "Elasticsearch integration tests are deprecated. " + "Use Fireworks tracing integration tests instead: " + "tests/logging/test_fireworks_tracing_integration.py", + DeprecationWarning, + stacklevel=2, +) + @pytest.fixture def rollout_id(): @@ -110,6 +122,7 @@ def clear_elasticsearch_before_test( print(f"Warning: Failed to clear Elasticsearch index before test: {e}") +@pytest.mark.skip(reason="Deprecated: Use Fireworks tracing integration tests instead") def test_elasticsearch_direct_http_handler_sends_logs( elasticsearch_client: ElasticsearchClient, test_logger: logging.Logger, rollout_id: str ): @@ -159,6 +172,7 @@ def test_elasticsearch_direct_http_handler_sends_logs( print(f"Successfully verified log message in Elasticsearch: {test_message}") +@pytest.mark.skip(reason="Deprecated: Use Fireworks tracing integration tests instead") def test_elasticsearch_direct_http_handler_sorts_logs_chronologically( elasticsearch_client: ElasticsearchClient, test_logger: logging.Logger, rollout_id: str ): @@ -212,6 +226,7 @@ def test_elasticsearch_direct_http_handler_sorts_logs_chronologically( print(f"Timestamps in order: {found_timestamps}") +@pytest.mark.skip(reason="Deprecated: Use Fireworks tracing integration tests instead") def test_elasticsearch_direct_http_handler_includes_rollout_id( elasticsearch_client: ElasticsearchClient, test_logger: logging.Logger, rollout_id: str ): @@ -269,6 +284,7 @@ def test_elasticsearch_direct_http_handler_includes_rollout_id( print(f"Successfully verified log message with rollout_id '{rollout_id}' in Elasticsearch: {test_message}") +@pytest.mark.skip(reason="Deprecated: Use Fireworks tracing integration tests instead") def test_elasticsearch_direct_http_handler_search_by_rollout_id( elasticsearch_client: ElasticsearchClient, test_logger: logging.Logger, rollout_id: str ): @@ -337,6 +353,7 @@ def test_elasticsearch_direct_http_handler_search_by_rollout_id( print("Verified that search for different rollout_id returns 0 results") +@pytest.mark.skip(reason="Deprecated: Use Fireworks tracing integration tests instead") def test_elasticsearch_direct_http_handler_logs_status_info( elasticsearch_client: ElasticsearchClient, test_logger: logging.Logger, rollout_id: str ): @@ -402,6 +419,7 @@ def test_elasticsearch_direct_http_handler_logs_status_info( print(f"Successfully verified Status logging with code {test_status.code.value} in Elasticsearch: {test_message}") +@pytest.mark.skip(reason="Deprecated: Use Fireworks tracing integration tests instead") def test_elasticsearch_direct_http_handler_search_by_status_code( elasticsearch_client: ElasticsearchClient, test_logger: logging.Logger, rollout_id: str ): @@ -460,6 +478,7 @@ def test_elasticsearch_direct_http_handler_search_by_status_code( print(f"Successfully verified search by status code {running_status.value} found {len(hits)} log messages") +@pytest.mark.skip(reason="Deprecated: Use Fireworks tracing integration tests instead") def test_elasticsearch_direct_http_handler_rollout_id_from_extra_overrides_env( elasticsearch_client: ElasticsearchClient, test_logger: logging.Logger, rollout_id: str ): @@ -539,6 +558,7 @@ def test_elasticsearch_direct_http_handler_rollout_id_from_extra_overrides_env( print(f"Successfully verified rollout_id override: extra '{extra_rollout_id}' overrode environment '{rollout_id}'") +@pytest.mark.skip(reason="Deprecated: Use Fireworks tracing integration tests instead") def test_elasticsearch_direct_http_handler_timestamp_format( elasticsearch_client: ElasticsearchClient, test_logger: logging.Logger, rollout_id: str ): diff --git a/tests/logging/test_fireworks_tracing_integration.py b/tests/logging/test_fireworks_tracing_integration.py new file mode 100644 index 00000000..98d93905 --- /dev/null +++ b/tests/logging/test_fireworks_tracing_integration.py @@ -0,0 +1,624 @@ +import os +import logging +import time +import pytest +from datetime import datetime, timezone +from typing import List, Dict, Any + +from eval_protocol.log_utils.fireworks_tracing_http_handler import FireworksTracingHttpHandler +from eval_protocol.adapters.fireworks_tracing import FireworksTracingAdapter + + +@pytest.fixture +def rollout_id(): + """Set up EP_ROLLOUT_ID environment variable for tests.""" + import uuid + + # Generate a unique rollout ID for this test session + test_rollout_id = f"test-rollout-{uuid.uuid4().hex[:8]}" + + # Set the environment variable + os.environ["EP_ROLLOUT_ID"] = test_rollout_id + + yield test_rollout_id + + # Clean up after the test + if "EP_ROLLOUT_ID" in os.environ: + del os.environ["EP_ROLLOUT_ID"] + + +@pytest.fixture +def fireworks_base_url(): + """Get Fireworks tracing base URL from environment or use default.""" + return os.environ.get("FW_TRACING_GATEWAY_BASE_URL", "https://tracing.fireworks.ai") + + +@pytest.fixture +def fireworks_handler(fireworks_base_url: str, rollout_id: str): + """Create and configure FireworksTracingHttpHandler.""" + handler = FireworksTracingHttpHandler(gateway_base_url=fireworks_base_url) + + # Set a specific log level + handler.setLevel(logging.INFO) + + return handler + + +@pytest.fixture +def fireworks_adapter(fireworks_base_url: str): + """Create a FireworksTracingAdapter for testing.""" + return FireworksTracingAdapter(base_url=fireworks_base_url) + + +@pytest.fixture +def test_logger(fireworks_handler, rollout_id: str): + """Set up a test logger with the Fireworks tracing handler.""" + logger = logging.getLogger("test_fireworks_tracing_logger") + logger.setLevel(logging.INFO) + + # Clear any existing handlers + logger.handlers.clear() + + # Add our Fireworks tracing handler + logger.addHandler(fireworks_handler) + + # Prevent propagation to avoid duplicate logs + logger.propagate = False + + yield logger + + # Clean up the logger handlers after the test + logger.handlers.clear() + + +@pytest.mark.skipif( + not os.environ.get("FIREWORKS_API_KEY"), + reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", +) +def test_fireworks_tracing_handler_sends_logs( + fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str +): + """Test that FireworksTracingHttpHandler successfully sends logs to Fireworks tracing.""" + + # Generate a unique test message to avoid conflicts + test_message = f"Test log message at {time.time()}" + + # Send the log message + test_logger.info(test_message) + + # Give Fireworks tracing a moment to process the log + time.sleep(5) + + # Search for the log using the adapter + log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=10, hours_back=1) + + # Assert that we found our log message + assert log_entries is not None, "Search should return results" + assert len(log_entries) > 0, f"Expected to find at least 1 log entry, but found {len(log_entries)}" + + # Find our specific test message + found_entry = None + for entry in log_entries: + if entry.get("message") == test_message: + found_entry = entry + break + + assert found_entry is not None, f"Expected to find test message '{test_message}' in log entries" + + # Verify the content of the found entry + assert found_entry["message"] == test_message, f"Expected message '{test_message}', got '{found_entry['message']}'" + assert found_entry["severity"] == "INFO", f"Expected severity 'INFO', got '{found_entry['severity']}'" + assert "timestamp" in found_entry, "Expected entry to contain 'timestamp' field" + assert f"rollout_id:{rollout_id}" in found_entry.get("tags", []), ( + f"Expected rollout_id tag in entry tags: {found_entry.get('tags', [])}" + ) + + print(f"Successfully verified log message in Fireworks tracing: {test_message}") + + +@pytest.mark.skipif( + not os.environ.get("FIREWORKS_API_KEY"), + reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", +) +def test_fireworks_tracing_handler_sorts_logs_chronologically( + fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str +): + """Test that logs can be sorted chronologically by timestamp.""" + + # Send multiple log messages with small delays to ensure different timestamps + test_messages = [] + for i in range(3): + message = f"Chronological test message {i} at {time.time()}" + test_messages.append(message) + test_logger.info(message) + time.sleep(0.5) # Small delay to ensure different timestamps + + # Give Fireworks tracing time to process all logs + time.sleep(5) + + # Search for logs with our rollout_id + log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=20, hours_back=1) + + # Filter to only our test messages + found_entries = [] + for entry in log_entries: + for test_message in test_messages: + if entry.get("message") == test_message: + found_entries.append(entry) + break + + assert len(found_entries) >= 3, f"Expected at least 3 messages, found {len(found_entries)}" + + # Extract timestamps and verify they are in chronological order + found_timestamps = [entry["timestamp"] for entry in found_entries] + + # Sort entries by timestamp for verification + found_entries_sorted = sorted(found_entries, key=lambda x: x["timestamp"]) + + # Verify all our test messages are present + found_messages = [entry["message"] for entry in found_entries_sorted] + for test_message in test_messages: + assert test_message in found_messages, f"Expected message '{test_message}' not found in results" + + print(f"Successfully verified chronological sorting of {len(found_entries)} log messages") + print(f"Timestamps in order: {found_timestamps}") + + +@pytest.mark.skipif( + not os.environ.get("FIREWORKS_API_KEY"), + reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", +) +def test_fireworks_tracing_handler_includes_rollout_id( + fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str +): + """Test that FireworksTracingHttpHandler includes rollout_id in tags.""" + + # Generate a unique test message to avoid conflicts + test_message = f"Rollout ID test message at {time.time()}" + + # Send the log message + test_logger.info(test_message) + + # Give Fireworks tracing a moment to process the log + time.sleep(5) + + # Search for logs with our specific rollout_id + log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=10, hours_back=1) + + # Assert that we found our log message + assert log_entries is not None, "Search should return results" + assert len(log_entries) > 0, f"Expected to find at least 1 log entry, but found {len(log_entries)}" + + # Find our specific test message + found_entry = None + for entry in log_entries: + if entry.get("message") == test_message: + found_entry = entry + break + + assert found_entry is not None, f"Expected to find test message '{test_message}' in log entries" + + # Verify the rollout_id tag is present and correct + tags = found_entry.get("tags", []) + rollout_tag = f"rollout_id:{rollout_id}" + assert rollout_tag in tags, f"Expected rollout_id tag '{rollout_tag}' in tags: {tags}" + + # Verify other expected fields are still present + assert found_entry["message"] == test_message, f"Expected message '{test_message}', got '{found_entry['message']}'" + assert found_entry["severity"] == "INFO", f"Expected severity 'INFO', got '{found_entry['severity']}'" + assert "timestamp" in found_entry, "Expected entry to contain 'timestamp' field" + + print(f"Successfully verified log message with rollout_id '{rollout_id}' in Fireworks tracing: {test_message}") + + +@pytest.mark.skipif( + not os.environ.get("FIREWORKS_API_KEY"), + reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", +) +def test_fireworks_tracing_handler_search_by_rollout_id( + fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str +): + """Test that logs can be searched by rollout_id tag in Fireworks tracing.""" + + # Generate unique test messages to avoid conflicts + test_messages = [] + for i in range(3): + message = f"Rollout search test message {i} at {time.time()}" + test_messages.append(message) + test_logger.info(message) + time.sleep(0.2) # Small delay to ensure different timestamps + + # Give Fireworks tracing time to process all logs + time.sleep(5) + + # Search for logs with our specific rollout_id + log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=20, hours_back=1) + + # Assert that we found our log messages + assert log_entries is not None, "Search should return results" + assert len(log_entries) >= 3, f"Expected to find at least 3 log entries, but found {len(log_entries)}" + + # Verify all found entries have the correct rollout_id tag + found_messages = [] + rollout_tag = f"rollout_id:{rollout_id}" + for entry in log_entries: + tags = entry.get("tags", []) + assert rollout_tag in tags, f"Expected rollout_id tag '{rollout_tag}' in tags: {tags}" + found_messages.append(entry["message"]) + + # Verify all our test messages are present in the search results + for test_message in test_messages: + assert test_message in found_messages, f"Expected message '{test_message}' not found in search results" + + # Test searching for a different rollout_id (should return no results for our messages) + different_rollout_id = f"different-rollout-{time.time()}" + different_entries = fireworks_adapter.search_logs( + tags=[f"rollout_id:{different_rollout_id}"], limit=10, hours_back=1 + ) + + # Should either be empty or not contain our test messages + if different_entries: + different_messages = [entry.get("message", "") for entry in different_entries] + for test_message in test_messages: + assert test_message not in different_messages, ( + f"Found test message '{test_message}' in search for different rollout_id" + ) + + print(f"Successfully verified search by rollout_id '{rollout_id}' found {len(log_entries)} log entries") + print("Verified that search for different rollout_id doesn't return our test messages") + + +@pytest.mark.skipif( + not os.environ.get("FIREWORKS_API_KEY"), + reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", +) +def test_fireworks_tracing_handler_logs_status_info( + fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str +): + """Test that FireworksTracingHttpHandler logs Status class instances and can search by status code.""" + from eval_protocol import Status + + # Create a Status instance + test_status = Status.rollout_running() + + # Generate a unique test message + test_message = f"Status logging test message at {time.time()}" + + # Log with Status instance in extra data + test_logger.info(test_message, extra={"status": test_status}) + + # Give Fireworks tracing time to process the log + time.sleep(5) + + # Search for logs with our rollout_id + log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=10, hours_back=1) + + # Assert that we found our log message + assert log_entries is not None, "Search should return results" + assert len(log_entries) > 0, f"Expected to find at least 1 log entry, but found {len(log_entries)}" + + # Find our specific test message + found_entry = None + for entry in log_entries: + if entry.get("message") == test_message: + found_entry = entry + break + + assert found_entry is not None, f"Expected to find test message '{test_message}' in log entries" + + # Verify the status fields are present and correct + assert "status_code" in found_entry, "Expected entry to contain 'status_code' field" + assert found_entry["status_code"] == test_status.code.value, ( + f"Expected status_code {test_status.code.value}, got {found_entry['status_code']}" + ) + assert "status_message" in found_entry, "Expected entry to contain 'status_message' field" + assert found_entry["status_message"] == test_status.message, ( + f"Expected status_message '{test_status.message}', got '{found_entry['status_message']}" + ) + assert "status_details" in found_entry, "Expected entry to contain 'status_details' field" + assert found_entry["status_details"] == test_status.details, ( + f"Expected status_details {test_status.details}, got {found_entry['status_details']}" + ) + + # Verify other expected fields are still present + assert found_entry["message"] == test_message, f"Expected message '{test_message}', got '{found_entry['message']}'" + rollout_tag = f"rollout_id:{rollout_id}" + assert rollout_tag in found_entry.get("tags", []), ( + f"Expected rollout_id tag '{rollout_tag}' in tags: {found_entry.get('tags', [])}" + ) + + print( + f"Successfully verified Status logging with code {test_status.code.value} in Fireworks tracing: {test_message}" + ) + + +@pytest.mark.skipif( + not os.environ.get("FIREWORKS_API_KEY"), + reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", +) +def test_fireworks_tracing_handler_search_by_status_code( + fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str +): + """Test that logs can be searched and filtered by status code in Fireworks tracing.""" + from eval_protocol.models import Status + + # Create different Status instances for testing + statuses = [ + Status.rollout_running(), + Status.eval_finished(), + Status.error("Test error message"), + ] + + # Generate unique test messages + test_messages = [] + for i, status in enumerate(statuses): + message = f"Status search test message {i} at {time.time()}" + test_messages.append((message, status)) + test_logger.info(message, extra={"status": status}) + time.sleep(0.2) # Small delay to ensure different timestamps + + # Give Fireworks tracing time to process all logs + time.sleep(5) + + # Search for all logs with our rollout_id + log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=20, hours_back=1) + + # Assert that we found our log messages + assert log_entries is not None, "Search should return results" + assert len(log_entries) >= 3, f"Expected to find at least 3 log entries, but found {len(log_entries)}" + + # Find entries with RUNNING status code + running_entries = [] + finished_entries = [] + error_entries = [] + + for entry in log_entries: + if "status_code" in entry: + if entry["status_code"] == Status.Code.RUNNING.value: + running_entries.append(entry) + elif entry["status_code"] == Status.Code.FINISHED.value: + finished_entries.append(entry) + elif entry["status_code"] == Status.Code.INTERNAL.value: # Error status + error_entries.append(entry) + + # Verify we found entries for each status type + assert len(running_entries) >= 1, f"Expected at least 1 RUNNING status entry, found {len(running_entries)}" + assert len(finished_entries) >= 1, f"Expected at least 1 FINISHED status entry, found {len(finished_entries)}" + assert len(error_entries) >= 1, f"Expected at least 1 ERROR status entry, found {len(error_entries)}" + + # Verify the content of the found entries + for entry in running_entries: + assert entry["status_code"] == Status.Code.RUNNING.value, ( + f"Expected status_code {Status.Code.RUNNING.value}, got {entry['status_code']}" + ) + rollout_tag = f"rollout_id:{rollout_id}" + assert rollout_tag in entry.get("tags", []), ( + f"Expected rollout_id tag '{rollout_tag}' in tags: {entry.get('tags', [])}" + ) + + print("Successfully verified search by status codes:") + print(f" - RUNNING entries: {len(running_entries)}") + print(f" - FINISHED entries: {len(finished_entries)}") + print(f" - ERROR entries: {len(error_entries)}") + + +@pytest.mark.skipif( + not os.environ.get("FIREWORKS_API_KEY"), + reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", +) +def test_fireworks_tracing_handler_rollout_id_from_extra_overrides_env( + fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str +): + """Test that rollout_id in extra parameter overrides environment variable.""" + + # Create a different rollout_id to pass in extra + extra_rollout_id = f"extra-rollout-{time.time()}" + + # Generate a unique test message + test_message = f"Rollout ID override test message at {time.time()}" + + # Log with rollout_id in extra data (should override environment variable) + test_logger.info(test_message, extra={"rollout_id": extra_rollout_id}) + + # Give Fireworks tracing time to process the log + time.sleep(5) + + # Search for logs with the extra rollout_id (not the environment one) + extra_log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{extra_rollout_id}"], limit=10, hours_back=1) + + # Assert that we found our log message with the extra rollout_id + assert extra_log_entries is not None, "Search should return results" + assert len(extra_log_entries) > 0, ( + f"Expected to find at least 1 log entry with extra rollout_id, but found {len(extra_log_entries)}" + ) + + # Find our specific test message + found_entry = None + for entry in extra_log_entries: + if entry.get("message") == test_message: + found_entry = entry + break + + assert found_entry is not None, f"Expected to find test message '{test_message}' in extra rollout_id search" + + # Verify the rollout_id tag matches the extra parameter (not environment variable) + extra_rollout_tag = f"rollout_id:{extra_rollout_id}" + env_rollout_tag = f"rollout_id:{rollout_id}" + + tags = found_entry.get("tags", []) + assert extra_rollout_tag in tags, f"Expected extra rollout_id tag '{extra_rollout_tag}' in tags: {tags}" + assert env_rollout_tag not in tags, f"Expected environment rollout_id tag '{env_rollout_tag}' NOT in tags: {tags}" + + # Verify other expected fields are still present + assert found_entry["message"] == test_message, f"Expected message '{test_message}', got '{found_entry['message']}'" + assert found_entry["severity"] == "INFO", f"Expected severity 'INFO', got '{found_entry['severity']}'" + assert "timestamp" in found_entry, "Expected entry to contain 'timestamp' field" + + # Verify that searching for the original environment rollout_id doesn't find this message + env_log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=10, hours_back=1) + + # Should either be empty or not contain our test message + if env_log_entries: + env_messages = [entry.get("message", "") for entry in env_log_entries] + assert test_message not in env_messages, ( + f"Found test message '{test_message}' when searching with environment rollout_id '{rollout_id}'" + ) + + print(f"Successfully verified rollout_id override: extra '{extra_rollout_id}' overrode environment '{rollout_id}'") + + +@pytest.mark.skipif( + not os.environ.get("FIREWORKS_API_KEY"), + reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", +) +def test_fireworks_tracing_handler_timestamp_format( + fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str +): + """Test that FireworksTracingHttpHandler formats timestamps correctly with UTC timezone.""" + + # Generate a unique test message + test_message = f"Timestamp format test message at {time.time()}" + + # Record the time before logging to compare with the timestamp + before_log_time = datetime.now(timezone.utc) + + # Send the log message + test_logger.info(test_message) + + # Record the time after logging + after_log_time = datetime.now(timezone.utc) + + # Give Fireworks tracing a moment to process the log + time.sleep(5) + + # Search for the log using the adapter + log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=10, hours_back=1) + + # Assert that we found our log message + assert log_entries is not None, "Search should return results" + assert len(log_entries) > 0, f"Expected to find at least 1 log entry, but found {len(log_entries)}" + + # Find our specific test message + found_entry = None + for entry in log_entries: + if entry.get("message") == test_message: + found_entry = entry + break + + assert found_entry is not None, f"Expected to find test message '{test_message}' in log entries" + assert "timestamp" in found_entry, "Expected entry to contain 'timestamp' field" + + # Get the timestamp from the entry + timestamp_str = found_entry["timestamp"] + + # Verify the timestamp format matches ISO 8601 with UTC timezone (Z suffix) + assert timestamp_str.endswith("Z"), f"Expected timestamp to end with 'Z' (UTC), got: {timestamp_str}" + + # Parse the timestamp to verify it's valid + try: + parsed_timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")) + except ValueError as e: + pytest.fail(f"Failed to parse timestamp '{timestamp_str}': {e}") + + # Verify the timestamp is timezone-aware (UTC) + assert parsed_timestamp.tzinfo is not None, "Expected timestamp to be timezone-aware" + utc_offset = parsed_timestamp.tzinfo.utcoffset(None) + assert utc_offset is not None and utc_offset.total_seconds() == 0, "Expected timestamp to be in UTC timezone" + + # Verify the timestamp is within reasonable bounds (between before and after log time) + # Allow for some margin due to processing time + from datetime import timedelta + + time_margin = timedelta(seconds=30) # 30 seconds margin for network latency + assert before_log_time - time_margin <= parsed_timestamp <= after_log_time + time_margin, ( + f"Expected timestamp {parsed_timestamp} to be between {before_log_time} and {after_log_time} " + f"(with {time_margin} margin)" + ) + + # Verify the timestamp format includes microseconds + assert "." in timestamp_str, "Expected timestamp to include microseconds" + assert timestamp_str.count(".") == 1, "Expected timestamp to have exactly one decimal point" + + # Verify the format matches the expected pattern: YYYY-MM-DDTHH:MM:SS.ffffffZ + import re + + iso_pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}Z$" + assert re.match(iso_pattern, timestamp_str), f"Expected timestamp to match ISO 8601 pattern, got: {timestamp_str}" + + print(f"Successfully verified timestamp format: {timestamp_str}") + print(f"Parsed timestamp: {parsed_timestamp} (UTC)") + print(f"Timestamp is within expected time range: {before_log_time} <= {parsed_timestamp} <= {after_log_time}") + + +@pytest.mark.skipif( + not os.environ.get("FIREWORKS_API_KEY"), + reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", +) +def test_fireworks_tracing_handler_status_polling_flow( + fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str +): + """Test the complete status polling flow: RUNNING -> FINISHED with status detection.""" + from eval_protocol import Status + + # Simulate a rollout flow with status transitions + test_message_base = f"Status polling flow test at {time.time()}" + + # 1. Log RUNNING status + running_message = f"{test_message_base} - RUNNING" + test_logger.info(running_message, extra={"status": Status.rollout_running()}) + time.sleep(1) + + # 2. Log some progress messages + progress_message = f"{test_message_base} - Progress update" + test_logger.info(progress_message) + time.sleep(1) + + # 3. Log FINISHED status + finished_message = f"{test_message_base} - FINISHED" + test_logger.info(finished_message, extra={"status": Status.rollout_finished()}) + + # Give Fireworks tracing time to process all logs + time.sleep(5) + + # Search for all logs with our rollout_id + log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=20, hours_back=1) + + # Assert that we found our log messages + assert log_entries is not None, "Search should return results" + assert len(log_entries) >= 3, f"Expected to find at least 3 log entries, but found {len(log_entries)}" + + # Find entries with status codes (non-RUNNING status should be detectable for polling) + status_entries = [] + non_running_entries = [] + + for entry in log_entries: + if "status_code" in entry: + status_entries.append(entry) + if entry["status_code"] != Status.Code.RUNNING.value: + non_running_entries.append(entry) + + # Verify we found status entries + assert len(status_entries) >= 2, ( + f"Expected at least 2 status entries (RUNNING, FINISHED), found {len(status_entries)}" + ) + + # Verify we found non-RUNNING entries (which would trigger polling to stop) + assert len(non_running_entries) >= 1, f"Expected at least 1 non-RUNNING entry, found {len(non_running_entries)}" + + # Find the FINISHED entry specifically + finished_entry = None + for entry in non_running_entries: + if entry["status_code"] == Status.Code.FINISHED.value: + finished_entry = entry + break + + assert finished_entry is not None, "Expected to find FINISHED status entry" + assert finished_entry["status_message"] == "Rollout finished", ( + f"Expected FINISHED status message, got '{finished_entry['status_message']}'" + ) + + print("Successfully verified status polling flow:") + print(f" - Total status entries: {len(status_entries)}") + print(f" - Non-RUNNING entries: {len(non_running_entries)}") + print(f" - FINISHED entry found with message: '{finished_entry['status_message']}'") + print("This flow simulates what RemoteRolloutProcessor would detect during status polling") From 22602b382beb58623b8bb562d639c2ed2d9715df Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Tue, 21 Oct 2025 09:48:24 -0700 Subject: [PATCH 3/7] update naming --- .../log_utils/fireworks_tracing_http_handler.py | 12 ++++++------ eval_protocol/pytest/remote_rollout_processor.py | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/eval_protocol/log_utils/fireworks_tracing_http_handler.py b/eval_protocol/log_utils/fireworks_tracing_http_handler.py index b04b2692..2031d13a 100644 --- a/eval_protocol/log_utils/fireworks_tracing_http_handler.py +++ b/eval_protocol/log_utils/fireworks_tracing_http_handler.py @@ -94,16 +94,16 @@ def _get_status_info(self, record: logging.LogRecord) -> Optional[Dict[str, Any] status_code = status_code.value return { - "status_code": status_code, - "status_message": status.message, - "status_details": getattr(status, "details", []), + "code": status_code, + "message": status.message, + "details": getattr(status, "details", []), } elif isinstance(status, dict): # Dictionary representation of status return { - "status_code": status.get("code"), - "status_message": status.get("message"), - "status_details": status.get("details", []), + "code": status.get("code"), + "message": status.get("message"), + "details": status.get("details", []), } return None diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index 4b2653a3..2a99a842 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -151,10 +151,10 @@ def _get_status() -> Dict[str, Any]: # Look for structured status dictionary in status field status_dict = latest_log.get("status") - if status_dict and isinstance(status_dict, dict) and "status_code" in status_dict: - status_code = status_dict.get("status_code") - status_message = status_dict.get("status_message", "") - status_details = status_dict.get("status_details", []) + if status_dict and isinstance(status_dict, dict) and "code" in status_dict: + status_code = status_dict.get("code") + status_message = status_dict.get("message", "") + status_details = status_dict.get("details", []) logger.info( f"Found Fireworks log for rollout {row.execution_metadata.rollout_id} with status code {status_code}" From f668527b6e3260f186dcaf8c50aadad92fbf9720 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Tue, 21 Oct 2025 10:58:20 -0700 Subject: [PATCH 4/7] remove old test --- .github/workflows/elasticsearch-tests.yml | 66 -- .github/workflows/fireworks-tracing-tests.yml | 61 +- .../test_fireworks_tracing_integration.py | 624 ------------------ 3 files changed, 4 insertions(+), 747 deletions(-) delete mode 100644 .github/workflows/elasticsearch-tests.yml delete mode 100644 tests/logging/test_fireworks_tracing_integration.py diff --git a/.github/workflows/elasticsearch-tests.yml b/.github/workflows/elasticsearch-tests.yml deleted file mode 100644 index 333bc174..00000000 --- a/.github/workflows/elasticsearch-tests.yml +++ /dev/null @@ -1,66 +0,0 @@ -# DEPRECATED: This workflow is deprecated in favor of fireworks-tracing-tests.yml -# The Elasticsearch integration tests have been replaced with Fireworks tracing tests -# This workflow is kept for backward compatibility but should be removed in the future - -name: Elasticsearch Tests (Deprecated) - -on: - # Disabled automatic triggers - only manual for backward compatibility - # push: - # branches: [main] - # paths-ignore: - # - "docs/**" - # - "*.md" - # pull_request: # Run on all pull requests - # paths-ignore: - # - "docs/**" - # - "*.md" - workflow_dispatch: # Allow manual triggering only - -jobs: - elasticsearch-tests: - name: Elasticsearch Integration Tests (Deprecated) - runs-on: ubuntu-latest - - steps: - - name: Checkout repository - uses: actions/checkout@v4 - with: - fetch-depth: 0 - - - name: Set up Python 3.10 - uses: actions/setup-python@v5 - with: - python-version: "3.10" - - - name: Install uv - uses: astral-sh/setup-uv@v6 - with: - enable-cache: true - - - name: Install the project - run: uv sync --locked --all-extras --dev - - - name: Show Deprecation Notice - run: | - echo "âš ī¸ DEPRECATION NOTICE âš ī¸" - echo "This workflow is deprecated. Use fireworks-tracing-tests.yml instead." - echo "Elasticsearch integration tests have been replaced with Fireworks tracing tests." - echo "" - echo "📋 New test file: tests/logging/test_fireworks_tracing_integration.py" - echo "🔄 New workflow: .github/workflows/fireworks-tracing-tests.yml" - - - name: Run Deprecated Elasticsearch Tests - env: - FIREWORKS_API_KEY: ${{ secrets.FIREWORKS_API_KEY }} - PYTHONWARNINGS: "ignore::DeprecationWarning,ignore::RuntimeWarning" - run: | - echo "Running deprecated Elasticsearch tests (these are skipped by default)..." - - # Try to run the tests, but they should be skipped - uv run pytest tests/logging/test_elasticsearch_direct_http_handler.py -v --tb=short || true - - echo "" - echo "â„šī¸ Note: These tests are deprecated and skipped by default." - echo "Use the new Fireworks tracing tests instead:" - echo "uv run pytest tests/logging/test_fireworks_tracing_integration.py -v" diff --git a/.github/workflows/fireworks-tracing-tests.yml b/.github/workflows/fireworks-tracing-tests.yml index 31bf15d7..abe52355 100644 --- a/.github/workflows/fireworks-tracing-tests.yml +++ b/.github/workflows/fireworks-tracing-tests.yml @@ -36,64 +36,11 @@ jobs: - name: Install the project run: uv sync --locked --all-extras --dev - - name: Run Fireworks Tracing Integration Tests + - name: Run Fireworks Tracing Tests env: FIREWORKS_API_KEY: ${{ secrets.FIREWORKS_API_KEY }} PYTHONWARNINGS: "ignore::DeprecationWarning,ignore::RuntimeWarning" run: | - # Run Fireworks tracing integration tests - uv run pytest tests/logging/test_fireworks_tracing_integration.py -v --tb=short - - # Run RemoteRolloutProcessor with Fireworks tracing (if exists) - if [ -f "tests/remote_server/test_remote_fireworks.py" ]; then - echo "Running RemoteRolloutProcessor Fireworks integration test..." - # Note: This requires manual server startup, so may need to be run separately - # uv run pytest tests/remote_server/test_remote_fireworks.py -v --tb=short - fi - - - name: Run Status Propagation Tests - env: - FIREWORKS_API_KEY: ${{ secrets.FIREWORKS_API_KEY }} - PYTHONWARNINGS: "ignore::DeprecationWarning,ignore::RuntimeWarning" - run: | - # Run any status propagation tests that use Fireworks tracing - if [ -f "tests/remote_server/test_remote_fireworks_propagate_status.py" ]; then - echo "Running status propagation tests with Fireworks tracing..." - uv run pytest tests/remote_server/test_remote_fireworks_propagate_status.py -v --tb=short || true - fi - - fireworks-tracing-smoke-test: - name: Fireworks Tracing Smoke Test - runs-on: ubuntu-latest - needs: fireworks-tracing-tests - - steps: - - name: Checkout repository - uses: actions/checkout@v4 - - - name: Set up Python 3.10 - uses: actions/setup-python@v5 - with: - python-version: "3.10" - - - name: Install uv - uses: astral-sh/setup-uv@v6 - with: - enable-cache: true - - - name: Install the project - run: uv sync --locked --all-extras --dev - - - name: Smoke Test - Basic Log Sending - env: - FIREWORKS_API_KEY: ${{ secrets.FIREWORKS_API_KEY }} - run: | - # Run just the basic log sending test as a smoke test - uv run pytest tests/logging/test_fireworks_tracing_integration.py::test_fireworks_tracing_handler_sends_logs -v - - - name: Smoke Test - Status Logging - env: - FIREWORKS_API_KEY: ${{ secrets.FIREWORKS_API_KEY }} - run: | - # Run just the status logging test as a smoke test - uv run pytest tests/logging/test_fireworks_tracing_integration.py::test_fireworks_tracing_handler_logs_status_info -v || true + # Run RemoteRolloutProcessor Propagate Status Smoke Test (now uses Fireworks tracing) + uv run pytest tests/remote_server/test_remote_fireworks_propagate_status.py::test_remote_rollout_and_fetch_fireworks_propagate_status \ + -v --tb=short diff --git a/tests/logging/test_fireworks_tracing_integration.py b/tests/logging/test_fireworks_tracing_integration.py deleted file mode 100644 index 98d93905..00000000 --- a/tests/logging/test_fireworks_tracing_integration.py +++ /dev/null @@ -1,624 +0,0 @@ -import os -import logging -import time -import pytest -from datetime import datetime, timezone -from typing import List, Dict, Any - -from eval_protocol.log_utils.fireworks_tracing_http_handler import FireworksTracingHttpHandler -from eval_protocol.adapters.fireworks_tracing import FireworksTracingAdapter - - -@pytest.fixture -def rollout_id(): - """Set up EP_ROLLOUT_ID environment variable for tests.""" - import uuid - - # Generate a unique rollout ID for this test session - test_rollout_id = f"test-rollout-{uuid.uuid4().hex[:8]}" - - # Set the environment variable - os.environ["EP_ROLLOUT_ID"] = test_rollout_id - - yield test_rollout_id - - # Clean up after the test - if "EP_ROLLOUT_ID" in os.environ: - del os.environ["EP_ROLLOUT_ID"] - - -@pytest.fixture -def fireworks_base_url(): - """Get Fireworks tracing base URL from environment or use default.""" - return os.environ.get("FW_TRACING_GATEWAY_BASE_URL", "https://tracing.fireworks.ai") - - -@pytest.fixture -def fireworks_handler(fireworks_base_url: str, rollout_id: str): - """Create and configure FireworksTracingHttpHandler.""" - handler = FireworksTracingHttpHandler(gateway_base_url=fireworks_base_url) - - # Set a specific log level - handler.setLevel(logging.INFO) - - return handler - - -@pytest.fixture -def fireworks_adapter(fireworks_base_url: str): - """Create a FireworksTracingAdapter for testing.""" - return FireworksTracingAdapter(base_url=fireworks_base_url) - - -@pytest.fixture -def test_logger(fireworks_handler, rollout_id: str): - """Set up a test logger with the Fireworks tracing handler.""" - logger = logging.getLogger("test_fireworks_tracing_logger") - logger.setLevel(logging.INFO) - - # Clear any existing handlers - logger.handlers.clear() - - # Add our Fireworks tracing handler - logger.addHandler(fireworks_handler) - - # Prevent propagation to avoid duplicate logs - logger.propagate = False - - yield logger - - # Clean up the logger handlers after the test - logger.handlers.clear() - - -@pytest.mark.skipif( - not os.environ.get("FIREWORKS_API_KEY"), - reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", -) -def test_fireworks_tracing_handler_sends_logs( - fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str -): - """Test that FireworksTracingHttpHandler successfully sends logs to Fireworks tracing.""" - - # Generate a unique test message to avoid conflicts - test_message = f"Test log message at {time.time()}" - - # Send the log message - test_logger.info(test_message) - - # Give Fireworks tracing a moment to process the log - time.sleep(5) - - # Search for the log using the adapter - log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=10, hours_back=1) - - # Assert that we found our log message - assert log_entries is not None, "Search should return results" - assert len(log_entries) > 0, f"Expected to find at least 1 log entry, but found {len(log_entries)}" - - # Find our specific test message - found_entry = None - for entry in log_entries: - if entry.get("message") == test_message: - found_entry = entry - break - - assert found_entry is not None, f"Expected to find test message '{test_message}' in log entries" - - # Verify the content of the found entry - assert found_entry["message"] == test_message, f"Expected message '{test_message}', got '{found_entry['message']}'" - assert found_entry["severity"] == "INFO", f"Expected severity 'INFO', got '{found_entry['severity']}'" - assert "timestamp" in found_entry, "Expected entry to contain 'timestamp' field" - assert f"rollout_id:{rollout_id}" in found_entry.get("tags", []), ( - f"Expected rollout_id tag in entry tags: {found_entry.get('tags', [])}" - ) - - print(f"Successfully verified log message in Fireworks tracing: {test_message}") - - -@pytest.mark.skipif( - not os.environ.get("FIREWORKS_API_KEY"), - reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", -) -def test_fireworks_tracing_handler_sorts_logs_chronologically( - fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str -): - """Test that logs can be sorted chronologically by timestamp.""" - - # Send multiple log messages with small delays to ensure different timestamps - test_messages = [] - for i in range(3): - message = f"Chronological test message {i} at {time.time()}" - test_messages.append(message) - test_logger.info(message) - time.sleep(0.5) # Small delay to ensure different timestamps - - # Give Fireworks tracing time to process all logs - time.sleep(5) - - # Search for logs with our rollout_id - log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=20, hours_back=1) - - # Filter to only our test messages - found_entries = [] - for entry in log_entries: - for test_message in test_messages: - if entry.get("message") == test_message: - found_entries.append(entry) - break - - assert len(found_entries) >= 3, f"Expected at least 3 messages, found {len(found_entries)}" - - # Extract timestamps and verify they are in chronological order - found_timestamps = [entry["timestamp"] for entry in found_entries] - - # Sort entries by timestamp for verification - found_entries_sorted = sorted(found_entries, key=lambda x: x["timestamp"]) - - # Verify all our test messages are present - found_messages = [entry["message"] for entry in found_entries_sorted] - for test_message in test_messages: - assert test_message in found_messages, f"Expected message '{test_message}' not found in results" - - print(f"Successfully verified chronological sorting of {len(found_entries)} log messages") - print(f"Timestamps in order: {found_timestamps}") - - -@pytest.mark.skipif( - not os.environ.get("FIREWORKS_API_KEY"), - reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", -) -def test_fireworks_tracing_handler_includes_rollout_id( - fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str -): - """Test that FireworksTracingHttpHandler includes rollout_id in tags.""" - - # Generate a unique test message to avoid conflicts - test_message = f"Rollout ID test message at {time.time()}" - - # Send the log message - test_logger.info(test_message) - - # Give Fireworks tracing a moment to process the log - time.sleep(5) - - # Search for logs with our specific rollout_id - log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=10, hours_back=1) - - # Assert that we found our log message - assert log_entries is not None, "Search should return results" - assert len(log_entries) > 0, f"Expected to find at least 1 log entry, but found {len(log_entries)}" - - # Find our specific test message - found_entry = None - for entry in log_entries: - if entry.get("message") == test_message: - found_entry = entry - break - - assert found_entry is not None, f"Expected to find test message '{test_message}' in log entries" - - # Verify the rollout_id tag is present and correct - tags = found_entry.get("tags", []) - rollout_tag = f"rollout_id:{rollout_id}" - assert rollout_tag in tags, f"Expected rollout_id tag '{rollout_tag}' in tags: {tags}" - - # Verify other expected fields are still present - assert found_entry["message"] == test_message, f"Expected message '{test_message}', got '{found_entry['message']}'" - assert found_entry["severity"] == "INFO", f"Expected severity 'INFO', got '{found_entry['severity']}'" - assert "timestamp" in found_entry, "Expected entry to contain 'timestamp' field" - - print(f"Successfully verified log message with rollout_id '{rollout_id}' in Fireworks tracing: {test_message}") - - -@pytest.mark.skipif( - not os.environ.get("FIREWORKS_API_KEY"), - reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", -) -def test_fireworks_tracing_handler_search_by_rollout_id( - fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str -): - """Test that logs can be searched by rollout_id tag in Fireworks tracing.""" - - # Generate unique test messages to avoid conflicts - test_messages = [] - for i in range(3): - message = f"Rollout search test message {i} at {time.time()}" - test_messages.append(message) - test_logger.info(message) - time.sleep(0.2) # Small delay to ensure different timestamps - - # Give Fireworks tracing time to process all logs - time.sleep(5) - - # Search for logs with our specific rollout_id - log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=20, hours_back=1) - - # Assert that we found our log messages - assert log_entries is not None, "Search should return results" - assert len(log_entries) >= 3, f"Expected to find at least 3 log entries, but found {len(log_entries)}" - - # Verify all found entries have the correct rollout_id tag - found_messages = [] - rollout_tag = f"rollout_id:{rollout_id}" - for entry in log_entries: - tags = entry.get("tags", []) - assert rollout_tag in tags, f"Expected rollout_id tag '{rollout_tag}' in tags: {tags}" - found_messages.append(entry["message"]) - - # Verify all our test messages are present in the search results - for test_message in test_messages: - assert test_message in found_messages, f"Expected message '{test_message}' not found in search results" - - # Test searching for a different rollout_id (should return no results for our messages) - different_rollout_id = f"different-rollout-{time.time()}" - different_entries = fireworks_adapter.search_logs( - tags=[f"rollout_id:{different_rollout_id}"], limit=10, hours_back=1 - ) - - # Should either be empty or not contain our test messages - if different_entries: - different_messages = [entry.get("message", "") for entry in different_entries] - for test_message in test_messages: - assert test_message not in different_messages, ( - f"Found test message '{test_message}' in search for different rollout_id" - ) - - print(f"Successfully verified search by rollout_id '{rollout_id}' found {len(log_entries)} log entries") - print("Verified that search for different rollout_id doesn't return our test messages") - - -@pytest.mark.skipif( - not os.environ.get("FIREWORKS_API_KEY"), - reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", -) -def test_fireworks_tracing_handler_logs_status_info( - fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str -): - """Test that FireworksTracingHttpHandler logs Status class instances and can search by status code.""" - from eval_protocol import Status - - # Create a Status instance - test_status = Status.rollout_running() - - # Generate a unique test message - test_message = f"Status logging test message at {time.time()}" - - # Log with Status instance in extra data - test_logger.info(test_message, extra={"status": test_status}) - - # Give Fireworks tracing time to process the log - time.sleep(5) - - # Search for logs with our rollout_id - log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=10, hours_back=1) - - # Assert that we found our log message - assert log_entries is not None, "Search should return results" - assert len(log_entries) > 0, f"Expected to find at least 1 log entry, but found {len(log_entries)}" - - # Find our specific test message - found_entry = None - for entry in log_entries: - if entry.get("message") == test_message: - found_entry = entry - break - - assert found_entry is not None, f"Expected to find test message '{test_message}' in log entries" - - # Verify the status fields are present and correct - assert "status_code" in found_entry, "Expected entry to contain 'status_code' field" - assert found_entry["status_code"] == test_status.code.value, ( - f"Expected status_code {test_status.code.value}, got {found_entry['status_code']}" - ) - assert "status_message" in found_entry, "Expected entry to contain 'status_message' field" - assert found_entry["status_message"] == test_status.message, ( - f"Expected status_message '{test_status.message}', got '{found_entry['status_message']}" - ) - assert "status_details" in found_entry, "Expected entry to contain 'status_details' field" - assert found_entry["status_details"] == test_status.details, ( - f"Expected status_details {test_status.details}, got {found_entry['status_details']}" - ) - - # Verify other expected fields are still present - assert found_entry["message"] == test_message, f"Expected message '{test_message}', got '{found_entry['message']}'" - rollout_tag = f"rollout_id:{rollout_id}" - assert rollout_tag in found_entry.get("tags", []), ( - f"Expected rollout_id tag '{rollout_tag}' in tags: {found_entry.get('tags', [])}" - ) - - print( - f"Successfully verified Status logging with code {test_status.code.value} in Fireworks tracing: {test_message}" - ) - - -@pytest.mark.skipif( - not os.environ.get("FIREWORKS_API_KEY"), - reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", -) -def test_fireworks_tracing_handler_search_by_status_code( - fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str -): - """Test that logs can be searched and filtered by status code in Fireworks tracing.""" - from eval_protocol.models import Status - - # Create different Status instances for testing - statuses = [ - Status.rollout_running(), - Status.eval_finished(), - Status.error("Test error message"), - ] - - # Generate unique test messages - test_messages = [] - for i, status in enumerate(statuses): - message = f"Status search test message {i} at {time.time()}" - test_messages.append((message, status)) - test_logger.info(message, extra={"status": status}) - time.sleep(0.2) # Small delay to ensure different timestamps - - # Give Fireworks tracing time to process all logs - time.sleep(5) - - # Search for all logs with our rollout_id - log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=20, hours_back=1) - - # Assert that we found our log messages - assert log_entries is not None, "Search should return results" - assert len(log_entries) >= 3, f"Expected to find at least 3 log entries, but found {len(log_entries)}" - - # Find entries with RUNNING status code - running_entries = [] - finished_entries = [] - error_entries = [] - - for entry in log_entries: - if "status_code" in entry: - if entry["status_code"] == Status.Code.RUNNING.value: - running_entries.append(entry) - elif entry["status_code"] == Status.Code.FINISHED.value: - finished_entries.append(entry) - elif entry["status_code"] == Status.Code.INTERNAL.value: # Error status - error_entries.append(entry) - - # Verify we found entries for each status type - assert len(running_entries) >= 1, f"Expected at least 1 RUNNING status entry, found {len(running_entries)}" - assert len(finished_entries) >= 1, f"Expected at least 1 FINISHED status entry, found {len(finished_entries)}" - assert len(error_entries) >= 1, f"Expected at least 1 ERROR status entry, found {len(error_entries)}" - - # Verify the content of the found entries - for entry in running_entries: - assert entry["status_code"] == Status.Code.RUNNING.value, ( - f"Expected status_code {Status.Code.RUNNING.value}, got {entry['status_code']}" - ) - rollout_tag = f"rollout_id:{rollout_id}" - assert rollout_tag in entry.get("tags", []), ( - f"Expected rollout_id tag '{rollout_tag}' in tags: {entry.get('tags', [])}" - ) - - print("Successfully verified search by status codes:") - print(f" - RUNNING entries: {len(running_entries)}") - print(f" - FINISHED entries: {len(finished_entries)}") - print(f" - ERROR entries: {len(error_entries)}") - - -@pytest.mark.skipif( - not os.environ.get("FIREWORKS_API_KEY"), - reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", -) -def test_fireworks_tracing_handler_rollout_id_from_extra_overrides_env( - fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str -): - """Test that rollout_id in extra parameter overrides environment variable.""" - - # Create a different rollout_id to pass in extra - extra_rollout_id = f"extra-rollout-{time.time()}" - - # Generate a unique test message - test_message = f"Rollout ID override test message at {time.time()}" - - # Log with rollout_id in extra data (should override environment variable) - test_logger.info(test_message, extra={"rollout_id": extra_rollout_id}) - - # Give Fireworks tracing time to process the log - time.sleep(5) - - # Search for logs with the extra rollout_id (not the environment one) - extra_log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{extra_rollout_id}"], limit=10, hours_back=1) - - # Assert that we found our log message with the extra rollout_id - assert extra_log_entries is not None, "Search should return results" - assert len(extra_log_entries) > 0, ( - f"Expected to find at least 1 log entry with extra rollout_id, but found {len(extra_log_entries)}" - ) - - # Find our specific test message - found_entry = None - for entry in extra_log_entries: - if entry.get("message") == test_message: - found_entry = entry - break - - assert found_entry is not None, f"Expected to find test message '{test_message}' in extra rollout_id search" - - # Verify the rollout_id tag matches the extra parameter (not environment variable) - extra_rollout_tag = f"rollout_id:{extra_rollout_id}" - env_rollout_tag = f"rollout_id:{rollout_id}" - - tags = found_entry.get("tags", []) - assert extra_rollout_tag in tags, f"Expected extra rollout_id tag '{extra_rollout_tag}' in tags: {tags}" - assert env_rollout_tag not in tags, f"Expected environment rollout_id tag '{env_rollout_tag}' NOT in tags: {tags}" - - # Verify other expected fields are still present - assert found_entry["message"] == test_message, f"Expected message '{test_message}', got '{found_entry['message']}'" - assert found_entry["severity"] == "INFO", f"Expected severity 'INFO', got '{found_entry['severity']}'" - assert "timestamp" in found_entry, "Expected entry to contain 'timestamp' field" - - # Verify that searching for the original environment rollout_id doesn't find this message - env_log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=10, hours_back=1) - - # Should either be empty or not contain our test message - if env_log_entries: - env_messages = [entry.get("message", "") for entry in env_log_entries] - assert test_message not in env_messages, ( - f"Found test message '{test_message}' when searching with environment rollout_id '{rollout_id}'" - ) - - print(f"Successfully verified rollout_id override: extra '{extra_rollout_id}' overrode environment '{rollout_id}'") - - -@pytest.mark.skipif( - not os.environ.get("FIREWORKS_API_KEY"), - reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", -) -def test_fireworks_tracing_handler_timestamp_format( - fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str -): - """Test that FireworksTracingHttpHandler formats timestamps correctly with UTC timezone.""" - - # Generate a unique test message - test_message = f"Timestamp format test message at {time.time()}" - - # Record the time before logging to compare with the timestamp - before_log_time = datetime.now(timezone.utc) - - # Send the log message - test_logger.info(test_message) - - # Record the time after logging - after_log_time = datetime.now(timezone.utc) - - # Give Fireworks tracing a moment to process the log - time.sleep(5) - - # Search for the log using the adapter - log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=10, hours_back=1) - - # Assert that we found our log message - assert log_entries is not None, "Search should return results" - assert len(log_entries) > 0, f"Expected to find at least 1 log entry, but found {len(log_entries)}" - - # Find our specific test message - found_entry = None - for entry in log_entries: - if entry.get("message") == test_message: - found_entry = entry - break - - assert found_entry is not None, f"Expected to find test message '{test_message}' in log entries" - assert "timestamp" in found_entry, "Expected entry to contain 'timestamp' field" - - # Get the timestamp from the entry - timestamp_str = found_entry["timestamp"] - - # Verify the timestamp format matches ISO 8601 with UTC timezone (Z suffix) - assert timestamp_str.endswith("Z"), f"Expected timestamp to end with 'Z' (UTC), got: {timestamp_str}" - - # Parse the timestamp to verify it's valid - try: - parsed_timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")) - except ValueError as e: - pytest.fail(f"Failed to parse timestamp '{timestamp_str}': {e}") - - # Verify the timestamp is timezone-aware (UTC) - assert parsed_timestamp.tzinfo is not None, "Expected timestamp to be timezone-aware" - utc_offset = parsed_timestamp.tzinfo.utcoffset(None) - assert utc_offset is not None and utc_offset.total_seconds() == 0, "Expected timestamp to be in UTC timezone" - - # Verify the timestamp is within reasonable bounds (between before and after log time) - # Allow for some margin due to processing time - from datetime import timedelta - - time_margin = timedelta(seconds=30) # 30 seconds margin for network latency - assert before_log_time - time_margin <= parsed_timestamp <= after_log_time + time_margin, ( - f"Expected timestamp {parsed_timestamp} to be between {before_log_time} and {after_log_time} " - f"(with {time_margin} margin)" - ) - - # Verify the timestamp format includes microseconds - assert "." in timestamp_str, "Expected timestamp to include microseconds" - assert timestamp_str.count(".") == 1, "Expected timestamp to have exactly one decimal point" - - # Verify the format matches the expected pattern: YYYY-MM-DDTHH:MM:SS.ffffffZ - import re - - iso_pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}Z$" - assert re.match(iso_pattern, timestamp_str), f"Expected timestamp to match ISO 8601 pattern, got: {timestamp_str}" - - print(f"Successfully verified timestamp format: {timestamp_str}") - print(f"Parsed timestamp: {parsed_timestamp} (UTC)") - print(f"Timestamp is within expected time range: {before_log_time} <= {parsed_timestamp} <= {after_log_time}") - - -@pytest.mark.skipif( - not os.environ.get("FIREWORKS_API_KEY"), - reason="FIREWORKS_API_KEY required for Fireworks tracing integration tests", -) -def test_fireworks_tracing_handler_status_polling_flow( - fireworks_adapter: FireworksTracingAdapter, test_logger: logging.Logger, rollout_id: str -): - """Test the complete status polling flow: RUNNING -> FINISHED with status detection.""" - from eval_protocol import Status - - # Simulate a rollout flow with status transitions - test_message_base = f"Status polling flow test at {time.time()}" - - # 1. Log RUNNING status - running_message = f"{test_message_base} - RUNNING" - test_logger.info(running_message, extra={"status": Status.rollout_running()}) - time.sleep(1) - - # 2. Log some progress messages - progress_message = f"{test_message_base} - Progress update" - test_logger.info(progress_message) - time.sleep(1) - - # 3. Log FINISHED status - finished_message = f"{test_message_base} - FINISHED" - test_logger.info(finished_message, extra={"status": Status.rollout_finished()}) - - # Give Fireworks tracing time to process all logs - time.sleep(5) - - # Search for all logs with our rollout_id - log_entries = fireworks_adapter.search_logs(tags=[f"rollout_id:{rollout_id}"], limit=20, hours_back=1) - - # Assert that we found our log messages - assert log_entries is not None, "Search should return results" - assert len(log_entries) >= 3, f"Expected to find at least 3 log entries, but found {len(log_entries)}" - - # Find entries with status codes (non-RUNNING status should be detectable for polling) - status_entries = [] - non_running_entries = [] - - for entry in log_entries: - if "status_code" in entry: - status_entries.append(entry) - if entry["status_code"] != Status.Code.RUNNING.value: - non_running_entries.append(entry) - - # Verify we found status entries - assert len(status_entries) >= 2, ( - f"Expected at least 2 status entries (RUNNING, FINISHED), found {len(status_entries)}" - ) - - # Verify we found non-RUNNING entries (which would trigger polling to stop) - assert len(non_running_entries) >= 1, f"Expected at least 1 non-RUNNING entry, found {len(non_running_entries)}" - - # Find the FINISHED entry specifically - finished_entry = None - for entry in non_running_entries: - if entry["status_code"] == Status.Code.FINISHED.value: - finished_entry = entry - break - - assert finished_entry is not None, "Expected to find FINISHED status entry" - assert finished_entry["status_message"] == "Rollout finished", ( - f"Expected FINISHED status message, got '{finished_entry['status_message']}'" - ) - - print("Successfully verified status polling flow:") - print(f" - Total status entries: {len(status_entries)}") - print(f" - Non-RUNNING entries: {len(non_running_entries)}") - print(f" - FINISHED entry found with message: '{finished_entry['status_message']}'") - print("This flow simulates what RemoteRolloutProcessor would detect during status polling") From 766931850b4fe54bd814156063b420b3316394fe Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Tue, 21 Oct 2025 11:17:51 -0700 Subject: [PATCH 5/7] fixing tests --- .github/workflows/fireworks-tracing-tests.yml | 6 +- tests/remote_server/test_remote_fireworks.py | 69 +++++++++++++++---- 2 files changed, 60 insertions(+), 15 deletions(-) diff --git a/.github/workflows/fireworks-tracing-tests.yml b/.github/workflows/fireworks-tracing-tests.yml index abe52355..867fef1d 100644 --- a/.github/workflows/fireworks-tracing-tests.yml +++ b/.github/workflows/fireworks-tracing-tests.yml @@ -41,6 +41,10 @@ jobs: FIREWORKS_API_KEY: ${{ secrets.FIREWORKS_API_KEY }} PYTHONWARNINGS: "ignore::DeprecationWarning,ignore::RuntimeWarning" run: | - # Run RemoteRolloutProcessor Propagate Status Smoke Test (now uses Fireworks tracing) + # Run RemoteRolloutProcessor End-to-End Test (auto server startup) + uv run pytest tests/remote_server/test_remote_fireworks.py::test_remote_rollout_and_fetch_fireworks \ + -v --tb=short + + # Run RemoteRolloutProcessor Propagate Status Test (auto server startup) uv run pytest tests/remote_server/test_remote_fireworks_propagate_status.py::test_remote_rollout_and_fetch_fireworks_propagate_status \ -v --tb=short diff --git a/tests/remote_server/test_remote_fireworks.py b/tests/remote_server/test_remote_fireworks.py index ea133ccf..82d184cb 100644 --- a/tests/remote_server/test_remote_fireworks.py +++ b/tests/remote_server/test_remote_fireworks.py @@ -1,20 +1,13 @@ -# MANUAL SERVER STARTUP REQUIRED: -# -# For Python server testing, start: -# python -m tests.remote_server.remote_server (runs on http://127.0.0.1:3000) -# -# For TypeScript server testing, start: -# cd tests/remote_server/typescript-server -# npm install -# npm start -# -# The TypeScript server should be running on http://127.0.0.1:3000 -# You only need to start one of the servers! +# AUTO SERVER STARTUP: Server is automatically started and stopped by the test import os +import subprocess +import socket +import time from typing import List import pytest +import requests from eval_protocol.data_loader.dynamic_data_loader import DynamicDataLoader from eval_protocol.models import EvaluationRow, Message @@ -27,6 +20,54 @@ ROLLOUT_IDS = set() +def find_available_port() -> int: + """Find an available port on localhost""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + port = s.getsockname()[1] + return port + + +SERVER_PORT = find_available_port() + + +def wait_for_server_to_startup(timeout: int = 120): + start_time = time.time() + while True: + try: + requests.get(f"http://127.0.0.1:{SERVER_PORT}") + break + except requests.exceptions.RequestException: + time.sleep(1) + if time.time() - start_time > timeout: + raise TimeoutError(f"Server did not start within {timeout} seconds") + + +@pytest.fixture(autouse=True) +def setup_remote_server(): + """Start the remote server""" + # kill all Python processes matching "python -m tests.remote_server.remote_server" + subprocess.run(["pkill", "-f", "python -m tests.remote_server.remote_server"], capture_output=True) + + host = "127.0.0.1" + process = subprocess.Popen( + [ + "python", + "-m", + "tests.remote_server.remote_server", + "--host", + host, + "--port", + str(SERVER_PORT), + ] + ) + # wait for the server to startup by polling + wait_for_server_to_startup() + yield + process.terminate() + process.wait() + + @pytest.fixture(autouse=True) def check_rollout_coverage(): """Ensure we processed all expected rollout_ids""" @@ -64,7 +105,7 @@ def rows() -> List[EvaluationRow]: generators=[rows], ), rollout_processor=RemoteRolloutProcessor( - remote_base_url="http://127.0.0.1:3000", + remote_base_url=f"http://127.0.0.1:{SERVER_PORT}", timeout_seconds=180, output_data_loader=fireworks_output_data_loader, ), @@ -72,7 +113,7 @@ def rows() -> List[EvaluationRow]: async def test_remote_rollout_and_fetch_fireworks(row: EvaluationRow) -> EvaluationRow: """ End-to-end test: - - REQUIRES MANUAL SERVER STARTUP: python -m tests.remote_server.remote_server + - AUTO SERVER STARTUP: Server is automatically started and stopped by the test - trigger remote rollout via RemoteRolloutProcessor (calls init/status) - fetch traces from Langfuse via Fireworks tracing proxy filtered by metadata via output_data_loader; FAIL if none found """ From f1b57bf6565fcff826c8e132d6e1b2ead4a65d7d Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Tue, 21 Oct 2025 11:19:57 -0700 Subject: [PATCH 6/7] don't skip --- tests/remote_server/test_remote_fireworks.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/remote_server/test_remote_fireworks.py b/tests/remote_server/test_remote_fireworks.py index 82d184cb..303d40f5 100644 --- a/tests/remote_server/test_remote_fireworks.py +++ b/tests/remote_server/test_remote_fireworks.py @@ -98,7 +98,6 @@ def rows() -> List[EvaluationRow]: return [row, row, row] -@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Only run this test locally (skipped in CI)") @pytest.mark.parametrize("completion_params", [{"model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b"}]) @evaluation_test( data_loaders=DynamicDataLoader( From 8581a2f5240c5315ace09fa6a6124952a40b0b69 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Tue, 21 Oct 2025 11:31:10 -0700 Subject: [PATCH 7/7] fix remote rollout processor --- .../pytest/remote_rollout_processor.py | 44 +++++++++++-------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/eval_protocol/pytest/remote_rollout_processor.py b/eval_protocol/pytest/remote_rollout_processor.py index 2a99a842..68d47dcd 100644 --- a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -142,32 +142,38 @@ def _get_status() -> Dict[str, Any]: completed_logs = self._tracing_adapter.search_logs( tags=[f"rollout_id:{row.execution_metadata.rollout_id}"] ) - if completed_logs: - latest_log = completed_logs[0] + # Filter for logs that actually have status information + status_logs = [] + for log in completed_logs: + status_dict = log.get("status") + if status_dict and isinstance(status_dict, dict) and "code" in status_dict: + status_logs.append(log) + + if status_logs: + # Use the first log with status information + status_log = status_logs[0] + status_dict = status_log.get("status") logger.info( - f"Found completion log for rollout {row.execution_metadata.rollout_id}: {latest_log.get('message', '')}" + f"Found status log for rollout {row.execution_metadata.rollout_id}: {status_log.get('message', '')}" ) - # Look for structured status dictionary in status field - status_dict = latest_log.get("status") - if status_dict and isinstance(status_dict, dict) and "code" in status_dict: - status_code = status_dict.get("code") - status_message = status_dict.get("message", "") - status_details = status_dict.get("details", []) + status_code = status_dict.get("code") + status_message = status_dict.get("message", "") + status_details = status_dict.get("details", []) - logger.info( - f"Found Fireworks log for rollout {row.execution_metadata.rollout_id} with status code {status_code}" - ) + logger.info( + f"Found Fireworks log for rollout {row.execution_metadata.rollout_id} with status code {status_code}" + ) - row.rollout_status = Status( - code=Status.Code(status_code), - message=status_message, - details=status_details, - ) + row.rollout_status = Status( + code=Status.Code(status_code), + message=status_message, + details=status_details, + ) - logger.info("Stopping polling for rollout %s", row.execution_metadata.rollout_id) - break + logger.info("Stopping polling for rollout %s", row.execution_metadata.rollout_id) + break await asyncio.sleep(poll_interval) else: