Skip to content

Commit 35aef2b

Browse files
committed
use fireworks tracing and logging directly
1 parent 44ffe72 commit 35aef2b

File tree

7 files changed

+326
-45
lines changed

7 files changed

+326
-45
lines changed

eval_protocol/adapters/fireworks_tracing.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,40 @@ def __init__(
265265
self.base_url = base_url.rstrip("/")
266266
self.timeout = timeout
267267

268+
def search_logs(self, tags: List[str], limit: int = 100, hours_back: int = 24) -> List[Dict[str, Any]]:
269+
"""Fetch logs from Fireworks tracing gateway /logs endpoint.
270+
271+
Returns entries with keys: timestamp, message, severity, tags.
272+
"""
273+
if not tags:
274+
raise ValueError("At least one tag is required to fetch logs")
275+
276+
url = f"{self.base_url}/logs"
277+
headers = {"Authorization": f"Bearer {os.environ.get('FIREWORKS_API_KEY')}"}
278+
params: Dict[str, Any] = {"tags": tags, "limit": limit, "hours_back": hours_back, "program": "eval_protocol"}
279+
280+
try:
281+
response = requests.get(url, params=params, timeout=self.timeout, headers=headers)
282+
response.raise_for_status()
283+
data = response.json() or {}
284+
except requests.exceptions.RequestException as e:
285+
logger.error("Failed to fetch logs from Fireworks /logs: %s", str(e))
286+
return []
287+
288+
entries: List[Dict[str, Any]] = data.get("entries", []) or []
289+
# Normalize minimal shape
290+
results: List[Dict[str, Any]] = []
291+
for e in entries:
292+
results.append(
293+
{
294+
"timestamp": e.get("timestamp"),
295+
"message": e.get("message"),
296+
"severity": e.get("severity", "INFO"),
297+
"tags": e.get("tags", []),
298+
}
299+
)
300+
return results
301+
268302
def get_evaluation_rows(
269303
self,
270304
tags: List[str],

eval_protocol/cli.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,16 @@ def parse_args(args=None):
307307
action="store_true",
308308
help="Use env vars for Elasticsearch config (requires ELASTICSEARCH_URL, ELASTICSEARCH_API_KEY, ELASTICSEARCH_INDEX_NAME)",
309309
)
310+
logs_parser.add_argument(
311+
"--use-fireworks",
312+
action="store_true",
313+
help="Force Fireworks tracing backend for logs UI (overrides env auto-detection)",
314+
)
315+
logs_parser.add_argument(
316+
"--use-elasticsearch",
317+
action="store_true",
318+
help="Force Elasticsearch backend for logs UI (overrides env auto-detection)",
319+
)
310320

311321
# Upload command
312322
upload_parser = subparsers.add_parser(

eval_protocol/cli_commands/logs.py

Lines changed: 53 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import sys
66
from pathlib import Path
77

8+
import os
89
from ..utils.logs_server import serve_logs
910

1011

@@ -20,53 +21,70 @@ def logs_command(args):
2021
print("Press Ctrl+C to stop the server")
2122
print("-" * 50)
2223

23-
# Setup Elasticsearch based on flags
24+
# Backend selection: Fireworks first when API key present, unless overridden
25+
use_fireworks = False
26+
if getattr(args, "use_fireworks", False):
27+
use_fireworks = True
28+
elif getattr(args, "use_elasticsearch", False):
29+
use_fireworks = False
30+
else:
31+
use_fireworks = bool(os.environ.get("FIREWORKS_API_KEY"))
32+
33+
# Setup backend configs
2434
elasticsearch_config = None
35+
fireworks_base_url = os.environ.get("FW_TRACING_GATEWAY_BASE_URL") or "https://tracing.fireworks.ai"
2536
try:
26-
if getattr(args, "use_env_elasticsearch_config", False):
27-
# Use environment variables for configuration
28-
print("⚙️ Using environment variables for Elasticsearch config")
29-
from eval_protocol.pytest.remote_rollout_processor import (
30-
create_elasticsearch_config_from_env,
31-
)
32-
33-
elasticsearch_config = create_elasticsearch_config_from_env()
34-
# Ensure index exists with correct mapping, mirroring Docker setup path
35-
try:
36-
from eval_protocol.log_utils.elasticsearch_index_manager import (
37-
ElasticsearchIndexManager,
37+
if not use_fireworks:
38+
if getattr(args, "use_env_elasticsearch_config", False):
39+
# Use environment variables for configuration
40+
print("⚙️ Using environment variables for Elasticsearch config")
41+
from eval_protocol.pytest.remote_rollout_processor import (
42+
create_elasticsearch_config_from_env,
3843
)
3944

40-
index_manager = ElasticsearchIndexManager(
41-
elasticsearch_config.url,
42-
elasticsearch_config.index_name,
43-
elasticsearch_config.api_key,
44-
)
45-
created = index_manager.create_logging_index_mapping()
46-
if created:
47-
print(
48-
f"🧭 Verified Elasticsearch index '{elasticsearch_config.index_name}' mapping (created or already correct)"
45+
elasticsearch_config = create_elasticsearch_config_from_env()
46+
# Ensure index exists with correct mapping, mirroring Docker setup path
47+
try:
48+
from eval_protocol.log_utils.elasticsearch_index_manager import (
49+
ElasticsearchIndexManager,
4950
)
50-
else:
51-
print(
52-
f"⚠️ Could not verify/create mapping for index '{elasticsearch_config.index_name}'. Searches may behave unexpectedly."
51+
52+
index_manager = ElasticsearchIndexManager(
53+
elasticsearch_config.url,
54+
elasticsearch_config.index_name,
55+
elasticsearch_config.api_key,
5356
)
54-
except Exception as e:
55-
print(f"⚠️ Failed to ensure index mapping via IndexManager: {e}")
56-
elif not getattr(args, "disable_elasticsearch_setup", False):
57-
# Default behavior: start or connect to local Elasticsearch via Docker helper
58-
from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup
57+
created = index_manager.create_logging_index_mapping()
58+
if created:
59+
print(
60+
f"🧭 Verified Elasticsearch index '{elasticsearch_config.index_name}' mapping (created or already correct)"
61+
)
62+
else:
63+
print(
64+
f"⚠️ Could not verify/create mapping for index '{elasticsearch_config.index_name}'. Searches may behave unexpectedly."
65+
)
66+
except Exception as e:
67+
print(f"⚠️ Failed to ensure index mapping via IndexManager: {e}")
68+
elif not getattr(args, "disable_elasticsearch_setup", False):
69+
# Default behavior: start or connect to local Elasticsearch via Docker helper
70+
from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup
5971

60-
print("🧰 Auto-configuring local Elasticsearch (Docker)")
61-
elasticsearch_config = ElasticsearchSetup().setup_elasticsearch()
62-
else:
63-
print("🚫 Elasticsearch setup disabled; running without Elasticsearch integration")
72+
print("🧰 Auto-configuring local Elasticsearch (Docker)")
73+
elasticsearch_config = ElasticsearchSetup().setup_elasticsearch()
74+
else:
75+
print("🚫 Elasticsearch setup disabled; running without Elasticsearch integration")
6476
except Exception as e:
6577
print(f"❌ Failed to configure Elasticsearch: {e}")
6678
return 1
6779

6880
try:
69-
serve_logs(port=args.port, elasticsearch_config=elasticsearch_config, debug=args.debug)
81+
serve_logs(
82+
port=args.port,
83+
elasticsearch_config=elasticsearch_config,
84+
debug=args.debug,
85+
backend="fireworks" if use_fireworks else "elasticsearch",
86+
fireworks_base_url=fireworks_base_url if use_fireworks else None,
87+
)
7088
return 0
7189
except KeyboardInterrupt:
7290
print("\n🛑 Server stopped by user")

eval_protocol/log_utils/fireworks_tracing_http_handler.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,19 @@ class FireworksTracingHttpHandler(logging.Handler):
1212

1313
def __init__(self, gateway_base_url: Optional[str] = None, rollout_id_env: str = "EP_ROLLOUT_ID") -> None:
1414
super().__init__()
15-
self.gateway_base_url = gateway_base_url or os.getenv("FW_TRACING_GATEWAY_BASE_URL")
15+
self.gateway_base_url = (
16+
gateway_base_url or os.getenv("FW_TRACING_GATEWAY_BASE_URL") or "https://tracing.fireworks.ai"
17+
)
1618
self.rollout_id_env = rollout_id_env
1719
self._session = requests.Session()
1820
self._lock = threading.Lock()
21+
# Include Authorization header if FIREWORKS_API_KEY is available
22+
api_key = os.environ.get("FIREWORKS_API_KEY")
23+
if api_key:
24+
try:
25+
self._session.headers.update({"Authorization": f"Bearer {api_key}"})
26+
except Exception:
27+
pass
1928

2029
def emit(self, record: logging.LogRecord) -> None:
2130
try:

eval_protocol/log_utils/init.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,10 @@ def init_external_logging_from_env() -> None:
3939
# Ensure we do not add duplicate handlers if already present
4040
existing_handler_types = {type(h).__name__ for h in root_logger.handlers}
4141

42-
# Fireworks tracing
43-
fw_url = _get_env("FW_TRACING_GATEWAY_BASE_URL")
44-
if fw_url and "FireworksTracingHttpHandler" not in existing_handler_types:
42+
# Fireworks tracing: prefer if FIREWORKS_API_KEY is present; default base URL if not provided
43+
fw_key = _get_env("FIREWORKS_API_KEY")
44+
fw_url = _get_env("FW_TRACING_GATEWAY_BASE_URL") or "https://tracing.fireworks.ai"
45+
if fw_key and "FireworksTracingHttpHandler" not in existing_handler_types:
4546
fw_handler = FireworksTracingHttpHandler(gateway_base_url=fw_url)
4647
fw_handler.setLevel(logging.INFO)
4748
fw_handler.addFilter(ContextRolloutIdFilter())
@@ -51,7 +52,13 @@ def init_external_logging_from_env() -> None:
5152
es_url = _get_env("EP_ELASTICSEARCH_URL")
5253
es_api_key = _get_env("EP_ELASTICSEARCH_API_KEY")
5354
es_index = _get_env("EP_ELASTICSEARCH_INDEX")
54-
if es_url and es_api_key and es_index and "ElasticsearchDirectHttpHandler" not in existing_handler_types:
55+
if (
56+
not fw_key
57+
and es_url
58+
and es_api_key
59+
and es_index
60+
and "ElasticsearchDirectHttpHandler" not in existing_handler_types
61+
):
5562
es_config = ElasticsearchConfig(url=es_url, api_key=es_api_key, index_name=es_index)
5663
es_handler = ElasticsearchDirectHttpHandler(elasticsearch_config=es_config)
5764
es_handler.setLevel(logging.INFO)

eval_protocol/utils/logs_server.py

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from contextlib import asynccontextmanager
99
from pathlib import Path
1010
from queue import Queue
11-
from typing import TYPE_CHECKING, Any, Dict, List, Optional
11+
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Literal
1212

1313
import psutil
1414
import uvicorn
@@ -327,6 +327,8 @@ def __init__(
327327
port: Optional[int] = 8000,
328328
index_file: str = "index.html",
329329
elasticsearch_config: Optional[ElasticsearchConfig] = None,
330+
backend: Literal["fireworks", "elasticsearch"] = "elasticsearch",
331+
fireworks_base_url: Optional[str] = None,
330332
debug: bool = False,
331333
):
332334
# Enable debug mode if requested
@@ -336,6 +338,10 @@ def __init__(
336338
# Initialize WebSocket manager
337339
self.websocket_manager = WebSocketManager()
338340

341+
# Backend selection
342+
self.backend: Literal["fireworks", "elasticsearch"] = backend
343+
self.fireworks_base_url = fireworks_base_url
344+
339345
# Initialize Elasticsearch client if config is provided
340346
self.elasticsearch_client: Optional[ElasticsearchClient] = None
341347
if elasticsearch_config:
@@ -414,6 +420,8 @@ async def status():
414420
# Expose an empty list to satisfy consumers and type checker
415421
"watch_paths": [],
416422
"elasticsearch_enabled": self.elasticsearch_client is not None,
423+
"backend": self.backend,
424+
"fireworks_enabled": self.backend == "fireworks",
417425
}
418426

419427
@self.app.get("/api/logs/{rollout_id}", response_model=LogsResponse, response_model_exclude_none=True)
@@ -422,7 +430,47 @@ async def get_logs(
422430
level: Optional[str] = Query(None, description="Filter by log level (DEBUG, INFO, WARNING, ERROR)"),
423431
limit: int = Query(100, description="Maximum number of log entries to return"),
424432
) -> LogsResponse:
425-
"""Get logs for a specific rollout ID from Elasticsearch."""
433+
"""Get logs for a specific rollout ID from the configured backend."""
434+
# Fireworks backend
435+
if self.backend == "fireworks":
436+
try:
437+
from eval_protocol.adapters.fireworks_tracing import FireworksTracingAdapter
438+
439+
base_url = self.fireworks_base_url or "https://tracing.fireworks.ai"
440+
adapter = FireworksTracingAdapter(base_url=base_url)
441+
# Fetch lightweight log entries filtered by rollout_id tag
442+
tags = [f"rollout_id:{rollout_id}"]
443+
entries = adapter.search_logs(tags=tags, limit=limit)
444+
# Map to LogEntry responses
445+
log_entries: List[LogEntry] = []
446+
for e in entries:
447+
ts = e.get("timestamp") or datetime.utcnow().isoformat() + "Z"
448+
msg = e.get("message") or "trace"
449+
sev = e.get("severity") or "INFO"
450+
entry = LogEntry(
451+
**{
452+
"@timestamp": ts,
453+
"level": sev,
454+
"message": str(msg),
455+
"logger_name": "fireworks",
456+
"rollout_id": rollout_id,
457+
}
458+
)
459+
log_entries.append(entry)
460+
461+
return LogsResponse(
462+
logs=log_entries,
463+
total=len(log_entries),
464+
rollout_id=rollout_id,
465+
filtered_by_level=level,
466+
)
467+
except HTTPException:
468+
raise
469+
except Exception as e:
470+
logger.error(f"Error retrieving Fireworks logs for rollout {rollout_id}: {e}")
471+
raise HTTPException(status_code=500, detail=f"Failed to retrieve Fireworks logs: {str(e)}")
472+
473+
# Elasticsearch backend
426474
if not self.elasticsearch_client:
427475
raise HTTPException(status_code=503, detail="Elasticsearch is not configured for this logs server")
428476

@@ -574,6 +622,8 @@ def create_app(
574622
port: int = 8000,
575623
build_dir: Optional[str] = None,
576624
elasticsearch_config: Optional[ElasticsearchConfig] = None,
625+
backend: Literal["fireworks", "elasticsearch"] = "elasticsearch",
626+
fireworks_base_url: Optional[str] = None,
577627
debug: bool = False,
578628
) -> FastAPI:
579629
"""
@@ -597,20 +647,36 @@ def create_app(
597647
)
598648

599649
server = LogsServer(
600-
host=host, port=port, build_dir=build_dir, elasticsearch_config=elasticsearch_config, debug=debug
650+
host=host,
651+
port=port,
652+
build_dir=build_dir,
653+
elasticsearch_config=elasticsearch_config,
654+
backend=backend,
655+
fireworks_base_url=fireworks_base_url,
656+
debug=debug,
601657
)
602658
server.start_loops()
603659
return server.app
604660

605661

606662
# For backward compatibility and direct usage
607663
def serve_logs(
608-
port: Optional[int] = None, elasticsearch_config: Optional[ElasticsearchConfig] = None, debug: bool = False
664+
port: Optional[int] = None,
665+
elasticsearch_config: Optional[ElasticsearchConfig] = None,
666+
debug: bool = False,
667+
backend: Literal["fireworks", "elasticsearch"] = "elasticsearch",
668+
fireworks_base_url: Optional[str] = None,
609669
):
610670
"""
611671
Convenience function to create and run a LogsServer.
612672
"""
613-
server = LogsServer(port=port, elasticsearch_config=elasticsearch_config, debug=debug)
673+
server = LogsServer(
674+
port=port,
675+
elasticsearch_config=elasticsearch_config,
676+
debug=debug,
677+
backend=backend,
678+
fireworks_base_url=fireworks_base_url,
679+
)
614680
server.run()
615681

616682

0 commit comments

Comments
 (0)