Skip to content

Commit fd9725e

Browse files
committed
add finish reason
1 parent 1d86b97 commit fd9725e

File tree

3 files changed

+262
-1
lines changed

3 files changed

+262
-1
lines changed

eval_protocol/adapters/fireworks_tracing.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88
import logging
99
import requests
1010
from datetime import datetime
11-
from typing import Any, Dict, List, Optional, Protocol
11+
import ast
1212
import os
13+
from typing import Any, Dict, List, Optional, Protocol
1314

1415
from eval_protocol.models import EvaluationRow, InputMetadata, ExecutionMetadata, Message
1516
from .base import BaseAdapter
@@ -44,6 +45,38 @@ def __call__(
4445
...
4546

4647

48+
def extract_openai_response(observations: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
49+
"""Attempt to extract and parse attributes from raw_gen_ai_request observation. This only works when stored in OTEL format.
50+
51+
Args:
52+
observations: List of observation dictionaries from the trace
53+
54+
Returns:
55+
Dict with all attributes parsed. Or None if not found.
56+
"""
57+
for obs in observations:
58+
if obs.get("name") == "raw_gen_ai_request" and obs.get("type") == "SPAN":
59+
metadata = obs.get("metadata", {})
60+
attributes = metadata.get("attributes", {})
61+
62+
result: Dict[str, Any] = {}
63+
64+
for key, value in attributes.items():
65+
# Try to parse stringified Python literals, otherwise keep as-is
66+
if isinstance(value, str) and value.startswith(("[", "{")):
67+
try:
68+
result[key] = ast.literal_eval(value)
69+
except Exception:
70+
result[key] = value
71+
else:
72+
result[key] = value
73+
74+
if result:
75+
return result
76+
77+
return None
78+
79+
4780
def convert_trace_dict_to_evaluation_row(
4881
trace: Dict[str, Any], include_tool_calls: bool = True, span_name: Optional[str] = None
4982
) -> Optional[EvaluationRow]:
@@ -96,6 +129,14 @@ def convert_trace_dict_to_evaluation_row(
96129
):
97130
break # Break early if we've found all the metadata we need
98131

132+
observations = trace.get("observations", [])
133+
# We can only extract when stored in OTEL format.
134+
openai_response = extract_openai_response(observations)
135+
if openai_response:
136+
choices = openai_response.get("llm.openai.choices")
137+
if choices and len(choices) > 0:
138+
execution_metadata.finish_reason = choices[0].get("finish_reason")
139+
99140
return EvaluationRow(
100141
messages=messages,
101142
tools=tools,

eval_protocol/proxy/proxy_core/langfuse.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ def _serialize_trace_to_dict(trace_full: Any) -> Dict[str, Any]:
5050
"input": getattr(obs, "input", None),
5151
"output": getattr(obs, "output", None),
5252
"parent_observation_id": getattr(obs, "parent_observation_id", None),
53+
"metadata": getattr(obs, "metadata", None),
5354
}
5455
for obs in getattr(trace_full, "observations", [])
5556
]

scripts/fetch_traces_test.py

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
#!/usr/bin/env python3
2+
"""Simple script to fetch traces directly from Langfuse and parse them.
3+
4+
This bypasses the Fireworks tracing proxy (and its Redis insertion_id check)
5+
by querying Langfuse directly.
6+
7+
Required env vars:
8+
LANGFUSE_PUBLIC_KEY - Your Langfuse public key
9+
LANGFUSE_SECRET_KEY - Your Langfuse secret key
10+
LANGFUSE_HOST - Langfuse host (default: https://cloud.langfuse.com)
11+
ROLLOUT_ID - The rollout_id to search for (default: test-test-test)
12+
"""
13+
14+
import json
15+
import os
16+
from datetime import datetime, timedelta
17+
from typing import List, Dict, Any
18+
19+
from eval_protocol.adapters.fireworks_tracing import convert_trace_dict_to_evaluation_row
20+
from eval_protocol.models import EvaluationRow
21+
22+
23+
os.environ.setdefault("LANGFUSE_PUBLIC_KEY", "pk-lf-9470ba98-7ace-4fe0-b1dc-3dda0f66d812")
24+
os.environ.setdefault("LANGFUSE_SECRET_KEY", "sk-lf-36b11237-a230-4524-a6e0-3af372b6f5b6")
25+
os.environ.setdefault("LANGFUSE_HOST", "https://langfuse-prod.fireworks.ai") # EU region
26+
27+
28+
def fetch_traces_from_langfuse(
29+
tags: List[str],
30+
limit: int = 100,
31+
hours_back: int = 24,
32+
) -> List[Dict[str, Any]]:
33+
"""Fetch traces directly from Langfuse (bypassing Fireworks proxy).
34+
35+
This avoids the Redis insertion_id check by going straight to Langfuse.
36+
"""
37+
try:
38+
from langfuse import Langfuse
39+
except ImportError:
40+
print("ERROR: langfuse not installed. Run: pip install langfuse")
41+
return []
42+
43+
# Get Langfuse credentials from environment
44+
public_key = os.environ.get("LANGFUSE_PUBLIC_KEY")
45+
secret_key = os.environ.get("LANGFUSE_SECRET_KEY")
46+
host = os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
47+
48+
if not public_key or not secret_key:
49+
print("ERROR: LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY must be set")
50+
return []
51+
52+
print(f"Connecting to Langfuse at {host}...")
53+
client = Langfuse(public_key=public_key, secret_key=secret_key, host=host)
54+
55+
# Calculate time range
56+
to_ts = datetime.now()
57+
from_ts = to_ts - timedelta(hours=hours_back)
58+
59+
print(f"Fetching traces with tags: {tags}")
60+
print(f"Time range: {from_ts} to {to_ts}")
61+
62+
# Fetch trace list
63+
traces_response = client.api.trace.list(
64+
page=1,
65+
limit=limit,
66+
tags=tags,
67+
from_timestamp=from_ts,
68+
to_timestamp=to_ts,
69+
order_by="timestamp.desc",
70+
)
71+
72+
if not traces_response or not traces_response.data:
73+
print("No traces found in list response")
74+
return []
75+
76+
print(f"Found {len(traces_response.data)} trace summaries")
77+
78+
# Fetch full trace details and serialize to dict
79+
traces: List[Dict[str, Any]] = []
80+
for trace_info in traces_response.data:
81+
try:
82+
trace_full = client.api.trace.get(trace_info.id)
83+
84+
# Serialize to dict (same format as proxy returns)
85+
trace_dict = _serialize_trace_to_dict(trace_full)
86+
traces.append(trace_dict)
87+
88+
except Exception as e:
89+
print(f" Failed to fetch trace {trace_info.id}: {e}")
90+
91+
print(f"Successfully fetched {len(traces)} full traces")
92+
return traces
93+
94+
95+
def _serialize_trace_to_dict(trace_full: Any) -> Dict[str, Any]:
96+
"""Convert Langfuse trace object to dict format (same as proxy does)."""
97+
timestamp = getattr(trace_full, "timestamp", None)
98+
99+
return {
100+
"id": trace_full.id,
101+
"name": getattr(trace_full, "name", None),
102+
"user_id": getattr(trace_full, "user_id", None),
103+
"session_id": getattr(trace_full, "session_id", None),
104+
"tags": getattr(trace_full, "tags", []),
105+
"timestamp": str(timestamp) if timestamp else None,
106+
"input": getattr(trace_full, "input", None),
107+
"output": getattr(trace_full, "output", None),
108+
"metadata": getattr(trace_full, "metadata", None),
109+
"observations": [
110+
{
111+
"id": obs.id,
112+
"type": getattr(obs, "type", None),
113+
"name": getattr(obs, "name", None),
114+
"start_time": str(getattr(obs, "start_time", None)) if getattr(obs, "start_time", None) else None,
115+
"end_time": str(getattr(obs, "end_time", None)) if getattr(obs, "end_time", None) else None,
116+
"input": getattr(obs, "input", None),
117+
"output": getattr(obs, "output", None),
118+
"parent_observation_id": getattr(obs, "parent_observation_id", None),
119+
"metadata": getattr(obs, "metadata", None),
120+
}
121+
for obs in getattr(trace_full, "observations", [])
122+
]
123+
if hasattr(trace_full, "observations")
124+
else [],
125+
}
126+
127+
128+
def parse_traces_to_rows(traces: List[Dict[str, Any]], include_tool_calls: bool = True) -> List[EvaluationRow]:
129+
"""Parse raw trace dicts to EvaluationRows using the same logic as get_evaluation_rows."""
130+
rows = []
131+
for trace in traces:
132+
try:
133+
row = convert_trace_dict_to_evaluation_row(trace, include_tool_calls)
134+
if row:
135+
rows.append(row)
136+
except Exception as e:
137+
print(f" Failed to convert trace {trace.get('id')}: {e}")
138+
return rows
139+
140+
141+
def print_row_details(row: EvaluationRow, index: int):
142+
"""Print details of a single EvaluationRow."""
143+
print(f"\n--- Row {index + 1} ---")
144+
print(f"Row ID: {row.input_metadata.row_id}")
145+
print(
146+
f"Trace ID: {row.input_metadata.session_data.get('langfuse_trace_id') if row.input_metadata.session_data else None}"
147+
)
148+
print(f"Rollout ID: {row.execution_metadata.rollout_id}")
149+
print(f"Invocation ID: {row.execution_metadata.invocation_id}")
150+
print(f"Experiment ID: {row.execution_metadata.experiment_id}")
151+
print(f"Run ID: {row.execution_metadata.run_id}")
152+
print(f"Finish Reason: {row.execution_metadata.finish_reason}") # NEW
153+
print(f"Num messages: {len(row.messages)}")
154+
print(f"Tools: {row.tools is not None}")
155+
156+
print("\nMessages:")
157+
for j, msg in enumerate(row.messages):
158+
content_preview = str(msg.content)[:100] if msg.content else "(empty)"
159+
tool_calls_info = f" [tool_calls: {len(msg.tool_calls)}]" if msg.tool_calls else ""
160+
print(f" [{j}] {msg.role}: {content_preview}{tool_calls_info}")
161+
162+
163+
def main():
164+
rollout_id = os.environ.get("ROLLOUT_ID", "test-test-test")
165+
hours_back = int(os.environ.get("HOURS_BACK", "24"))
166+
167+
print(f"Rollout ID: {rollout_id}")
168+
print(f"Hours back: {hours_back}")
169+
print("=" * 60)
170+
171+
# Step 1: Fetch raw traces directly from Langfuse
172+
print("\n[1] Fetching raw traces from Langfuse...")
173+
traces = fetch_traces_from_langfuse(
174+
tags=[f"rollout_id:{rollout_id}"],
175+
limit=10,
176+
hours_back=hours_back,
177+
)
178+
179+
if not traces:
180+
print("\nNo traces found!")
181+
return
182+
183+
# Step 2: Print raw trace structure (first trace only)
184+
print("\n[2] Raw trace structure (first trace):")
185+
print("-" * 60)
186+
first_trace = traces[0]
187+
print(f"ID: {first_trace.get('id')}")
188+
print(f"Name: {first_trace.get('name')}")
189+
print(f"Tags: {first_trace.get('tags')}")
190+
print(f"Input type: {type(first_trace.get('input'))}")
191+
print(f"Input: {json.dumps(first_trace.get('input'), indent=2)[:500]}...")
192+
print(f"Output type: {type(first_trace.get('output'))}")
193+
print(f"Output: {json.dumps(first_trace.get('output'), indent=2)[:500] if first_trace.get('output') else None}...")
194+
print(f"Num observations: {len(first_trace.get('observations', []))}")
195+
196+
# Print observations
197+
for obs in first_trace.get("observations", []):
198+
print(f"\n Observation: {obs.get('name')} ({obs.get('type')})")
199+
print(f" Input type: {type(obs.get('input'))}")
200+
print(f" Input: {json.dumps(obs.get('input'), indent=2)[:300] if obs.get('input') else None}...")
201+
print(f" Output type: {type(obs.get('output'))}")
202+
print(f" Output: {json.dumps(obs.get('output'), indent=2)[:300] if obs.get('output') else None}...")
203+
204+
# Step 3: Parse to EvaluationRows
205+
print("\n[3] Parsing traces to EvaluationRows...")
206+
print("-" * 60)
207+
rows = parse_traces_to_rows(traces)
208+
209+
print(f"\nSuccessfully parsed {len(rows)} / {len(traces)} traces")
210+
211+
# Step 4: Print row details
212+
print("\n[4] EvaluationRow details:")
213+
print("=" * 60)
214+
for i, row in enumerate(rows):
215+
print_row_details(row, i)
216+
217+
218+
if __name__ == "__main__":
219+
main()

0 commit comments

Comments
 (0)