diff --git a/.github/workflows/elasticsearch-tests.yml b/.github/workflows/fireworks-tracing-tests.yml similarity index 69% rename from .github/workflows/elasticsearch-tests.yml rename to .github/workflows/fireworks-tracing-tests.yml index 9eba33a8..867fef1d 100644 --- a/.github/workflows/elasticsearch-tests.yml +++ b/.github/workflows/fireworks-tracing-tests.yml @@ -1,4 +1,4 @@ -name: Elasticsearch Tests +name: Fireworks Tracing Tests on: push: @@ -13,8 +13,8 @@ on: workflow_dispatch: # Allow manual triggering jobs: - elasticsearch-tests: - name: Elasticsearch Integration Tests + fireworks-tracing-tests: + name: Fireworks Tracing Integration Tests runs-on: ubuntu-latest steps: @@ -36,14 +36,15 @@ jobs: - name: Install the project run: uv sync --locked --all-extras --dev - - name: Run Elasticsearch Tests + - name: Run Fireworks Tracing Tests env: FIREWORKS_API_KEY: ${{ secrets.FIREWORKS_API_KEY }} PYTHONWARNINGS: "ignore::DeprecationWarning,ignore::RuntimeWarning" run: | - # Run Elasticsearch direct HTTP handler tests - uv run pytest tests/logging/test_elasticsearch_direct_http_handler.py -v --tb=short + # Run RemoteRolloutProcessor End-to-End Test (auto server startup) + uv run pytest tests/remote_server/test_remote_fireworks.py::test_remote_rollout_and_fetch_fireworks \ + -v --tb=short - # Run RemoteRolloutProcessor Propagate Status Smoke Test (also uses Elasticsearch) + # Run RemoteRolloutProcessor Propagate Status Test (auto server startup) uv run pytest tests/remote_server/test_remote_fireworks_propagate_status.py::test_remote_rollout_and_fetch_fireworks_propagate_status \ -v --tb=short diff --git a/eval_protocol/__init__.py b/eval_protocol/__init__.py index 768c7e9b..e0ca05cb 100644 --- a/eval_protocol/__init__.py +++ b/eval_protocol/__init__.py @@ -36,7 +36,6 @@ filter_longest_conversation, ) from .pytest import evaluation_test, SingleTurnRolloutProcessor, RemoteRolloutProcessor, GithubActionRolloutProcessor -from .pytest.remote_rollout_processor import 
create_elasticsearch_config_from_env from .pytest.parameterize import DefaultParameterIdGenerator from .log_utils.elasticsearch_direct_http_handler import ElasticsearchDirectHttpHandler from .log_utils.rollout_id_filter import RolloutIdFilter @@ -90,7 +89,6 @@ warnings.filterwarnings("default", category=DeprecationWarning, module="eval_protocol") __all__ = [ - "create_elasticsearch_config_from_env", "ElasticsearchConfig", "ElasticsearchDirectHttpHandler", "RolloutIdFilter", diff --git a/eval_protocol/adapters/fireworks_tracing.py b/eval_protocol/adapters/fireworks_tracing.py index 7a2bddcb..233371cc 100644 --- a/eval_protocol/adapters/fireworks_tracing.py +++ b/eval_protocol/adapters/fireworks_tracing.py @@ -309,6 +309,7 @@ def search_logs(self, tags: List[str], limit: int = 100, hours_back: int = 24) - "message": e.get("message"), "severity": e.get("severity", "INFO"), "tags": e.get("tags", []), + "status": e.get("status"), } ) return results diff --git a/eval_protocol/cli_commands/logs.py b/eval_protocol/cli_commands/logs.py index 43261d94..89929a82 100644 --- a/eval_protocol/cli_commands/logs.py +++ b/eval_protocol/cli_commands/logs.py @@ -39,49 +39,6 @@ def logs_command(args): or os.environ.get("GATEWAY_URL") or "https://tracing.fireworks.ai" ) - try: - if not use_fireworks: - if getattr(args, "use_env_elasticsearch_config", False): - # Use environment variables for configuration - print("⚙️ Using environment variables for Elasticsearch config") - from eval_protocol.pytest.remote_rollout_processor import ( - create_elasticsearch_config_from_env, - ) - - elasticsearch_config = create_elasticsearch_config_from_env() - # Ensure index exists with correct mapping, mirroring Docker setup path - try: - from eval_protocol.log_utils.elasticsearch_index_manager import ( - ElasticsearchIndexManager, - ) - - index_manager = ElasticsearchIndexManager( - elasticsearch_config.url, - elasticsearch_config.index_name, - elasticsearch_config.api_key, - ) - created = 
def _get_status_info(self, record: logging.LogRecord) -> Optional[Dict[str, Any]]:
    """Extract structured status information from a log record.

    A status reaches the record through ``extra={"status": ...}``, which the
    logging machinery copies onto the record as ``record.status``.  Two
    shapes are recognised:

    * an object exposing ``code`` and ``message`` attributes (``details``
      is optional), and
    * a ``dict`` using the keys ``code``, ``message`` and ``details``.

    Args:
        record: The log record currently being serialised.

    Returns:
        A dict with the keys ``code``, ``message`` and ``details``, or
        ``None`` when the record carries no status or carries one in an
        unrecognised shape (for example a plain string).
    """
    status = getattr(record, "status", None)
    if status is None:
        return None

    if hasattr(status, "code") and hasattr(status, "message"):
        # Attribute-style status object.
        code = status.code
        message = status.message
        details = getattr(status, "details", None)
    elif isinstance(status, dict):
        # Dictionary representation of the same three fields.
        code = status.get("code")
        message = status.get("message")
        details = status.get("details")
    else:
        return None

    # Unwrap enum-like codes in BOTH shapes so the payload always carries the
    # underlying value; previously only the attribute-style branch did this.
    if hasattr(code, "value"):
        code = code.value

    return {
        "code": code,
        "message": message,
        # Absent and explicit-None details both become an empty list so that
        # consumers can rely on a single type.
        "details": details if details is not None else [],
    }
