Skip to content

Commit 7f2c9b3

Browse files
committed
client logging sink
1 parent b120611 commit 7f2c9b3

File tree

3 files changed

+120
-0
lines changed

3 files changed

+120
-0
lines changed

eval_protocol/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
from .log_utils.elasticsearch_direct_http_handler import ElasticsearchDirectHttpHandler
3636
from .log_utils.rollout_id_filter import RolloutIdFilter
3737
from .log_utils.util import setup_rollout_logging_for_elasticsearch_handler
38+
from .log_utils.fireworks_tracing_http_handler import FireworksTracingHttpHandler
39+
3840

3941
from .types.remote_rollout_processor import (
4042
InitRequest,
@@ -89,6 +91,7 @@
8991
"BraintrustAdapter",
9092
"create_braintrust_adapter",
9193
"LangSmithAdapter",
94+
"FireworksTracingHttpHandler",
9295
# Core interfaces
9396
"Message",
9497
"MetricResult",
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import logging
2+
import os
3+
import threading
4+
from datetime import datetime, timezone
5+
from typing import Optional, Any, Dict, List
6+
7+
import requests
8+
9+
10+
class FireworksTracingHttpHandler(logging.Handler):
11+
"""Logging handler that posts structured logs to tracing.fireworks gateway /logs endpoint."""
12+
13+
def __init__(self, gateway_base_url: Optional[str] = None, rollout_id_env: str = "EP_ROLLOUT_ID") -> None:
14+
super().__init__()
15+
self.gateway_base_url = gateway_base_url or os.getenv("FW_TRACING_GATEWAY_BASE_URL")
16+
self.rollout_id_env = rollout_id_env
17+
self._session = requests.Session()
18+
self._lock = threading.Lock()
19+
20+
def emit(self, record: logging.LogRecord) -> None:
21+
try:
22+
if not self.gateway_base_url:
23+
return
24+
rollout_id = self._get_rollout_id(record)
25+
if not rollout_id:
26+
return
27+
payload = self._build_payload(record, rollout_id)
28+
url = f"{self.gateway_base_url.rstrip('/')}/logs"
29+
with self._lock:
30+
self._session.post(url, json=payload, timeout=5)
31+
except Exception:
32+
# Avoid raising exceptions from logging
33+
self.handleError(record)
34+
35+
def _get_rollout_id(self, record: logging.LogRecord) -> Optional[str]:
36+
if hasattr(record, "rollout_id") and record.rollout_id is not None: # type: ignore
37+
return str(record.rollout_id) # type: ignore
38+
return os.getenv(self.rollout_id_env)
39+
40+
def _build_payload(self, record: logging.LogRecord, rollout_id: str) -> Dict[str, Any]:
41+
timestamp = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
42+
message = record.getMessage()
43+
tags: List[str] = [f"rollout_id:{rollout_id}"]
44+
# Optional additional tags
45+
if hasattr(record, "experiment_id") and record.experiment_id:
46+
tags.append(f"experiment_id:{record.experiment_id}") # type: ignore
47+
if hasattr(record, "run_id") and record.run_id:
48+
tags.append(f"run_id:{record.run_id}") # type: ignore
49+
program = getattr(record, "program", None) or "eval_protocol"
50+
status = getattr(record, "status", None)
51+
return {
52+
"program": program,
53+
"status": status if isinstance(status, str) else None,
54+
"message": message,
55+
"tags": tags,
56+
"metadata": getattr(record, "metadata", None),
57+
"extras": {
58+
"logger_name": record.name,
59+
"level": record.levelname,
60+
"timestamp": timestamp,
61+
},
62+
}

scripts/validate_dev_tracing.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import os
2+
import time
3+
import logging
4+
import requests
5+
6+
from eval_protocol import FireworksTracingHttpHandler
7+
8+
9+
def main():
10+
gateway = os.getenv("FW_TRACING_GATEWAY_BASE_URL")
11+
if not gateway:
12+
# default to deployed dev gateway
13+
gateway = "https://metadata-gateway-dev-644257448872.us-central1.run.app"
14+
rollout_id = os.getenv("EP_ROLLOUT_ID", f"sdk-dev-{int(time.time())}")
15+
16+
root = logging.getLogger()
17+
root.setLevel(logging.INFO)
18+
root.addHandler(FireworksTracingHttpHandler(gateway_base_url=gateway))
19+
20+
logger = logging.getLogger("eval_protocol.sdk.validate")
21+
22+
logger.info("SDK sending structured log to dev gateway", extra={
23+
"rollout_id": rollout_id,
24+
"program": "eval_protocol",
25+
"status": "completed",
26+
"experiment_id": "dev-exp",
27+
"run_id": "dev-run",
28+
"metadata": {"source": "sdk-validate"},
29+
})
30+
31+
# Poll fetch with retries for indexing
32+
params = {
33+
"tags": [f"rollout_id:{rollout_id}"],
34+
"program": "eval_protocol",
35+
"limit": 10,
36+
"hours_back": 1,
37+
}
38+
total = 0
39+
for _ in range(20):
40+
r = requests.get(f"{gateway}/logs", params=params, timeout=30)
41+
r.raise_for_status()
42+
data = r.json()
43+
total = int(data.get("total_entries") or 0)
44+
if total > 0:
45+
print("Fetched entries:", total)
46+
for e in data.get("entries", []):
47+
print({k: e.get(k) for k in ["timestamp", "severity", "program", "status", "message", "tags"]})
48+
break
49+
time.sleep(3)
50+
if total == 0:
51+
print("Fetched entries: 0 (after retries)")
52+
53+
54+
if __name__ == "__main__":
55+
main()

0 commit comments

Comments
 (0)