Skip to content

Commit bb73ed4

Browse files
authored
Merge branch 'main' into completionparams
2 parents d323263 + fac0152 commit bb73ed4

24 files changed

+1146
-198
lines changed

.github/workflows/elasticsearch-tests.yml renamed to .github/workflows/fireworks-tracing-tests.yml

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: Elasticsearch Tests
1+
name: Fireworks Tracing Tests
22

33
on:
44
push:
@@ -13,8 +13,8 @@ on:
1313
workflow_dispatch: # Allow manual triggering
1414

1515
jobs:
16-
elasticsearch-tests:
17-
name: Elasticsearch Integration Tests
16+
fireworks-tracing-tests:
17+
name: Fireworks Tracing Integration Tests
1818
runs-on: ubuntu-latest
1919

2020
steps:
@@ -36,14 +36,15 @@ jobs:
3636
- name: Install the project
3737
run: uv sync --locked --all-extras --dev
3838

39-
- name: Run Elasticsearch Tests
39+
- name: Run Fireworks Tracing Tests
4040
env:
4141
FIREWORKS_API_KEY: ${{ secrets.FIREWORKS_API_KEY }}
4242
PYTHONWARNINGS: "ignore::DeprecationWarning,ignore::RuntimeWarning"
4343
run: |
44-
# Run Elasticsearch direct HTTP handler tests
45-
uv run pytest tests/logging/test_elasticsearch_direct_http_handler.py -v --tb=short
44+
# Run RemoteRolloutProcessor End-to-End Test (auto server startup)
45+
uv run pytest tests/remote_server/test_remote_fireworks.py::test_remote_rollout_and_fetch_fireworks \
46+
-v --tb=short
4647
47-
# Run RemoteRolloutProcessor Propagate Status Smoke Test (also uses Elasticsearch)
48+
# Run RemoteRolloutProcessor Propagate Status Test (auto server startup)
4849
uv run pytest tests/remote_server/test_remote_fireworks_propagate_status.py::test_remote_rollout_and_fetch_fireworks_propagate_status \
4950
-v --tb=short

eval_protocol/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
filter_longest_conversation,
3737
)
3838
from .pytest import evaluation_test, SingleTurnRolloutProcessor, RemoteRolloutProcessor, GithubActionRolloutProcessor
39-
from .pytest.remote_rollout_processor import create_elasticsearch_config_from_env
4039
from .pytest.parameterize import DefaultParameterIdGenerator
4140
from .log_utils.elasticsearch_direct_http_handler import ElasticsearchDirectHttpHandler
4241
from .log_utils.rollout_id_filter import RolloutIdFilter
@@ -90,7 +89,6 @@
9089
warnings.filterwarnings("default", category=DeprecationWarning, module="eval_protocol")
9190