a/eval_protocol/pytest/remote_rollout_processor.py +++ b/eval_protocol/pytest/remote_rollout_processor.py @@ -4,16 +4,15 @@ import requests -from eval_protocol.log_utils.elasticsearch_client import ElasticsearchClient from eval_protocol.models import EvaluationRow, Status from eval_protocol.data_loader.dynamic_data_loader import DynamicDataLoader from eval_protocol.types.remote_rollout_processor import ( DataLoaderConfig, - ElasticsearchConfig, ) +from eval_protocol.adapters.fireworks_tracing import FireworksTracingAdapter + from .rollout_processor import RolloutProcessor from .types import RolloutProcessorConfig -from .elasticsearch_setup import ElasticsearchSetup from .tracing_utils import default_fireworks_output_data_loader, build_init_request, update_row_with_remote_trace import logging @@ -22,25 +21,6 @@ logger = logging.getLogger(__name__) -def create_elasticsearch_config_from_env() -> ElasticsearchConfig: - """Setup Elasticsearch config from environment variables.""" - url = os.getenv("ELASTICSEARCH_URL") - api_key = os.getenv("ELASTICSEARCH_API_KEY") - index_name = os.getenv("ELASTICSEARCH_INDEX_NAME") - - if url is None: - raise ValueError("ELASTICSEARCH_URL must be set") - if api_key is None: - raise ValueError("ELASTICSEARCH_API_KEY must be set") - if index_name is None: - raise ValueError("ELASTICSEARCH_INDEX_NAME must be set") - return ElasticsearchConfig( - url=url, - api_key=api_key, - index_name=index_name, - ) - - class RemoteRolloutProcessor(RolloutProcessor): """ Rollout processor that triggers a remote HTTP server to perform the rollout. @@ -59,8 +39,6 @@ def __init__( poll_interval: float = 1.0, timeout_seconds: float = 120.0, output_data_loader: Optional[Callable[[DataLoaderConfig], DynamicDataLoader]] = None, - disable_elastic_search_setup: bool = False, - elastic_search_config: Optional[ElasticsearchConfig] = None, ): # Prefer constructor-provided configuration. 
These can be overridden via # config.kwargs at call time for backward compatibility. @@ -74,21 +52,7 @@ def __init__( self._poll_interval = poll_interval self._timeout_seconds = timeout_seconds self._output_data_loader = output_data_loader or default_fireworks_output_data_loader - self._disable_elastic_search_setup = disable_elastic_search_setup - self._elastic_search_config = elastic_search_config - - def setup(self) -> None: - if self._disable_elastic_search_setup: - logger.info("Elasticsearch is disabled, skipping setup") - return - logger.info("Setting up Elasticsearch") - self._elastic_search_config = self._setup_elastic_search() - logger.info("Elasticsearch setup complete") - - def _setup_elastic_search(self) -> ElasticsearchConfig: - """Set up Elasticsearch using the dedicated setup module.""" - setup = ElasticsearchSetup() - return setup.setup_elasticsearch() + self._tracing_adapter = FireworksTracingAdapter(base_url=self._model_base_url) def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> List[asyncio.Task[EvaluationRow]]: tasks: List[asyncio.Task[EvaluationRow]] = [] @@ -123,7 +87,7 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow: if row.input_metadata.row_id is None: raise ValueError("Row ID is required in RemoteRolloutProcessor") - init_payload = build_init_request(row, config, model_base_url, self._elastic_search_config) + init_payload = build_init_request(row, config, model_base_url) # Fire-and-poll def _post_init() -> None: @@ -153,10 +117,6 @@ def _get_status() -> Dict[str, Any]: r.raise_for_status() return r.json() - elasticsearch_client = ( - ElasticsearchClient(self._elastic_search_config) if self._elastic_search_config else None - ) - continue_polling_status = True while time.time() < deadline: try: @@ -178,29 +138,41 @@ def _get_status() -> Dict[str, Any]: # For all other exceptions, raise them raise - if not elasticsearch_client: - continue - - search_results = 
elasticsearch_client.search_by_status_code_not_in( - row.execution_metadata.rollout_id, [Status.Code.RUNNING] + # Search Fireworks tracing logs for completion + completed_logs = self._tracing_adapter.search_logs( + tags=[f"rollout_id:{row.execution_metadata.rollout_id}"] ) - hits = search_results["hits"]["hits"] if search_results else [] + # Filter for logs that actually have status information + status_logs = [] + for log in completed_logs: + status_dict = log.get("status") + if status_dict and isinstance(status_dict, dict) and "code" in status_dict: + status_logs.append(log) + + if status_logs: + # Use the first log with status information + status_log = status_logs[0] + status_dict = status_log.get("status") + + logger.info( + f"Found status log for rollout {row.execution_metadata.rollout_id}: {status_log.get('message', '')}" + ) - if hits: - # log all statuses found and update rollout status from the last hit - for hit in hits: - document = hit["_source"] - logger.info( - f"Found log for rollout {row.execution_metadata.rollout_id} with status code {document['status_code']}" - ) - # Update rollout status from the document - if "status_code" in document: - row.rollout_status = Status( - code=Status.Code(document["status_code"]), - message=document.get("status_message", ""), - details=document.get("status_details", []), - ) - logger.info("Stopping status polling for rollout %s", row.execution_metadata.rollout_id) + status_code = status_dict.get("code") + status_message = status_dict.get("message", "") + status_details = status_dict.get("details", []) + + logger.info( + f"Found Fireworks log for rollout {row.execution_metadata.rollout_id} with status code {status_code}" + ) + + row.rollout_status = Status( + code=Status.Code(status_code), + message=status_message, + details=status_details, + ) + + logger.info("Stopping polling for rollout %s", row.execution_metadata.rollout_id) break await asyncio.sleep(poll_interval) diff --git 
a/eval_protocol/pytest/tracing_utils.py b/eval_protocol/pytest/tracing_utils.py index 14ac03cf..de05ff79 100644 --- a/eval_protocol/pytest/tracing_utils.py +++ b/eval_protocol/pytest/tracing_utils.py @@ -56,7 +56,6 @@ def build_init_request( row: EvaluationRow, config: RolloutProcessorConfig, model_base_url: str, - elastic_search_config: Optional[Any] = None, ) -> InitRequest: """Build an InitRequest from an EvaluationRow and config (shared logic).""" # Validation @@ -129,7 +128,6 @@ def build_init_request( tools=row.tools, metadata=meta, model_base_url=final_model_base_url, - elastic_search_config=elastic_search_config, ) diff --git a/tests/logging/test_elasticsearch_direct_http_handler.py b/tests/logging/test_elasticsearch_direct_http_handler.py index df59f9de..9098b088 100644 --- a/tests/logging/test_elasticsearch_direct_http_handler.py +++ b/tests/logging/test_elasticsearch_direct_http_handler.py @@ -2,6 +2,7 @@ import logging import time import pytest +import warnings from datetime import datetime, timezone from eval_protocol.log_utils.elasticsearch_direct_http_handler import ElasticsearchDirectHttpHandler @@ -9,6 +10,17 @@ from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup from eval_protocol.types.remote_rollout_processor import ElasticsearchConfig +# DEPRECATION WARNING: These Elasticsearch integration tests are deprecated +# in favor of Fireworks tracing integration tests. See: +# tests/logging/test_fireworks_tracing_integration.py +warnings.warn( + "Elasticsearch integration tests are deprecated. 
" + "Use Fireworks tracing integration tests instead: " + "tests/logging/test_fireworks_tracing_integration.py", + DeprecationWarning, + stacklevel=2, +) + @pytest.fixture def rollout_id(): @@ -110,6 +122,7 @@ def clear_elasticsearch_before_test( print(f"Warning: Failed to clear Elasticsearch index before test: {e}") +@pytest.mark.skip(reason="Deprecated: Use Fireworks tracing integration tests instead") def test_elasticsearch_direct_http_handler_sends_logs( elasticsearch_client: ElasticsearchClient, test_logger: logging.Logger, rollout_id: str ): @@ -159,6 +172,7 @@ def test_elasticsearch_direct_http_handler_sends_logs( print(f"Successfully verified log message in Elasticsearch: {test_message}") +@pytest.mark.skip(reason="Deprecated: Use Fireworks tracing integration tests instead") def test_elasticsearch_direct_http_handler_sorts_logs_chronologically( elasticsearch_client: ElasticsearchClient, test_logger: logging.Logger, rollout_id: str ): @@ -212,6 +226,7 @@ def test_elasticsearch_direct_http_handler_sorts_logs_chronologically( print(f"Timestamps in order: {found_timestamps}") +@pytest.mark.skip(reason="Deprecated: Use Fireworks tracing integration tests instead") def test_elasticsearch_direct_http_handler_includes_rollout_id( elasticsearch_client: ElasticsearchClient, test_logger: logging.Logger, rollout_id: str ): @@ -269,6 +284,7 @@ def test_elasticsearch_direct_http_handler_includes_rollout_id( print(f"Successfully verified log message with rollout_id '{rollout_id}' in Elasticsearch: {test_message}") +@pytest.mark.skip(reason="Deprecated: Use Fireworks tracing integration tests instead") def test_elasticsearch_direct_http_handler_search_by_rollout_id( elasticsearch_client: ElasticsearchClient, test_logger: logging.Logger, rollout_id: str ): @@ -337,6 +353,7 @@ def test_elasticsearch_direct_http_handler_search_by_rollout_id( print("Verified that search for different rollout_id returns 0 results") +@pytest.mark.skip(reason="Deprecated: Use Fireworks tracing 
integration tests instead") def test_elasticsearch_direct_http_handler_logs_status_info( elasticsearch_client: ElasticsearchClient, test_logger: logging.Logger, rollout_id: str ): @@ -402,6 +419,7 @@ def test_elasticsearch_direct_http_handler_logs_status_info( print(f"Successfully verified Status logging with code {test_status.code.value} in Elasticsearch: {test_message}") +@pytest.mark.skip(reason="Deprecated: Use Fireworks tracing integration tests instead") def test_elasticsearch_direct_http_handler_search_by_status_code( elasticsearch_client: ElasticsearchClient, test_logger: logging.Logger, rollout_id: str ): @@ -460,6 +478,7 @@ def test_elasticsearch_direct_http_handler_search_by_status_code( print(f"Successfully verified search by status code {running_status.value} found {len(hits)} log messages") +@pytest.mark.skip(reason="Deprecated: Use Fireworks tracing integration tests instead") def test_elasticsearch_direct_http_handler_rollout_id_from_extra_overrides_env( elasticsearch_client: ElasticsearchClient, test_logger: logging.Logger, rollout_id: str ): @@ -539,6 +558,7 @@ def test_elasticsearch_direct_http_handler_rollout_id_from_extra_overrides_env( print(f"Successfully verified rollout_id override: extra '{extra_rollout_id}' overrode environment '{rollout_id}'") +@pytest.mark.skip(reason="Deprecated: Use Fireworks tracing integration tests instead") def test_elasticsearch_direct_http_handler_timestamp_format( elasticsearch_client: ElasticsearchClient, test_logger: logging.Logger, rollout_id: str ): diff --git a/tests/remote_server/remote_server.py b/tests/remote_server/remote_server.py index ed1ee08d..85d69ed5 100644 --- a/tests/remote_server/remote_server.py +++ b/tests/remote_server/remote_server.py @@ -8,14 +8,14 @@ from openai import OpenAI import logging -from eval_protocol import Status, InitRequest, ElasticsearchDirectHttpHandler, RolloutIdFilter +from eval_protocol import Status, InitRequest, FireworksTracingHttpHandler, RolloutIdFilter app = 
FastAPI() -# attach handler to root logger -handler = ElasticsearchDirectHttpHandler() -logging.getLogger().addHandler(handler) +# Attach Fireworks tracing handler to root logger +fireworks_handler = FireworksTracingHttpHandler() +logging.getLogger().addHandler(fireworks_handler) force_early_error_message = None @@ -23,10 +23,7 @@ @app.post("/init") def init(req: InitRequest): - if req.elastic_search_config: - handler.configure(req.elastic_search_config) - - # attach rollout_id filter to logger + # Attach rollout_id filter to logger logger = logging.getLogger(f"{__name__}.{req.metadata.rollout_id}") logger.addFilter(RolloutIdFilter(req.metadata.rollout_id)) diff --git a/tests/remote_server/remote_server_multi_turn.py b/tests/remote_server/remote_server_multi_turn.py index 69dabc15..3a9dc60d 100644 --- a/tests/remote_server/remote_server_multi_turn.py +++ b/tests/remote_server/remote_server_multi_turn.py @@ -7,22 +7,19 @@ from openai import OpenAI import logging -from eval_protocol import Status, InitRequest, ElasticsearchDirectHttpHandler, RolloutIdFilter +from eval_protocol import Status, InitRequest, FireworksTracingHttpHandler, RolloutIdFilter app = FastAPI() -# attach handler to root logger -handler = ElasticsearchDirectHttpHandler() -logging.getLogger().addHandler(handler) +# Attach Fireworks tracing handler to root logger +fireworks_handler = FireworksTracingHttpHandler() +logging.getLogger().addHandler(fireworks_handler) @app.post("/init") def init(req: InitRequest): - if req.elastic_search_config: - handler.configure(req.elastic_search_config) - - # attach rollout_id filter to logger + # Attach rollout_id filter to logger logger = logging.getLogger(f"{__name__}.{req.metadata.rollout_id}") logger.addFilter(RolloutIdFilter(req.metadata.rollout_id)) diff --git a/tests/remote_server/test_remote_fireworks.py b/tests/remote_server/test_remote_fireworks.py index ea133ccf..303d40f5 100644 --- a/tests/remote_server/test_remote_fireworks.py +++ 
def find_available_port() -> int:
    """Return a TCP port that is currently free on the loopback interface.

    The probe socket is bound to ``127.0.0.1`` (the same address the test
    server is started on) with port ``0`` so the OS picks a free port.  The
    socket is closed before returning, so another process could in principle
    grab the port before the server binds it.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("127.0.0.1", 0))
        return sock.getsockname()[1]


