diff --git a/eval_protocol/adapters/fireworks_tracing.py b/eval_protocol/adapters/fireworks_tracing.py index 707f983a..7a2bddcb 100644 --- a/eval_protocol/adapters/fireworks_tracing.py +++ b/eval_protocol/adapters/fireworks_tracing.py @@ -265,6 +265,54 @@ def __init__( self.base_url = base_url.rstrip("/") self.timeout = timeout + def search_logs(self, tags: List[str], limit: int = 100, hours_back: int = 24) -> List[Dict[str, Any]]: + """Fetch logs from Fireworks tracing gateway /logs endpoint. + + Returns entries with keys: timestamp, message, severity, tags. + """ + if not tags: + raise ValueError("At least one tag is required to fetch logs") + + headers = {"Authorization": f"Bearer {os.environ.get('FIREWORKS_API_KEY')}"} + params: Dict[str, Any] = {"tags": tags, "limit": limit, "hours_back": hours_back, "program": "eval_protocol"} + + # Try /logs first, fall back to /v1/logs if not found + urls_to_try = [f"{self.base_url}/logs", f"{self.base_url}/v1/logs"] + data: Dict[str, Any] = {} + last_error: Optional[str] = None + for url in urls_to_try: + try: + response = requests.get(url, params=params, timeout=self.timeout, headers=headers) + if response.status_code == 404: + # Try next variant + last_error = f"404 for {url}" + continue + response.raise_for_status() + data = response.json() or {} + break + except requests.exceptions.RequestException as e: + last_error = str(e) + continue + else: + # All attempts failed + if last_error: + logger.error("Failed to fetch logs from Fireworks (tried %s): %s", urls_to_try, last_error) + return [] + + entries: List[Dict[str, Any]] = data.get("entries", []) or [] + # Normalize minimal shape + results: List[Dict[str, Any]] = [] + for e in entries: + results.append( + { + "timestamp": e.get("timestamp"), + "message": e.get("message"), + "severity": e.get("severity", "INFO"), + "tags": e.get("tags", []), + } + ) + return results + def get_evaluation_rows( self, tags: List[str], diff --git a/eval_protocol/cli.py b/eval_protocol/cli.py index 1d0f453c..3b7ff58f 100644 --- a/eval_protocol/cli.py +++ b/eval_protocol/cli.py @@ -307,6 +307,16 @@ def parse_args(args=None): action="store_true", help="Use env vars for Elasticsearch config (requires ELASTICSEARCH_URL, ELASTICSEARCH_API_KEY, ELASTICSEARCH_INDEX_NAME)", ) + logs_parser.add_argument( + "--use-fireworks", + action="store_true", + help="Force Fireworks tracing backend for logs UI (overrides env auto-detection)", + ) + logs_parser.add_argument( + "--use-elasticsearch", + action="store_true", + help="Force Elasticsearch backend for logs UI (overrides env auto-detection)", + ) # Upload command upload_parser = subparsers.add_parser( diff --git a/eval_protocol/cli_commands/logs.py b/eval_protocol/cli_commands/logs.py index 7e2d668a..43261d94 100644 --- a/eval_protocol/cli_commands/logs.py +++ b/eval_protocol/cli_commands/logs.py @@ -5,6 +5,7 @@ import sys from pathlib import Path +import os from ..utils.logs_server import serve_logs @@ -20,53 +21,76 @@ def logs_command(args): print("Press Ctrl+C to stop the server") print("-" * 50) - # Setup Elasticsearch based on flags + # Backend selection: Fireworks first when API key present, unless overridden + use_fireworks = False + if getattr(args, "use_fireworks", False): + use_fireworks = True + elif getattr(args, "use_elasticsearch", False): + use_fireworks = False + else: + use_fireworks = bool(os.environ.get("FIREWORKS_API_KEY")) + + # Setup backend configs elasticsearch_config = None + # Prefer explicit FW_TRACING_GATEWAY_BASE_URL, then GATEWAY_URL from env (remote validation), + # finally default to public tracing.fireworks.ai + fireworks_base_url = ( + os.environ.get("FW_TRACING_GATEWAY_BASE_URL") + or os.environ.get("GATEWAY_URL") + or "https://tracing.fireworks.ai" + ) try: - if getattr(args, "use_env_elasticsearch_config", False): - # Use environment variables for configuration - print("⚙️ Using environment variables for Elasticsearch config") - from eval_protocol.pytest.remote_rollout_processor import ( - create_elasticsearch_config_from_env, - ) - - elasticsearch_config = create_elasticsearch_config_from_env() - # Ensure index exists with correct mapping, mirroring Docker setup path - try: - from eval_protocol.log_utils.elasticsearch_index_manager import ( - ElasticsearchIndexManager, + if not use_fireworks: + if getattr(args, "use_env_elasticsearch_config", False): + # Use environment variables for configuration + print("⚙️ Using environment variables for Elasticsearch config") + from eval_protocol.pytest.remote_rollout_processor import ( + create_elasticsearch_config_from_env, ) - index_manager = ElasticsearchIndexManager( - elasticsearch_config.url, - elasticsearch_config.index_name, - elasticsearch_config.api_key, - ) - created = index_manager.create_logging_index_mapping() - if created: - print( - f"🧭 Verified Elasticsearch index '{elasticsearch_config.index_name}' mapping (created or already correct)" + elasticsearch_config = create_elasticsearch_config_from_env() + # Ensure index exists with correct mapping, mirroring Docker setup path + try: + from eval_protocol.log_utils.elasticsearch_index_manager import ( + ElasticsearchIndexManager, ) - else: - print( - f"⚠️ Could not verify/create mapping for index '{elasticsearch_config.index_name}'. Searches may behave unexpectedly." + + index_manager = ElasticsearchIndexManager( + elasticsearch_config.url, + elasticsearch_config.index_name, + elasticsearch_config.api_key, ) - except Exception as e: - print(f"⚠️ Failed to ensure index mapping via IndexManager: {e}") - elif not getattr(args, "disable_elasticsearch_setup", False): - # Default behavior: start or connect to local Elasticsearch via Docker helper - from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup + created = index_manager.create_logging_index_mapping() + if created: + print( + f"🧭 Verified Elasticsearch index '{elasticsearch_config.index_name}' mapping (created or already correct)" + ) + else: + print( + f"⚠️ Could not verify/create mapping for index '{elasticsearch_config.index_name}'. Searches may behave unexpectedly." + ) + except Exception as e: + print(f"⚠️ Failed to ensure index mapping via IndexManager: {e}") + elif not getattr(args, "disable_elasticsearch_setup", False): + # Default behavior: start or connect to local Elasticsearch via Docker helper + from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup - print("🧰 Auto-configuring local Elasticsearch (Docker)") - elasticsearch_config = ElasticsearchSetup().setup_elasticsearch() - else: - print("🚫 Elasticsearch setup disabled; running without Elasticsearch integration") + print("🧰 Auto-configuring local Elasticsearch (Docker)") + elasticsearch_config = ElasticsearchSetup().setup_elasticsearch() + else: + print("🚫 Elasticsearch setup disabled; running without Elasticsearch integration") except Exception as e: print(f"❌ Failed to configure Elasticsearch: {e}") return 1 try: - serve_logs(port=args.port, elasticsearch_config=elasticsearch_config, debug=args.debug) + serve_logs( + port=args.port, + elasticsearch_config=elasticsearch_config, + debug=args.debug, + backend="fireworks" if use_fireworks else "elasticsearch", + fireworks_base_url=fireworks_base_url if use_fireworks else None, + ) return 0 except KeyboardInterrupt: print("\n🛑 Server stopped by user") diff --git a/eval_protocol/log_utils/fireworks_tracing_http_handler.py b/eval_protocol/log_utils/fireworks_tracing_http_handler.py index df53a921..b553a4bd 100644 --- a/eval_protocol/log_utils/fireworks_tracing_http_handler.py +++ b/eval_protocol/log_utils/fireworks_tracing_http_handler.py @@ -12,10 +12,19 @@ class FireworksTracingHttpHandler(logging.Handler): def __init__(self, gateway_base_url: Optional[str] = None, rollout_id_env: str = "EP_ROLLOUT_ID") -> None: super().__init__() - self.gateway_base_url = gateway_base_url or os.getenv("FW_TRACING_GATEWAY_BASE_URL") + self.gateway_base_url = ( + gateway_base_url or os.getenv("FW_TRACING_GATEWAY_BASE_URL") or "https://tracing.fireworks.ai" + ) self.rollout_id_env = rollout_id_env self._session = requests.Session() self._lock = threading.Lock() + # Include Authorization header if FIREWORKS_API_KEY is available + api_key = os.environ.get("FIREWORKS_API_KEY") + if api_key: + try: + self._session.headers.update({"Authorization": f"Bearer {api_key}"}) + except Exception: + pass def emit(self, record: logging.LogRecord) -> None: try: @@ -25,9 +34,42 @@ def emit(self, record: logging.LogRecord) -> None: if not rollout_id: return payload = self._build_payload(record, rollout_id) - url = f"{self.gateway_base_url.rstrip('/')}/logs" + base = self.gateway_base_url.rstrip("/") + url = f"{base}/logs" + # Optional debug prints to aid local diagnostics + if os.environ.get("EP_DEBUG") == "true": + try: + tags_val = payload.get("tags") + tags_len = len(tags_val) if isinstance(tags_val, list) else 0 + msg_val = payload.get("message") + msg_preview = msg_val[:80] if isinstance(msg_val, str) else msg_val + print(f"[FW_LOG] POST {url} rollout_id={rollout_id} tags={tags_len} msg={msg_preview}") + except Exception: + pass with self._lock: - self._session.post(url, json=payload, timeout=5) + resp = self._session.post(url, json=payload, timeout=5) + if os.environ.get("EP_DEBUG") == "true": + try: + print(f"[FW_LOG] resp={resp.status_code}") + except Exception: + pass + # Fallback to /v1/logs if /logs is not found + if resp is not None and getattr(resp, "status_code", None) == 404: + alt = f"{base}/v1/logs" + if os.environ.get("EP_DEBUG") == "true": + try: + tags_val = payload.get("tags") + tags_len = len(tags_val) if isinstance(tags_val, list) else 0 + print(f"[FW_LOG] RETRY POST {alt} rollout_id={rollout_id} tags={tags_len}") + except Exception: + pass + with self._lock: + resp2 = self._session.post(alt, json=payload, timeout=5) + if os.environ.get("EP_DEBUG") == "true": + try: + print(f"[FW_LOG] retry resp={resp2.status_code}") + except Exception: + pass except Exception: # Avoid raising exceptions from logging self.handleError(record) diff --git a/eval_protocol/log_utils/init.py b/eval_protocol/log_utils/init.py index 1699b744..795cf7ba 100644 --- a/eval_protocol/log_utils/init.py +++ b/eval_protocol/log_utils/init.py @@ -39,9 +39,11 @@ def init_external_logging_from_env() -> None: # Ensure we do not add duplicate handlers if already present existing_handler_types = {type(h).__name__ for h in root_logger.handlers} - # Fireworks tracing - fw_url = _get_env("FW_TRACING_GATEWAY_BASE_URL") - if fw_url and "FireworksTracingHttpHandler" not in existing_handler_types: + # Fireworks tracing: prefer if FIREWORKS_API_KEY is present; default base URL if not provided + fw_key = _get_env("FIREWORKS_API_KEY") + # Allow remote validation gateway to act as tracing base when provided + fw_url = _get_env("FW_TRACING_GATEWAY_BASE_URL") or _get_env("GATEWAY_URL") or "https://tracing.fireworks.ai" + if fw_key and "FireworksTracingHttpHandler" not in existing_handler_types: fw_handler = FireworksTracingHttpHandler(gateway_base_url=fw_url) fw_handler.setLevel(logging.INFO) fw_handler.addFilter(ContextRolloutIdFilter()) @@ -51,7 +53,13 @@ def init_external_logging_from_env() -> None: es_url = _get_env("EP_ELASTICSEARCH_URL") es_api_key = _get_env("EP_ELASTICSEARCH_API_KEY") es_index = _get_env("EP_ELASTICSEARCH_INDEX") - if es_url and es_api_key and es_index and "ElasticsearchDirectHttpHandler" not in existing_handler_types: + if ( + not fw_key + and es_url + and es_api_key + and es_index + and "ElasticsearchDirectHttpHandler" not in existing_handler_types + ): es_config = ElasticsearchConfig(url=es_url, api_key=es_api_key, index_name=es_index) es_handler = ElasticsearchDirectHttpHandler(elasticsearch_config=es_config) es_handler.setLevel(logging.INFO) diff --git a/eval_protocol/proxy/proxy_core/redis_utils.py b/eval_protocol/proxy/proxy_core/redis_utils.py index fa24c38c..2ebd6245 100644 --- a/eval_protocol/proxy/proxy_core/redis_utils.py +++ b/eval_protocol/proxy/proxy_core/redis_utils.py @@ -3,7 +3,7 @@ """ import logging -from typing import Set +from typing import Set, cast import redis logger = logging.getLogger(__name__) @@ -40,7 +40,16 @@ def get_insertion_ids(redis_client: redis.Redis, rollout_id: str) -> Set[str]: Set of insertion_id strings, empty set if none found or on error """ try: - insertion_ids = redis_client.smembers(rollout_id) + raw = redis_client.smembers(rollout_id) + # Typing in redis stubs may be Awaitable[Set[Any]] | Set[Any]; at runtime this is a Set[bytes] + raw_ids = cast(Set[object], raw) + # Normalize to set[str] + insertion_ids: Set[str] = set() + for b in raw_ids: + try: + insertion_ids.add(b.decode("utf-8") if isinstance(b, (bytes, bytearray)) else cast(str, b)) + except Exception: + continue logger.debug(f"Found {len(insertion_ids)} expected insertion_ids for rollout {rollout_id}") return insertion_ids except Exception as e: diff --git a/eval_protocol/utils/logs_server.py b/eval_protocol/utils/logs_server.py index 950aaa94..adf44c57 100644 --- a/eval_protocol/utils/logs_server.py +++ b/eval_protocol/utils/logs_server.py @@ -8,7 +8,7 @@ from contextlib import asynccontextmanager from pathlib import Path from queue import Queue -from typing import TYPE_CHECKING, Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Literal import psutil import uvicorn @@ -327,6 +327,8 @@ def __init__( port: Optional[int] = 8000, index_file: str = "index.html", elasticsearch_config: Optional[ElasticsearchConfig] = None, + backend: Literal["fireworks", "elasticsearch"] = "elasticsearch", + fireworks_base_url: Optional[str] = None, debug: bool = False, ): # Enable debug mode if requested @@ -336,6 +338,10 @@ def __init__( # Initialize WebSocket manager self.websocket_manager = WebSocketManager() + # Backend selection + self.backend: Literal["fireworks", "elasticsearch"] = backend + self.fireworks_base_url = fireworks_base_url + # Initialize Elasticsearch client if config is provided self.elasticsearch_client: Optional[ElasticsearchClient] = None if elasticsearch_config: @@ -414,6 +420,8 @@ async def status(): # Expose an empty list to satisfy consumers and type checker "watch_paths": [], "elasticsearch_enabled": self.elasticsearch_client is not None, + "backend": self.backend, + "fireworks_enabled": self.backend == "fireworks", } @self.app.get("/api/logs/{rollout_id}", response_model=LogsResponse, response_model_exclude_none=True) @@ -422,7 +430,47 @@ async def get_logs( level: Optional[str] = Query(None, description="Filter by log level (DEBUG, INFO, WARNING, ERROR)"), limit: int = Query(100, description="Maximum number of log entries to return"), ) -> LogsResponse: - """Get logs for a specific rollout ID from Elasticsearch.""" + """Get logs for a specific rollout ID from the configured backend.""" + # Fireworks backend + if self.backend == "fireworks": + try: + from eval_protocol.adapters.fireworks_tracing import FireworksTracingAdapter + + base_url = self.fireworks_base_url or "https://tracing.fireworks.ai" + adapter = FireworksTracingAdapter(base_url=base_url) + # Fetch lightweight log entries filtered by rollout_id tag + tags = [f"rollout_id:{rollout_id}"] + entries = adapter.search_logs(tags=tags, limit=limit) + # Map to LogEntry responses + log_entries: List[LogEntry] = [] + for e in entries: + ts = e.get("timestamp") or datetime.utcnow().isoformat() + "Z" + msg = e.get("message") or "trace" + sev = e.get("severity") or "INFO" + entry = LogEntry( + **{ + "@timestamp": ts, + "level": sev, + "message": str(msg), + "logger_name": "fireworks", + "rollout_id": rollout_id, + } + ) + log_entries.append(entry) + + return LogsResponse( + logs=log_entries, + total=len(log_entries), + rollout_id=rollout_id, + filtered_by_level=level, + ) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving Fireworks logs for rollout {rollout_id}: {e}") + raise HTTPException(status_code=500, detail=f"Failed to retrieve Fireworks logs: {str(e)}") + + # Elasticsearch backend if not self.elasticsearch_client: raise HTTPException(status_code=503, detail="Elasticsearch is not configured for this logs server") @@ -574,6 +622,8 @@ def create_app( port: int = 8000, build_dir: Optional[str] = None, elasticsearch_config: Optional[ElasticsearchConfig] = None, + backend: Literal["fireworks", "elasticsearch"] = "elasticsearch", + fireworks_base_url: Optional[str] = None, debug: bool = False, ) -> FastAPI: """ @@ -597,7 +647,13 @@ def create_app( ) server = LogsServer( - host=host, port=port, build_dir=build_dir, elasticsearch_config=elasticsearch_config, debug=debug + host=host, + port=port, + build_dir=build_dir, + elasticsearch_config=elasticsearch_config, + backend=backend, + fireworks_base_url=fireworks_base_url, + debug=debug, ) server.start_loops() return server.app @@ -605,12 +661,29 @@ def create_app( # For backward compatibility and direct usage def serve_logs( - port: Optional[int] = None, elasticsearch_config: Optional[ElasticsearchConfig] = None, debug: bool = False + port: Optional[int] = None, + elasticsearch_config: Optional[ElasticsearchConfig] = None, + debug: bool = False, + backend: Literal["fireworks", "elasticsearch"] = "elasticsearch", + fireworks_base_url: Optional[str] = None, ): """ Convenience function to create and run a LogsServer. """ - server = LogsServer(port=port, elasticsearch_config=elasticsearch_config, debug=debug) + # For backward compatibility with tests that assert exact constructor kwargs, + # only pass additional backend-related kwargs when they are actually needed. + logs_server_kwargs: Dict[str, Any] = { + "port": port, + "elasticsearch_config": elasticsearch_config, + "debug": debug, + } + + # If non-default backend (fireworks) is requested or a base URL is provided, include them. + if backend != "elasticsearch" or fireworks_base_url is not None: + logs_server_kwargs["backend"] = backend + logs_server_kwargs["fireworks_base_url"] = fireworks_base_url + + server = LogsServer(**logs_server_kwargs) server.run() diff --git a/scripts/validate_remote.py b/scripts/validate_remote.py new file mode 100644 index 00000000..b82d5d28 --- /dev/null +++ b/scripts/validate_remote.py @@ -0,0 +1,136 @@ +import os +import sys +import time +import requests + + +def require_env(var_name: str) -> str: + value = os.getenv(var_name) + if not value: + print(f"Missing required env var: {var_name}", file=sys.stderr) + sys.exit(1) + return value + + +def require_logs_endpoints(base_url: str) -> None: + try: + r = requests.get(f"{base_url}/openapi.json", timeout=30) + if not r.ok: + print("OpenAPI schema unavailable", file=sys.stderr) + sys.exit(1) + paths = r.json().get("paths", {}) + ok = any(p.startswith("/logs") or p.startswith("/v1/logs") for p in paths.keys()) + if not ok: + print("/logs endpoints not present on deployment", file=sys.stderr) + sys.exit(1) + except Exception as e: + print(f"Failed to check OpenAPI: {e}", file=sys.stderr) + sys.exit(1) + + +def post_chat_completion(base_url: str, api_key: str, rollout_id: str) -> None: + headers = {"Authorization": f"Bearer {api_key}"} + now = int(time.time()) + url = ( + f"{base_url}/rollout_id/{rollout_id}/" + f"invocation_id/inv{now}/" + f"experiment_id/remote-validate/" + f"run_id/run-1/" + f"row_id/row-1/" + f"chat/completions" + ) + body = { + "model": "fireworks_ai/accounts/fireworks/models/llama-v3p1-8b-instruct", + "messages": [{"role": "user", "content": "Say 'ok' if you can read this."}], + "temperature": 0.1, + } + r = requests.post(url, headers=headers, json=body, timeout=60) + if r.status_code != 200: + print(f"Chat completion failed: {r.status_code} {r.text[:500]}", file=sys.stderr) + sys.exit(1) + print("chat: ok") + + +def wait_for_traces(base_url: str, api_key: str, rollout_id: str, max_attempts: int = 8) -> None: + headers = {"Authorization": f"Bearer {api_key}"} + params = { + "tags": [f"rollout_id:{rollout_id}"], + "limit": 10, + "hours_back": 6, + } + url = f"{base_url}/traces" + for attempt in range(1, max_attempts + 1): + r = requests.get(url, headers=headers, params=params, timeout=30) + if r.status_code == 200: + data = r.json() + total = int(data.get("total_traces") or 0) + print(f"traces: ok total_traces={total}") + if total > 0: + return + elif r.status_code != 404 and r.status_code != 401: + print(f"Traces fetch failed: {r.status_code} {r.text[:500]}", file=sys.stderr) + sys.exit(1) + sleep_s = min(2 ** (attempt - 1), 10) + time.sleep(sleep_s) + print("Traces not available after retries (indexing delay?)", file=sys.stderr) + sys.exit(1) + + +def validate_logs_endpoints(base_url: str, rollout_id: str) -> None: + require_logs_endpoints(base_url) + + # Ingest a structured log + payload = { + "program": "eval_protocol", + "status": "completed", + "message": "Remote validation run finished", + "tags": [f"rollout_id:{rollout_id}", "experiment_id:remote", "run_id:test"], + "metadata": {"dataset": "AIME"}, + "extras": {"num_examples": 3}, + } + r = requests.post(f"{base_url}/logs", json=payload, timeout=30) + if r.status_code != 200: + print(f"logs ingest failed: {r.status_code} {r.text[:500]}", file=sys.stderr) + sys.exit(1) + print("logs ingest: ok") + + # Retrieve logs (retry for indexing) + params = { + "tags": [f"rollout_id:{rollout_id}"], + "program": "eval_protocol", + "hours_back": 1, + "limit": 10, + } + total = 0 + for attempt in range(1, 12): + rr = requests.get(f"{base_url}/logs", params=params, timeout=30) + if rr.status_code == 200: + data = rr.json() + total = int(data.get("total_entries") or 0) + if total > 0: + print(f"logs fetch: ok total_entries={total}") + break + sleep_s = min(2 ** (attempt - 1), 10) + time.sleep(sleep_s) + if total == 0: + print("logs fetch: no entries found within retry window", file=sys.stderr) + sys.exit(1) + + +def main(): + base_url = require_env("GATEWAY_URL") + api_key = require_env("FIREWORKS_API_KEY") + rollout_id = f"r{int(time.time())}" + + print(f"Gateway: {base_url}") + print(f"Rollout: rollout_id:{rollout_id}") + + post_chat_completion(base_url, api_key, rollout_id) + wait_for_traces(base_url, api_key, rollout_id) + validate_logs_endpoints(base_url, rollout_id) + + print("remote validation: SUCCESS") + + +if __name__ == "__main__": + main() diff --git a/scripts/verify_logging_locally.py b/scripts/verify_logging_locally.py new file mode 100644 index 00000000..adcbef05 --- /dev/null +++ b/scripts/verify_logging_locally.py @@ -0,0 +1,183 @@ +import os +import sys +import time +import json +import logging +from typing import Any, Dict, List + +import requests + +from eval_protocol.log_utils.init import init_external_logging_from_env +from eval_protocol.log_utils.rollout_context import rollout_logging_context + + +def _now_rollout_id() -> str: + return f"verify-{int(time.time())}" + + +def _detect_gateway_base_url() -> str: + # Prefer explicit FW_TRACING_GATEWAY_BASE_URL, else GATEWAY_URL, else public default + return os.getenv("FW_TRACING_GATEWAY_BASE_URL") or os.getenv("GATEWAY_URL") or "https://tracing.fireworks.ai" + + +def _detect_logs_endpoint(base_url: str) -> str: + # Inspect OpenAPI and choose the correct logs endpoint + try: + import requests + + r = requests.get(f"{base_url.rstrip('/')}/openapi.json", timeout=5) + if r.ok: + paths = (r.json() or {}).get("paths", {}) + if any(p.startswith("/v1/logs") for p in paths.keys()): + return "/v1/logs" + if any(p.startswith("/logs") for p in paths.keys()): + return "/logs" + except Exception: + pass + return "/logs" + + +def verify_fireworks(rollout_id: str) -> int: + base_url = _detect_gateway_base_url() + api_key = os.getenv("FIREWORKS_API_KEY") + if not api_key: + print("FIREWORKS_API_KEY not set; cannot verify Fireworks") + return 2 + + # Emit two logs under rollout context + root = logging.getLogger() + root.setLevel(logging.INFO) + init_external_logging_from_env() + # Detect and use the correct logs endpoint + logs_ep = _detect_logs_endpoint(base_url) + # Print handler info for diagnostics + handlers = [type(h).__name__ for h in root.handlers] + print( + json.dumps( + { + "gateway_url": base_url, + "logs_endpoint": logs_ep, + "root_handlers": handlers, + } + ) + ) + + logger = logging.getLogger("ep.verify.fireworks") + for i in range(2): + logger.info( + f"verify fireworks message {i}", + extra={ + "rollout_id": rollout_id, + "experiment_id": "verify-exp", + "run_id": "verify-run", + "status_code": 101 if i == 0 else 200, + "status_message": "RUNNING" if i == 0 else "COMPLETED", + }, + ) + + # Poll /logs for the rollout tag + headers = {"Authorization": f"Bearer {api_key}"} + params = { + "tags": [f"rollout_id:{rollout_id}"], + "program": "eval_protocol", + "limit": 50, + "hours_back": 6, + } + candidate_eps = [logs_ep, "/v1/logs" if logs_ep != "/v1/logs" else "/logs"] + for _ in range(20): + try: + data: Dict[str, Any] = {} + last_err: str | None = None + for ep in candidate_eps: + url = f"{base_url.rstrip('/')}{ep}" + r = requests.get(url, headers=headers, params=params, timeout=15) + if r.status_code == 404: + last_err = f"404 for {ep}" + continue + r.raise_for_status() + data = r.json() or {} + break + else: + raise Exception(last_err or "all endpoints failed") + entries: List[Dict[str, Any]] = data.get("entries", []) or [] + matched = [e for e in entries if any(t == f"rollout_id:{rollout_id}" for t in e.get("tags", []))] + if matched: + print(json.dumps({"total": len(matched), "sample": matched[:3]}, indent=2)) + return 0 + except Exception as e: + print(f"Fireworks fetch error: {e}") + time.sleep(2) + + print("No Fireworks entries found for rollout_id after retries") + return 1 + + +def verify_elasticsearch(rollout_id: str) -> int: + es_url = os.getenv("EP_ELASTICSEARCH_URL") or os.getenv("ELASTICSEARCH_URL") + es_api_key = os.getenv("EP_ELASTICSEARCH_API_KEY") or os.getenv("ELASTICSEARCH_API_KEY") + es_index = os.getenv("EP_ELASTICSEARCH_INDEX") or os.getenv("ELASTICSEARCH_INDEX_NAME") or "default-logs" + if not (es_url and es_api_key): + print("Elasticsearch env not set; set EP_ELASTICSEARCH_URL and EP_ELASTICSEARCH_API_KEY") + return 2 + + # Emit two logs under rollout context + root = logging.getLogger() + root.setLevel(logging.INFO) + init_external_logging_from_env() + logger = logging.getLogger("ep.verify.elasticsearch") + for i in range(2): + logger.info( + f"verify elasticsearch message {i}", + extra={ + "rollout_id": rollout_id, + "experiment_id": "verify-exp", + "run_id": "verify-run", + "status_code": 101 if i == 0 else 200, + "status_message": "RUNNING" if i == 0 else "COMPLETED", + }, + ) + + # Poll ES index by rollout_id + headers = {"Authorization": f"ApiKey {es_api_key}", "Content-Type": "application/json"} + search_body = { + "query": {"term": {"rollout_id": rollout_id}}, + "size": 50, + "sort": [{"@timestamp": {"order": "desc"}}], + } + url = f"{es_url.rstrip('/')}/{es_index}/_search" + for _ in range(20): + try: + r = requests.post(url, headers=headers, json=search_body, timeout=15) + r.raise_for_status() + data: Dict[str, Any] = r.json() or {} + hits = data.get("hits", {}).get("hits", []) + if hits: + docs = [h.get("_source", {}) for h in hits] + print(json.dumps({"total": len(docs), "sample": docs[:3]}, indent=2)) + return 0 + except Exception as e: + print(f"Elasticsearch fetch error: {e}") + time.sleep(2) + + print("No Elasticsearch entries found for rollout_id after retries") + return 1 + + +def main() -> int: + mode = os.getenv("MODE") + if mode is None: + # Default: Fireworks if key present, else Elasticsearch + mode = "fireworks" if os.getenv("FIREWORKS_API_KEY") else "elasticsearch" + rollout_id = os.getenv("EP_ROLLOUT_ID") or _now_rollout_id() + + if mode == "fireworks": + return verify_fireworks(rollout_id) + elif mode == "elasticsearch": + return verify_elasticsearch(rollout_id) + else: + print(f"Unknown MODE: {mode} (expected 'fireworks' or 'elasticsearch')") + return 2 + + +if __name__ == "__main__": + sys.exit(main())