9291
__all__ = [
93-
"create_elasticsearch_config_from_env",
9492
"ElasticsearchConfig",
9593
"ElasticsearchDirectHttpHandler",
9694
"RolloutIdFilter",

eval_protocol/adapters/fireworks_tracing.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,55 @@ def __init__(
265265
self.base_url = base_url.rstrip("/")
266266
self.timeout = timeout
267267

268+
def search_logs(self, tags: List[str], limit: int = 100, hours_back: int = 24) -> List[Dict[str, Any]]:
269+
"""Fetch logs from Fireworks tracing gateway /logs endpoint.
270+
271+
Returns entries with keys: timestamp, message, severity, tags.
272+
"""
273+
if not tags:
274+
raise ValueError("At least one tag is required to fetch logs")
275+
276+
headers = {"Authorization": f"Bearer {os.environ.get('FIREWORKS_API_KEY')}"}
277+
params: Dict[str, Any] = {"tags": tags, "limit": limit, "hours_back": hours_back, "program": "eval_protocol"}
278+
279+
# Try /logs first, fall back to /v1/logs if not found
280+
urls_to_try = [f"{self.base_url}/logs", f"{self.base_url}/v1/logs"]
281+
data: Dict[str, Any] = {}
282+
last_error: Optional[str] = None
283+
for url in urls_to_try:
284+
try:
285+
response = requests.get(url, params=params, timeout=self.timeout, headers=headers)
286+
if response.status_code == 404:
287+
# Try next variant
288+
last_error = f"404 for {url}"
289+
continue
290+
response.raise_for_status()
291+
data = response.json() or {}
292+
break
293+
except requests.exceptions.RequestException as e:
294+
last_error = str(e)
295+
continue
296+
else:
297+
# All attempts failed
298+
if last_error:
299+
logger.error("Failed to fetch logs from Fireworks (tried %s): %s", urls_to_try, last_error)
300+
return []
301+
302+
entries: List[Dict[str, Any]] = data.get("entries", []) or []
303+
# Normalize minimal shape
304+
results: List[Dict[str, Any]] = []
305+
for e in entries:
306+
results.append(
307+
{
308+
"timestamp": e.get("timestamp"),
309+
"message": e.get("message"),
310+
"severity": e.get("severity", "INFO"),
311+
"tags": e.get("tags", []),
312+
"status": e.get("status"),
313+
}
314+
)
315+
return results
316+
268317
def get_evaluation_rows(
269318
self,
270319
tags: List[str],

eval_protocol/cli.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,16 @@ def parse_args(args=None):
307307
action="store_true",
308308
help="Use env vars for Elasticsearch config (requires ELASTICSEARCH_URL, ELASTICSEARCH_API_KEY, ELASTICSEARCH_INDEX_NAME)",
309309
)
310+
logs_parser.add_argument(
311+
"--use-fireworks",
312+
action="store_true",
313+
help="Force Fireworks tracing backend for logs UI (overrides env auto-detection)",
314+
)
315+
logs_parser.add_argument(
316+
"--use-elasticsearch",
317+
action="store_true",
318+
help="Force Elasticsearch backend for logs UI (overrides env auto-detection)",
319+
)
310320

311321
# Upload command
312322
upload_parser = subparsers.add_parser(

eval_protocol/cli_commands/logs.py

Lines changed: 25 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import sys
66
from pathlib import Path
77

8+
import os
89
from ..utils.logs_server import serve_logs
910

1011

@@ -20,53 +21,33 @@ def logs_command(args):
2021
print("Press Ctrl+C to stop the server")
2122
print("-" * 50)
2223

23-
# Setup Elasticsearch based on flags
24-
elasticsearch_config = None
25-
try:
26-
if getattr(args, "use_env_elasticsearch_config", False):
27-
# Use environment variables for configuration
28-
print("⚙️ Using environment variables for Elasticsearch config")
29-
from eval_protocol.pytest.remote_rollout_processor import (
30-
create_elasticsearch_config_from_env,
31-
)
32-
33-
elasticsearch_config = create_elasticsearch_config_from_env()
34-
# Ensure index exists with correct mapping, mirroring Docker setup path
35-
try:
36-
from eval_protocol.log_utils.elasticsearch_index_manager import (
37-
ElasticsearchIndexManager,
38-
)
39-
40-
index_manager = ElasticsearchIndexManager(
41-
elasticsearch_config.url,
42-
elasticsearch_config.index_name,
43-
elasticsearch_config.api_key,
44-
)
45-
created = index_manager.create_logging_index_mapping()
46-
if created:
47-
print(
48-
f"🧭 Verified Elasticsearch index '{elasticsearch_config.index_name}' mapping (created or already correct)"
49-
)
50-
else:
51-
print(
52-
f"⚠️ Could not verify/create mapping for index '{elasticsearch_config.index_name}'. Searches may behave unexpectedly."
53-
)
54-
except Exception as e:
55-
print(f"⚠️ Failed to ensure index mapping via IndexManager: {e}")
56-
elif not getattr(args, "disable_elasticsearch_setup", False):
57-
# Default behavior: start or connect to local Elasticsearch via Docker helper
58-
from eval_protocol.pytest.elasticsearch_setup import ElasticsearchSetup
24+
# Backend selection: Fireworks first when API key present, unless overridden
25+
use_fireworks = False
26+
if getattr(args, "use_fireworks", False):
27+
use_fireworks = True
28+
elif getattr(args, "use_elasticsearch", False):
29+
use_fireworks = False
30+
else:
31+
use_fireworks = bool(os.environ.get("FIREWORKS_API_KEY"))
5932

60-
print("🧰 Auto-configuring local Elasticsearch (Docker)")
61-
elasticsearch_config = ElasticsearchSetup().setup_elasticsearch()
62-
else:
63-
print("🚫 Elasticsearch setup disabled; running without Elasticsearch integration")
64-
except Exception as e:
65-
print(f"❌ Failed to configure Elasticsearch: {e}")
66-
return 1
33+
# Setup backend configs
34+
elasticsearch_config = None
35+
# Prefer explicit FW_TRACING_GATEWAY_BASE_URL, then GATEWAY_URL from env (remote validation),
36+
# finally default to public tracing.fireworks.ai
37+
fireworks_base_url = (
38+
os.environ.get("FW_TRACING_GATEWAY_BASE_URL")
39+
or os.environ.get("GATEWAY_URL")
40+
or "https://tracing.fireworks.ai"
41+
)
6742

6843
try:
69-
serve_logs(port=args.port, elasticsearch_config=elasticsearch_config, debug=args.debug)
44+
serve_logs(
45+
port=args.port,
46+
elasticsearch_config=elasticsearch_config,
47+
debug=args.debug,
48+
backend="fireworks" if use_fireworks else "elasticsearch",
49+
fireworks_base_url=fireworks_base_url if use_fireworks else None,
50+
)
7051
return 0
7152
except KeyboardInterrupt:
7253
print("\n🛑 Server stopped by user")

eval_protocol/cli_commands/upload.py

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -267,32 +267,29 @@ def _parse_entry(entry: str, cwd: str) -> tuple[str, str]:
267267
def _resolve_entry_to_qual_and_source(entry: str, cwd: str) -> tuple[str, str]:
268268
target, func = _parse_entry(entry, cwd)
269269

270-
# Check if target looks like a file path
270+
# Determine the file path to load
271271
if "/" in target or "\\" in target or os.path.exists(target):
272-
# It's a file path - convert to absolute and load as module
272+
# It's a file path - convert to absolute
273273
if not os.path.isabs(target):
274274
target = os.path.abspath(os.path.join(cwd, target))
275-
276275
if not target.endswith(".py"):
277276
target = target + ".py"
278-
279277
if not os.path.isfile(target):
280278
raise ValueError(f"File not found: {target}")
281-
282-
# Import module from file path
283-
spec = importlib.util.spec_from_file_location(Path(target).stem, target)
284-
if not spec or not spec.loader:
285-
raise ValueError(f"Unable to load module from path: {target}")
286-
module = importlib.util.module_from_spec(spec)
287-
sys.modules[spec.name] = module
288-
spec.loader.exec_module(module) # type: ignore[attr-defined]
289-
module_name = spec.name
290279
source_file_path = target
291280
else:
292-
# Treat as module path (e.g., "my_package.my_module")
293-
module_name = target
294-
module = importlib.import_module(module_name)
295-
source_file_path = getattr(module, "__file__", "") or ""
281+
# Treat dotted name as a file path
282+
dotted_as_path = target.replace(".", "/") + ".py"
283+
source_file_path = os.path.join(cwd, dotted_as_path)
284+
285+
# Load the module from the file path
286+
spec = importlib.util.spec_from_file_location(Path(source_file_path).stem, source_file_path)
287+
if not spec or not spec.loader:
288+
raise ValueError(f"Unable to load module from path: {source_file_path}")
289+
module = importlib.util.module_from_spec(spec)
290+
sys.modules[spec.name] = module
291+
spec.loader.exec_module(module) # type: ignore[attr-defined]
292+
module_name = spec.name
296293

297294
if not hasattr(module, func):
298295
raise ValueError(f"Function '{func}' not found in module '{module_name}'")
@@ -591,8 +588,7 @@ def upload_command(args: argparse.Namespace) -> int:
591588

592589
print(f"\nUploading evaluator '{evaluator_id}' for {qualname.split('.')[-1]}...")
593590
try:
594-
# Always treat as a single evaluator (single-metric) even if folder has helper modules
595-
test_dir = os.path.dirname(source_file_path) if source_file_path else root
591+
test_dir = root
596592
metric_name = os.path.basename(test_dir) or "metric"
597593
result = create_evaluation(
598594
evaluator_id=evaluator_id,

eval_protocol/log_utils/elasticsearch_direct_http_handler.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,17 @@ def emit(self, record: logging.LogRecord) -> None:
6060
if status_info:
6161
data.update(status_info)
6262

63+
# Optional correlation enrichment
64+
experiment_id = getattr(record, "experiment_id", None)
65+
if experiment_id is not None:
66+
data["experiment_id"] = experiment_id
67+
run_id = getattr(record, "run_id", None)
68+
if run_id is not None:
69+
data["run_id"] = run_id
70+
rollout_ids = getattr(record, "rollout_ids", None)
71+
if rollout_ids is not None:
72+
data["rollout_ids"] = rollout_ids
73+
6374
# Schedule the HTTP request to run asynchronously
6475
self._schedule_async_send(data, record)
6576
except Exception as e:

0 commit comments

Comments
 (0)