SERVER_PORT = find_available_port()


def wait_for_server_to_startup(timeout: int = 120, process: "subprocess.Popen | None" = None):
    """Block until the server on ``SERVER_PORT`` answers an HTTP request.

    Args:
        timeout: Maximum number of seconds to keep polling.
        process: Optional handle of the server process.  When given, the wait
            aborts as soon as the process has exited instead of polling a
            dead server until the timeout elapses.

    Raises:
        RuntimeError: If ``process`` exits before the server becomes reachable.
        TimeoutError: If the server is not reachable within ``timeout`` seconds.
    """
    start_time = time.time()
    while True:
        if process is not None and process.poll() is not None:
            raise RuntimeError(f"Remote server exited during startup with code {process.returncode}")
        try:
            # Any HTTP response (even an error status) proves the server is up.
            # The per-request timeout keeps a hung connection from blocking forever.
            requests.get(f"http://127.0.0.1:{SERVER_PORT}", timeout=5)
            break
        except requests.exceptions.RequestException:
            time.sleep(1)
            if time.time() - start_time > timeout:
                raise TimeoutError(f"Server did not start within {timeout} seconds")


@pytest.fixture(autouse=True)
def setup_remote_server():
    """Start the remote server for a test and always shut it down afterwards."""
    # Kill stale servers left behind by earlier runs; a non-zero exit status
    # (nothing matched) is expected and ignored.
    subprocess.run(
        ["pkill", "-f", "python -m tests.remote_server.remote_server"],
        capture_output=True,
        check=False,
    )

    host = "127.0.0.1"
    process = subprocess.Popen(
        [
            "python",
            "-m",
            "tests.remote_server.remote_server",
            "--host",
            host,
            "--port",
            str(SERVER_PORT),
        ]
    )
    try:
        # Startup polling sits inside the try so a timeout or early exit
        # still reaches the cleanup below instead of leaking the process.
        wait_for_server_to_startup(process=process)
        yield
    finally:
        process.terminate()
        try:
            process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            # The server ignored SIGTERM; force it down so the port is freed.
            process.kill()
            process.wait()
@pytest.fixture(autouse=True) def check_rollout_coverage(): """Ensure we processed all expected rollout_ids""" @@ -57,14 +98,13 @@ def rows() -> List[EvaluationRow]: return [row, row, row] -@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Only run this test locally (skipped in CI)") @pytest.mark.parametrize("completion_params", [{"model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b"}]) @evaluation_test( data_loaders=DynamicDataLoader( generators=[rows], ), rollout_processor=RemoteRolloutProcessor( - remote_base_url="http://127.0.0.1:3000", + remote_base_url=f"http://127.0.0.1:{SERVER_PORT}", timeout_seconds=180, output_data_loader=fireworks_output_data_loader, ), @@ -72,7 +112,7 @@ def rows() -> List[EvaluationRow]: async def test_remote_rollout_and_fetch_fireworks(row: EvaluationRow) -> EvaluationRow: """ End-to-end test: - - REQUIRES MANUAL SERVER STARTUP: python -m tests.remote_server.remote_server + - AUTO SERVER STARTUP: Server is automatically started and stopped by the test - trigger remote rollout via RemoteRolloutProcessor (calls init/status) - fetch traces from Langfuse via Fireworks tracing proxy filtered by metadata via output_data_loader; FAIL if none found """