Skip to content

Commit a89c9d9

Browse files
authored
client logging sink (#263)
* client logging sink * fix types
1 parent e066feb commit a89c9d9

File tree

4 files changed

+153
-1
lines changed

4 files changed

+153
-1
lines changed

eval_protocol/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
from .log_utils.elasticsearch_direct_http_handler import ElasticsearchDirectHttpHandler
3636
from .log_utils.rollout_id_filter import RolloutIdFilter
3737
from .log_utils.util import setup_rollout_logging_for_elasticsearch_handler
38+
from .log_utils.fireworks_tracing_http_handler import FireworksTracingHttpHandler
39+
3840

3941
from .types.remote_rollout_processor import (
4042
InitRequest,
@@ -95,6 +97,7 @@
9597
"BraintrustAdapter",
9698
"create_braintrust_adapter",
9799
"LangSmithAdapter",
100+
"FireworksTracingHttpHandler",
98101
# Core interfaces
99102
"Message",
100103
"MetricResult",

eval_protocol/adapters/openai_responses.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,9 @@ def _create_messages(self, input_items: SyncCursorPage[ResponseItem]) -> Iterabl
169169
raise NotImplementedError(f"Unsupported content type: {content_item.type}")
170170
elif item.type == "function_call_output":
171171
# Collect tool call outputs to add before assistant message
172-
tool_call_outputs.append(Message(role="tool", content=item.output, tool_call_id=item.call_id))
172+
tool_call_outputs.append(
173+
Message(role="tool", content=self._coerce_tool_output(item.output), tool_call_id=item.call_id)
174+
)
173175
elif item.type == "function_call":
174176
tool_call = ChatCompletionMessageToolCall(
175177
id=item.call_id, type="function", function=Function(name=item.name, arguments=item.arguments)
@@ -186,3 +188,29 @@ def _create_messages(self, input_items: SyncCursorPage[ResponseItem]) -> Iterabl
186188
messages.append(Message(role="assistant", tool_calls=current_tool_calls))
187189

188190
return reversed(messages)
191+
192+
def _coerce_tool_output(self, output: Any) -> str:
193+
"""Coerce OpenAI Responses tool output into a string for Message.content.
194+
195+
The Responses API may return structured content lists. For our purposes,
196+
we stringify non-string outputs to satisfy the Message.content type.
197+
"""
198+
if isinstance(output, str):
199+
return output
200+
try:
201+
# Attempt to join list of objects with any 'text' fields
202+
if isinstance(output, list):
203+
parts: list[str] = []
204+
for part in output:
205+
text = None
206+
if isinstance(part, dict):
207+
text = part.get("text")
208+
if text:
209+
parts.append(str(text))
210+
else:
211+
parts.append(str(part))
212+
return "\n".join(parts)
213+
# Fallback to string conversion
214+
return str(output)
215+
except Exception:
216+
return str(output)
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import logging
2+
import os
3+
import threading
4+
from datetime import datetime, timezone
5+
from typing import Optional, Any, Dict, List, cast
6+
7+
import requests
8+
9+
10+
class FireworksTracingHttpHandler(logging.Handler):
11+
"""Logging handler that posts structured logs to tracing.fireworks gateway /logs endpoint."""
12+
13+
def __init__(self, gateway_base_url: Optional[str] = None, rollout_id_env: str = "EP_ROLLOUT_ID") -> None:
14+
super().__init__()
15+
self.gateway_base_url = gateway_base_url or os.getenv("FW_TRACING_GATEWAY_BASE_URL")
16+
self.rollout_id_env = rollout_id_env
17+
self._session = requests.Session()
18+
self._lock = threading.Lock()
19+
20+
def emit(self, record: logging.LogRecord) -> None:
21+
try:
22+
if not self.gateway_base_url:
23+
return
24+
rollout_id = self._get_rollout_id(record)
25+
if not rollout_id:
26+
return
27+
payload = self._build_payload(record, rollout_id)
28+
url = f"{self.gateway_base_url.rstrip('/')}/logs"
29+
with self._lock:
30+
self._session.post(url, json=payload, timeout=5)
31+
except Exception:
32+
# Avoid raising exceptions from logging
33+
self.handleError(record)
34+
35+
def _get_rollout_id(self, record: logging.LogRecord) -> Optional[str]:
36+
if hasattr(record, "rollout_id") and cast(Any, getattr(record, "rollout_id")) is not None:
37+
return str(cast(Any, getattr(record, "rollout_id")))
38+
return os.getenv(self.rollout_id_env)
39+
40+
def _build_payload(self, record: logging.LogRecord, rollout_id: str) -> Dict[str, Any]:
41+
timestamp = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
42+
message = record.getMessage()
43+
tags: List[str] = [f"rollout_id:{rollout_id}"]
44+
# Optional additional tags
45+
if hasattr(record, "experiment_id") and cast(Any, getattr(record, "experiment_id")):
46+
tags.append(f"experiment_id:{cast(Any, getattr(record, 'experiment_id'))}")
47+
if hasattr(record, "run_id") and cast(Any, getattr(record, "run_id")):
48+
tags.append(f"run_id:{cast(Any, getattr(record, 'run_id'))}")
49+
program = cast(Optional[str], getattr(record, "program", None)) or "eval_protocol"
50+
status_val = cast(Any, getattr(record, "status", None))
51+
status = status_val if isinstance(status_val, str) else None
52+
return {
53+
"program": program,
54+
"status": status,
55+
"message": message,
56+
"tags": tags,
57+
"metadata": cast(Any, getattr(record, "metadata", None)),
58+
"extras": {
59+
"logger_name": record.name,
60+
"level": record.levelname,
61+
"timestamp": timestamp,
62+
},
63+
}

scripts/validate_dev_tracing.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import os
2+
import time
3+
import logging
4+
import requests
5+
6+
from eval_protocol import FireworksTracingHttpHandler
7+
8+
9+
def main():
10+
gateway = os.getenv("FW_TRACING_GATEWAY_BASE_URL")
11+
if not gateway:
12+
# default to deployed dev gateway
13+
gateway = "https://metadata-gateway-dev-644257448872.us-central1.run.app"
14+
rollout_id = os.getenv("EP_ROLLOUT_ID", f"sdk-dev-{int(time.time())}")
15+
16+
root = logging.getLogger()
17+
root.setLevel(logging.INFO)
18+
root.addHandler(FireworksTracingHttpHandler(gateway_base_url=gateway))
19+
20+
logger = logging.getLogger("eval_protocol.sdk.validate")
21+
22+
logger.info(
23+
"SDK sending structured log to dev gateway",
24+
extra={
25+
"rollout_id": rollout_id,
26+
"program": "eval_protocol",
27+
"status": "completed",
28+
"experiment_id": "dev-exp",
29+
"run_id": "dev-run",
30+
"metadata": {"source": "sdk-validate"},
31+
},
32+
)
33+
34+
# Poll fetch with retries for indexing
35+
params = {
36+
"tags": [f"rollout_id:{rollout_id}"],
37+
"program": "eval_protocol",
38+
"limit": 10,
39+
"hours_back": 1,
40+
}
41+
total = 0
42+
for _ in range(20):
43+
r = requests.get(f"{gateway}/logs", params=params, timeout=30)
44+
r.raise_for_status()
45+
data = r.json()
46+
total = int(data.get("total_entries") or 0)
47+
if total > 0:
48+
print("Fetched entries:", total)
49+
for e in data.get("entries", []):
50+
print({k: e.get(k) for k in ["timestamp", "severity", "program", "status", "message", "tags"]})
51+
break
52+
time.sleep(3)
53+
if total == 0:
54+
print("Fetched entries: 0 (after retries)")
55+
56+
57+
if __name__ == "__main__":
58+
main()

0 commit comments

Comments
 (0)