Skip to content

Commit 01ef0b0

Browse files
authored
use fireworks tracing and logging directly (#284)
* use fireworks tracing and logging directly * fix log server error * fix logging sink
1 parent 43e5e08 commit 01ef0b0

File tree

9 files changed

+582
-49
lines changed

9 files changed

+582
-49
lines changed

eval_protocol/adapters/fireworks_tracing.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,54 @@ def __init__(
265265
self.base_url = base_url.rstrip("/")
266266
self.timeout = timeout
267267

268+
def search_logs(self, tags: List[str], limit: int = 100, hours_back: int = 24) -> List[Dict[str, Any]]:
269+
"""Fetch logs from Fireworks tracing gateway /logs endpoint.
270+
271+
Returns entries with keys: timestamp, message, severity, tags.
272+
"""
273+
if not tags:
274+
raise ValueError("At least one tag is required to fetch logs")
275+
276+
headers = {"Authorization": f"Bearer {os.environ.get('FIREWORKS_API_KEY')}"}
277+
params: Dict[str, Any] = {"tags": tags, "limit": limit, "hours_back": hours_back, "program": "eval_protocol"}
278+
279+
# Try /logs first, fall back to /v1/logs if not found
280+
urls_to_try = [f"{self.base_url}/logs", f"{self.base_url}/v1/logs"]
281+
data: Dict[str, Any] = {}
282+
last_error: Optional[str] = None
283+
for url in urls_to_try:
284+
try:
285+
response = requests.get(url, params=params, timeout=self.timeout, headers=headers)
286+
if response.status_code == 404:
287+
# Try next variant
288+
last_error = f"404 for {url}"
289+
continue
290+
response.raise_for_status()
291+
data = response.json() or {}
292+
break
293+
except requests.exceptions.RequestException as e:
294+
last_error = str(e)
295+
continue
296+
else:
297+
# All attempts failed
298+
if last_error:
299+
logger.error("Failed to fetch logs from Fireworks (tried %s): %s", urls_to_try, last_error)
300+
return []
301+
302+
entries: List[Dict[str, Any]] = data.get("entries", []) or []
303+
# Normalize minimal shape
304+
results: List[Dict[str, Any]] = []
305+
for e in entries:
306+
results.append(
307+
{
308+
"timestamp": e.get("timestamp"),
309+
"message": e.get("message"),
310+
"severity": e.get("severity", "INFO"),
311+
"tags": e.get("tags", []),
312+
}
313+
)
314+
return results
315+
268316
def get_evaluation_rows(
269317
self,
270318
tags: List[str],

eval_protocol/cli.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,16 @@ def parse_args(args=None):
307307
action="store_true",
308308
help="Use env vars for Elasticsearch config (requires ELASTICSEARCH_URL, ELASTICSEARCH_API_KEY, ELASTICSEARCH_INDEX_NAME)",
309309
)
310+
logs_parser.add_argument(
311+
"--use-fireworks",
312+
action="store_true",
313+
help="Force Fireworks tracing backend for logs UI (overrides env auto-detection)",
314+
)
315+
logs_parser.add_argument(
316+
"--use-elasticsearch",
317+
action="store_true",
318+
help="Force Elasticsearch backend for logs UI (overrides env auto-detection)",
319+
)
310320

311321
# Upload command
312322
upload_parser = subparsers.add_parser(

eval_protocol/cli_commands/logs.py

Lines changed: 59 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import sys
66
from pathlib import Path
77

8+
import os
89
from ..utils.logs_server import serve_logs
910

1011

@@ -20,53 +21,76 @@ def logs_command(args):
2021
print("Press Ctrl+C to stop the server")
2122
print("-" * 50)
2223

23-
# Setup Elasticsearch based on flags
24+
# Backend selection: Fireworks first when API key present, unless overridden
25+
use_fireworks = False
26+
if getattr(args, "use_fireworks", False):
27+
use_fireworks = True
28+
elif getattr(args, "use_elasticsearch", False):
29+
use_fireworks = False
30+
else:
31+
use_fireworks = bool(os.environ.get("FIREWORKS_API_KEY"))
32+
33+
# Setup backend configs
2434
elasticsearch_config = None
35+
# Prefer explicit FW_TRACING_GATEWAY_BASE_URL, then GATEWAY_URL from env (remote validation),
36+
# finally default to public tracing.fireworks.ai
37+
fireworks_base_url = (
38+
os.environ.get("FW_TRACING_GATEWAY_BASE_URL")
39+
or os.environ.get("GATEWAY_URL")
40+
or "https://tracing.fireworks.ai"
41+
)
2542
try:
26-
if getattr(args, "use_env_elasticsearch_config", False):
27-
# Use environment variables for configuration
28-
print("⚙️ Using environment variables for Elasticsearch config")
29-
from eval_protocol.pytest.remote_rollout_processor import (
30-
create_elasticsearch_config_from_env,
31-
)
32-
33-
elasticsearch_config = create_elasticsearch_config_from_env()
34-
# Ensure index exists with correct mapping, mirroring Docker setup path
35-
try:
36-
from eval_protocol.log_utils.elasticsearch_index_manager import (
37-
ElasticsearchIndexManager,
43+
if not use_fireworks:
44+
if getattr(args, "use_env_elasticsearch_config", False):
45+
# Use environment variables for configuration
46+
print("⚙️ Using environment variables for Elasticsearch config")
47+
from eval_protocol.pytest.remote_rollout_processor import (
48+
create_elasticsearch_config_from_env,
3849
)
3950

40-
index_manager = ElasticsearchIndexManager(
41-
elasticsearch_config.url,
42-
elasticsearch_config.index_name,
43-
elasticsearch_config.api_key,
44-
)
45-
created = index_manager.create_logging_index_mapping()
46-
if created:
47-
print(
48-
f"🧭 Verified Elasticsearch index '{elasticsearch_config.index_name}' mapping (created or already correct)"
51+
elasticsearch_config = create_elasticsearch_config_from_env()
52+
# Ensure index exists with correct mapping, mirroring Docker setup path
53+
try:
54+
from eval_protocol.log_utils.elasticsearch_index_manager import (
55+
ElasticsearchIndexManager,
4956
)
50-
else:
51-
print(
52-
f"⚠️ Could not verify/create mapping for index '{elasticsearch_config.index_name}'. Searches may behave unexpectedly."
57+
58+
index_manager = ElasticsearchIndexManager(
59+
elasticsearch_config.url,
60+
elasticsearch_config.index_name,
61+
elasticsearch_config.api_key,
5362
)
54-
except Exception as e:
55-
print(f"⚠️ Failed to ensure index mapping via IndexManager: {e}")
56-
elif not getattr(args, "disable_elasticsearch_setup", False):
57-
# Default behavior: start or connect to local Elasticsearch via Docker helper
58-
from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup
63+
created = index_manager.create_logging_index_mapping()
64+
if created:
65+
print(
66+
f"🧭 Verified Elasticsearch index '{elasticsearch_config.index_name}' mapping (created or already correct)"
67+
)
68+
else:
69+
print(
70+
f"⚠️ Could not verify/create mapping for index '{elasticsearch_config.index_name}'. Searches may behave unexpectedly."
71+
)
72+
except Exception as e:
73+
print(f"⚠️ Failed to ensure index mapping via IndexManager: {e}")
74+
elif not getattr(args, "disable_elasticsearch_setup", False):
75+
# Default behavior: start or connect to local Elasticsearch via Docker helper
76+
from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup
5977

60-
print("🧰 Auto-configuring local Elasticsearch (Docker)")
61-
elasticsearch_config = ElasticsearchSetup().setup_elasticsearch()
62-
else:
63-
print("🚫 Elasticsearch setup disabled; running without Elasticsearch integration")
78+
print("🧰 Auto-configuring local Elasticsearch (Docker)")
79+
elasticsearch_config = ElasticsearchSetup().setup_elasticsearch()
80+
else:
81+
print("🚫 Elasticsearch setup disabled; running without Elasticsearch integration")
6482
except Exception as e:
6583
print(f"❌ Failed to configure Elasticsearch: {e}")
6684
return 1
6785

6886
try:
69-
serve_logs(port=args.port, elasticsearch_config=elasticsearch_config, debug=args.debug)
87+
serve_logs(
88+
port=args.port,
89+
elasticsearch_config=elasticsearch_config,
90+
debug=args.debug,
91+
backend="fireworks" if use_fireworks else "elasticsearch",
92+
fireworks_base_url=fireworks_base_url if use_fireworks else None,
93+
)
7094
return 0
7195
except KeyboardInterrupt:
7296
print("\n🛑 Server stopped by user")

eval_protocol/log_utils/fireworks_tracing_http_handler.py

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,19 @@ class FireworksTracingHttpHandler(logging.Handler):
1212

1313
def __init__(self, gateway_base_url: Optional[str] = None, rollout_id_env: str = "EP_ROLLOUT_ID") -> None:
1414
super().__init__()
15-
self.gateway_base_url = gateway_base_url or os.getenv("FW_TRACING_GATEWAY_BASE_URL")
15+
self.gateway_base_url = (
16+
gateway_base_url or os.getenv("FW_TRACING_GATEWAY_BASE_URL") or "https://tracing.fireworks.ai"
17+
)
1618
self.rollout_id_env = rollout_id_env
1719
self._session = requests.Session()
1820
self._lock = threading.Lock()
21+
# Include Authorization header if FIREWORKS_API_KEY is available
22+
api_key = os.environ.get("FIREWORKS_API_KEY")
23+
if api_key:
24+
try:
25+
self._session.headers.update({"Authorization": f"Bearer {api_key}"})
26+
except Exception:
27+
pass
1928

2029
def emit(self, record: logging.LogRecord) -> None:
2130
try:
@@ -25,9 +34,42 @@ def emit(self, record: logging.LogRecord) -> None:
2534
if not rollout_id:
2635
return
2736
payload = self._build_payload(record, rollout_id)
28-
url = f"{self.gateway_base_url.rstrip('/')}/logs"
37+
base = self.gateway_base_url.rstrip("/")
38+
url = f"{base}/logs"
39+
# Optional debug prints to aid local diagnostics
40+
if os.environ.get("EP_DEBUG") == "true":
41+
try:
42+
tags_val = payload.get("tags")
43+
tags_len = len(tags_val) if isinstance(tags_val, list) else 0
44+
msg_val = payload.get("message")
45+
msg_preview = msg_val[:80] if isinstance(msg_val, str) else msg_val
46+
print(f"[FW_LOG] POST {url} rollout_id={rollout_id} tags={tags_len} msg={msg_preview}")
47+
except Exception:
48+
pass
2949
with self._lock:
30-
self._session.post(url, json=payload, timeout=5)
50+
resp = self._session.post(url, json=payload, timeout=5)
51+
if os.environ.get("EP_DEBUG") == "true":
52+
try:
53+
print(f"[FW_LOG] resp={resp.status_code}")
54+
except Exception:
55+
pass
56+
# Fallback to /v1/logs if /logs is not found
57+
if resp is not None and getattr(resp, "status_code", None) == 404:
58+
alt = f"{base}/v1/logs"
59+
if os.environ.get("EP_DEBUG") == "true":
60+
try:
61+
tags_val = payload.get("tags")
62+
tags_len = len(tags_val) if isinstance(tags_val, list) else 0
63+
print(f"[FW_LOG] RETRY POST {alt} rollout_id={rollout_id} tags={tags_len}")
64+
except Exception:
65+
pass
66+
with self._lock:
67+
resp2 = self._session.post(alt, json=payload, timeout=5)
68+
if os.environ.get("EP_DEBUG") == "true":
69+
try:
70+
print(f"[FW_LOG] retry resp={resp2.status_code}")
71+
except Exception:
72+
pass
3173
except Exception:
3274
# Avoid raising exceptions from logging
3375
self.handleError(record)

eval_protocol/log_utils/init.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,11 @@ def init_external_logging_from_env() -> None:
3939
# Ensure we do not add duplicate handlers if already present
4040
existing_handler_types = {type(h).__name__ for h in root_logger.handlers}
4141

42-
# Fireworks tracing
43-
fw_url = _get_env("FW_TRACING_GATEWAY_BASE_URL")
44-
if fw_url and "FireworksTracingHttpHandler" not in existing_handler_types:
42+
# Fireworks tracing: prefer if FIREWORKS_API_KEY is present; default base URL if not provided
43+
fw_key = _get_env("FIREWORKS_API_KEY")
44+
# Allow remote validation gateway to act as tracing base when provided
45+
fw_url = _get_env("FW_TRACING_GATEWAY_BASE_URL") or _get_env("GATEWAY_URL") or "https://tracing.fireworks.ai"
46+
if fw_key and "FireworksTracingHttpHandler" not in existing_handler_types:
4547
fw_handler = FireworksTracingHttpHandler(gateway_base_url=fw_url)
4648
fw_handler.setLevel(logging.INFO)
4749
fw_handler.addFilter(ContextRolloutIdFilter())
@@ -51,7 +53,13 @@ def init_external_logging_from_env() -> None:
5153
es_url = _get_env("EP_ELASTICSEARCH_URL")
5254
es_api_key = _get_env("EP_ELASTICSEARCH_API_KEY")
5355
es_index = _get_env("EP_ELASTICSEARCH_INDEX")
54-
if es_url and es_api_key and es_index and "ElasticsearchDirectHttpHandler" not in existing_handler_types:
56+
if (
57+
not fw_key
58+
and es_url
59+
and es_api_key
60+
and es_index
61+
and "ElasticsearchDirectHttpHandler" not in existing_handler_types
62+
):
5563
es_config = ElasticsearchConfig(url=es_url, api_key=es_api_key, index_name=es_index)
5664
es_handler = ElasticsearchDirectHttpHandler(elasticsearch_config=es_config)
5765
es_handler.setLevel(logging.INFO)

eval_protocol/proxy/proxy_core/redis_utils.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"""
44

55
import logging
6-
from typing import Set
6+
from typing import Set, cast
77
import redis
88

99
logger = logging.getLogger(__name__)
@@ -40,7 +40,16 @@ def get_insertion_ids(redis_client: redis.Redis, rollout_id: str) -> Set[str]:
4040
Set of insertion_id strings, empty set if none found or on error
4141
"""
4242
try:
43-
insertion_ids = redis_client.smembers(rollout_id)
43+
raw = redis_client.smembers(rollout_id)
44+
# Typing in redis stubs may be Awaitable[Set[Any]] | Set[Any]; at runtime this is a Set[bytes]
45+
raw_ids = cast(Set[object], raw)
46+
# Normalize to set[str]
47+
insertion_ids: Set[str] = set()
48+
for b in raw_ids:
49+
try:
50+
insertion_ids.add(b.decode("utf-8") if isinstance(b, (bytes, bytearray)) else cast(str, b))
51+
except Exception:
52+
continue
4453
logger.debug(f"Found {len(insertion_ids)} expected insertion_ids for rollout {rollout_id}")
4554
return insertion_ids
4655
except Exception as e:

0 commit comments

Comments
 (0)