Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions eval_protocol/adapters/fireworks_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,54 @@ def __init__(
self.base_url = base_url.rstrip("/")
self.timeout = timeout

def search_logs(self, tags: List[str], limit: int = 100, hours_back: int = 24) -> List[Dict[str, Any]]:
    """Fetch logs from the Fireworks tracing gateway.

    Tries the ``/logs`` endpoint first and falls back to ``/v1/logs`` for
    gateway deployments that only expose the versioned path.

    Args:
        tags: Tags to filter on; at least one is required.
        limit: Maximum number of log entries to request.
        hours_back: How far back, in hours, to search.

    Returns:
        A list of dicts with keys ``timestamp``, ``message``, ``severity``
        and ``tags``. Returns an empty list when every endpoint attempt
        fails (the failure is logged, not raised).

    Raises:
        ValueError: If ``tags`` is empty.
    """
    if not tags:
        raise ValueError("At least one tag is required to fetch logs")

    # Only attach an Authorization header when a key is actually configured;
    # the previous code interpolated a missing key as the literal "Bearer None".
    headers: Dict[str, str] = {}
    api_key = os.environ.get("FIREWORKS_API_KEY")
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    params: Dict[str, Any] = {"tags": tags, "limit": limit, "hours_back": hours_back, "program": "eval_protocol"}

    # Try /logs first, fall back to /v1/logs if not found
    urls_to_try = [f"{self.base_url}/logs", f"{self.base_url}/v1/logs"]
    data: Dict[str, Any] = {}
    last_error: Optional[str] = None
    for url in urls_to_try:
        try:
            response = requests.get(url, params=params, timeout=self.timeout, headers=headers)
            if response.status_code == 404:
                # This URL variant is not served by the gateway; try the next one.
                last_error = f"404 for {url}"
                continue
            response.raise_for_status()
            # json() may legitimately return None for an empty body.
            data = response.json() or {}
            break
        except requests.exceptions.RequestException as e:
            last_error = str(e)
            continue
    else:
        # for/else: the loop exhausted every URL without a break, so all
        # attempts failed. Log once with the full context and degrade to [].
        logger.error("Failed to fetch logs from Fireworks (tried %s): %s", urls_to_try, last_error)
        return []

    # Normalize each entry to a minimal, stable shape for callers.
    entries: List[Dict[str, Any]] = data.get("entries", []) or []
    return [
        {
            "timestamp": e.get("timestamp"),
            "message": e.get("message"),
            "severity": e.get("severity", "INFO"),
            "tags": e.get("tags", []),
        }
        for e in entries
    ]

def get_evaluation_rows(
self,
tags: List[str],
Expand Down
10 changes: 10 additions & 0 deletions eval_protocol/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,16 @@ def parse_args(args=None):
action="store_true",
help="Use env vars for Elasticsearch config (requires ELASTICSEARCH_URL, ELASTICSEARCH_API_KEY, ELASTICSEARCH_INDEX_NAME)",
)
logs_parser.add_argument(
"--use-fireworks",
action="store_true",
help="Force Fireworks tracing backend for logs UI (overrides env auto-detection)",
)
logs_parser.add_argument(
"--use-elasticsearch",
action="store_true",
help="Force Elasticsearch backend for logs UI (overrides env auto-detection)",
)

# Upload command
upload_parser = subparsers.add_parser(
Expand Down
94 changes: 59 additions & 35 deletions eval_protocol/cli_commands/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys
from pathlib import Path

import os
from ..utils.logs_server import serve_logs


Expand All @@ -20,53 +21,76 @@ def logs_command(args):
print("Press Ctrl+C to stop the server")
print("-" * 50)

# Setup Elasticsearch based on flags
# Backend selection: Fireworks first when API key present, unless overridden
use_fireworks = False
if getattr(args, "use_fireworks", False):
use_fireworks = True
elif getattr(args, "use_elasticsearch", False):
use_fireworks = False
else:
use_fireworks = bool(os.environ.get("FIREWORKS_API_KEY"))

# Setup backend configs
elasticsearch_config = None
# Prefer explicit FW_TRACING_GATEWAY_BASE_URL, then GATEWAY_URL from env (remote validation),
# finally default to public tracing.fireworks.ai
fireworks_base_url = (
os.environ.get("FW_TRACING_GATEWAY_BASE_URL")
or os.environ.get("GATEWAY_URL")
or "https://tracing.fireworks.ai"
)
try:
if getattr(args, "use_env_elasticsearch_config", False):
# Use environment variables for configuration
print("⚙️ Using environment variables for Elasticsearch config")
from eval_protocol.pytest.remote_rollout_processor import (
create_elasticsearch_config_from_env,
)

elasticsearch_config = create_elasticsearch_config_from_env()
# Ensure index exists with correct mapping, mirroring Docker setup path
try:
from eval_protocol.log_utils.elasticsearch_index_manager import (
ElasticsearchIndexManager,
if not use_fireworks:
if getattr(args, "use_env_elasticsearch_config", False):
# Use environment variables for configuration
print("⚙️ Using environment variables for Elasticsearch config")
from eval_protocol.pytest.remote_rollout_processor import (
create_elasticsearch_config_from_env,
)

index_manager = ElasticsearchIndexManager(
elasticsearch_config.url,
elasticsearch_config.index_name,
elasticsearch_config.api_key,
)
created = index_manager.create_logging_index_mapping()
if created:
print(
f"🧭 Verified Elasticsearch index '{elasticsearch_config.index_name}' mapping (created or already correct)"
elasticsearch_config = create_elasticsearch_config_from_env()
# Ensure index exists with correct mapping, mirroring Docker setup path
try:
from eval_protocol.log_utils.elasticsearch_index_manager import (
ElasticsearchIndexManager,
)
else:
print(
f"⚠️ Could not verify/create mapping for index '{elasticsearch_config.index_name}'. Searches may behave unexpectedly."

index_manager = ElasticsearchIndexManager(
elasticsearch_config.url,
elasticsearch_config.index_name,
elasticsearch_config.api_key,
)
except Exception as e:
print(f"⚠️ Failed to ensure index mapping via IndexManager: {e}")
elif not getattr(args, "disable_elasticsearch_setup", False):
# Default behavior: start or connect to local Elasticsearch via Docker helper
from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup
created = index_manager.create_logging_index_mapping()
if created:
print(
f"🧭 Verified Elasticsearch index '{elasticsearch_config.index_name}' mapping (created or already correct)"
)
else:
print(
f"⚠️ Could not verify/create mapping for index '{elasticsearch_config.index_name}'. Searches may behave unexpectedly."
)
except Exception as e:
print(f"⚠️ Failed to ensure index mapping via IndexManager: {e}")
elif not getattr(args, "disable_elasticsearch_setup", False):
# Default behavior: start or connect to local Elasticsearch via Docker helper
from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup

print("🧰 Auto-configuring local Elasticsearch (Docker)")
elasticsearch_config = ElasticsearchSetup().setup_elasticsearch()
else:
print("🚫 Elasticsearch setup disabled; running without Elasticsearch integration")
print("🧰 Auto-configuring local Elasticsearch (Docker)")
elasticsearch_config = ElasticsearchSetup().setup_elasticsearch()
else:
print("🚫 Elasticsearch setup disabled; running without Elasticsearch integration")
except Exception as e:
print(f"❌ Failed to configure Elasticsearch: {e}")
return 1

try:
serve_logs(port=args.port, elasticsearch_config=elasticsearch_config, debug=args.debug)
serve_logs(
port=args.port,
elasticsearch_config=elasticsearch_config,
debug=args.debug,
backend="fireworks" if use_fireworks else "elasticsearch",
fireworks_base_url=fireworks_base_url if use_fireworks else None,
)
return 0
except KeyboardInterrupt:
print("\n🛑 Server stopped by user")
Expand Down
48 changes: 45 additions & 3 deletions eval_protocol/log_utils/fireworks_tracing_http_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,19 @@ class FireworksTracingHttpHandler(logging.Handler):

def __init__(self, gateway_base_url: Optional[str] = None, rollout_id_env: str = "EP_ROLLOUT_ID") -> None:
    """Initialize the Fireworks tracing HTTP handler.

    Args:
        gateway_base_url: Base URL of the tracing gateway. Falls back to the
            ``FW_TRACING_GATEWAY_BASE_URL`` env var, then to the public
            ``https://tracing.fireworks.ai`` endpoint.
        rollout_id_env: Name of the environment variable that holds the
            rollout id attached to emitted records.
    """
    super().__init__()
    self.gateway_base_url = (
        gateway_base_url or os.getenv("FW_TRACING_GATEWAY_BASE_URL") or "https://tracing.fireworks.ai"
    )
    self.rollout_id_env = rollout_id_env
    self._session = requests.Session()
    self._lock = threading.Lock()
    # Authenticate every request on this session when an API key is present.
    # dict.update on session headers cannot raise, so the previous broad
    # `try/except Exception: pass` guard was dead weight and is removed.
    api_key = os.environ.get("FIREWORKS_API_KEY")
    if api_key:
        self._session.headers.update({"Authorization": f"Bearer {api_key}"})

def emit(self, record: logging.LogRecord) -> None:
try:
Expand All @@ -25,9 +34,42 @@ def emit(self, record: logging.LogRecord) -> None:
if not rollout_id:
return
payload = self._build_payload(record, rollout_id)
url = f"{self.gateway_base_url.rstrip('/')}/logs"
base = self.gateway_base_url.rstrip("/")
url = f"{base}/logs"
# Optional debug prints to aid local diagnostics
if os.environ.get("EP_DEBUG") == "true":
try:
tags_val = payload.get("tags")
tags_len = len(tags_val) if isinstance(tags_val, list) else 0
msg_val = payload.get("message")
msg_preview = msg_val[:80] if isinstance(msg_val, str) else msg_val
print(f"[FW_LOG] POST {url} rollout_id={rollout_id} tags={tags_len} msg={msg_preview}")
except Exception:
pass
with self._lock:
self._session.post(url, json=payload, timeout=5)
resp = self._session.post(url, json=payload, timeout=5)
if os.environ.get("EP_DEBUG") == "true":
try:
print(f"[FW_LOG] resp={resp.status_code}")
except Exception:
pass
# Fallback to /v1/logs if /logs is not found
if resp is not None and getattr(resp, "status_code", None) == 404:
alt = f"{base}/v1/logs"
if os.environ.get("EP_DEBUG") == "true":
try:
tags_val = payload.get("tags")
tags_len = len(tags_val) if isinstance(tags_val, list) else 0
print(f"[FW_LOG] RETRY POST {alt} rollout_id={rollout_id} tags={tags_len}")
except Exception:
pass
with self._lock:
resp2 = self._session.post(alt, json=payload, timeout=5)
if os.environ.get("EP_DEBUG") == "true":
try:
print(f"[FW_LOG] retry resp={resp2.status_code}")
except Exception:
pass
except Exception:
# Avoid raising exceptions from logging
self.handleError(record)
Expand Down
16 changes: 12 additions & 4 deletions eval_protocol/log_utils/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ def init_external_logging_from_env() -> None:
# Ensure we do not add duplicate handlers if already present
existing_handler_types = {type(h).__name__ for h in root_logger.handlers}

# Fireworks tracing
fw_url = _get_env("FW_TRACING_GATEWAY_BASE_URL")
if fw_url and "FireworksTracingHttpHandler" not in existing_handler_types:
# Fireworks tracing: prefer if FIREWORKS_API_KEY is present; default base URL if not provided
fw_key = _get_env("FIREWORKS_API_KEY")
# Allow remote validation gateway to act as tracing base when provided
fw_url = _get_env("FW_TRACING_GATEWAY_BASE_URL") or _get_env("GATEWAY_URL") or "https://tracing.fireworks.ai"
if fw_key and "FireworksTracingHttpHandler" not in existing_handler_types:
fw_handler = FireworksTracingHttpHandler(gateway_base_url=fw_url)
fw_handler.setLevel(logging.INFO)
fw_handler.addFilter(ContextRolloutIdFilter())
Expand All @@ -51,7 +53,13 @@ def init_external_logging_from_env() -> None:
es_url = _get_env("EP_ELASTICSEARCH_URL")
es_api_key = _get_env("EP_ELASTICSEARCH_API_KEY")
es_index = _get_env("EP_ELASTICSEARCH_INDEX")
if es_url and es_api_key and es_index and "ElasticsearchDirectHttpHandler" not in existing_handler_types:
if (
not fw_key
and es_url
and es_api_key
and es_index
and "ElasticsearchDirectHttpHandler" not in existing_handler_types
):
es_config = ElasticsearchConfig(url=es_url, api_key=es_api_key, index_name=es_index)
es_handler = ElasticsearchDirectHttpHandler(elasticsearch_config=es_config)
es_handler.setLevel(logging.INFO)
Expand Down
13 changes: 11 additions & 2 deletions eval_protocol/proxy/proxy_core/redis_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

import logging
from typing import Set
from typing import Set, cast
import redis

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -40,7 +40,16 @@ def get_insertion_ids(redis_client: redis.Redis, rollout_id: str) -> Set[str]:
Set of insertion_id strings, empty set if none found or on error
"""
try:
insertion_ids = redis_client.smembers(rollout_id)
raw = redis_client.smembers(rollout_id)
# Typing in redis stubs may be Awaitable[Set[Any]] | Set[Any]; at runtime this is a Set[bytes]
raw_ids = cast(Set[object], raw)
# Normalize to set[str]
insertion_ids: Set[str] = set()
for b in raw_ids:
try:
insertion_ids.add(b.decode("utf-8") if isinstance(b, (bytes, bytearray)) else cast(str, b))
except Exception:
continue
logger.debug(f"Found {len(insertion_ids)} expected insertion_ids for rollout {rollout_id}")
return insertion_ids
except Exception as e:
Expand Down
Loading
Loading