-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtracer_factory.py
More file actions
268 lines (227 loc) · 10.1 KB
/
tracer_factory.py
File metadata and controls
268 lines (227 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
"""
Tracer factory with automatic tier detection.
Provides convenient factory function for creating tracers with cloud upload support.
"""
import gzip
import os
import uuid
from pathlib import Path
import requests
from sentience.cloud_tracing import CloudTraceSink, SentienceLogger
from sentience.tracing import JsonlTraceSink, Tracer
# Sentience API base URL (constant)
SENTIENCE_API_URL = "https://api.sentienceapi.com"
def create_tracer(
    api_key: str | None = None,
    run_id: str | None = None,
    api_url: str | None = None,
    logger: SentienceLogger | None = None,
    upload_trace: bool = False,
) -> Tracer:
    """
    Create tracer with automatic tier detection.

    Tier Detection:
    - If api_key is provided and upload_trace is True: try to initialize
      CloudTraceSink (Pro/Enterprise)
    - If cloud init fails or no api_key: fall back to JsonlTraceSink (Free tier)

    Args:
        api_key: Sentience API key (e.g., "sk_pro_xxxxx")
            - Free tier: None or empty
            - Pro/Enterprise: Valid API key
        run_id: Unique identifier for this agent run. If not provided, generates UUID.
        api_url: Sentience API base URL (default: https://api.sentienceapi.com)
        logger: Optional logger instance for logging file sizes and errors
        upload_trace: Enable cloud trace upload (default: False). When True and api_key
            is provided, traces will be uploaded to cloud. When False, traces
            are saved locally only.

    Returns:
        Tracer configured with appropriate sink

    Example:
        >>> # Pro tier user
        >>> tracer = create_tracer(api_key="sk_pro_xyz", run_id="demo")
        >>> # Returns: Tracer with CloudTraceSink
        >>>
        >>> # Free tier user
        >>> tracer = create_tracer(run_id="demo")
        >>> # Returns: Tracer with JsonlTraceSink (local-only)
        >>>
        >>> # Use with agent
        >>> agent = SentienceAgent(browser, llm, tracer=tracer)
        >>> agent.act("Click search")
        >>> tracer.close()  # Uploads to cloud if Pro tier
    """
    if run_id is None:
        run_id = str(uuid.uuid4())
    if api_url is None:
        api_url = SENTIENCE_API_URL

    if api_key and upload_trace:
        # 0. Check for orphaned traces from previous crashes first, then
        #    attempt cloud init for this run.
        _recover_orphaned_traces(api_key, api_url)
        tracer = _try_create_cloud_tracer(api_key, run_id, api_url, logger)
        if tracer is not None:
            return tracer

    # Fallback to local sink (Free tier / offline mode / cloud-init failure).
    traces_dir = Path("traces")
    traces_dir.mkdir(exist_ok=True)
    local_path = traces_dir / f"{run_id}.jsonl"
    print(f"💾 [Sentience] Local tracing: {local_path}")
    return Tracer(run_id=run_id, sink=JsonlTraceSink(str(local_path)))


def _print_api_error_detail(response) -> None:
    """Best-effort print of the API-provided error message in *response*.

    Prefers the "error" key, falling back to "message"; silently does
    nothing when the body is not JSON or carries neither key.
    """
    try:
        error_data = response.json()
        error_msg = error_data.get("error") or error_data.get("message", "")
        if error_msg:
            print(f" API Error: {error_msg}")
    except Exception:
        pass


def _try_create_cloud_tracer(
    api_key: str,
    run_id: str,
    api_url: str,
    logger: SentienceLogger | None,
) -> Tracer | None:
    """Request a pre-signed upload URL and build a cloud-backed Tracer.

    Returns None after printing diagnostics on any failure (bad status,
    missing upload_url, network error) so the caller can fall back to
    local-only tracing.
    """
    try:
        # Request pre-signed upload URL from backend.
        response = requests.post(
            f"{api_url}/v1/traces/init",
            headers={"Authorization": f"Bearer {api_key}"},
            json={"run_id": run_id},
            timeout=10,
        )
        if response.status_code == 200:
            data = response.json()
            upload_url = data.get("upload_url")
            if upload_url:
                print("☁️ [Sentience] Cloud tracing enabled (Pro tier)")
                return Tracer(
                    run_id=run_id,
                    sink=CloudTraceSink(
                        upload_url=upload_url,
                        run_id=run_id,
                        api_key=api_key,
                        api_url=api_url,
                        logger=logger,
                    ),
                )
            print("⚠️ [Sentience] Cloud init response missing upload_url")
            print(f" Response data: {data}")
        elif response.status_code == 403:
            print("⚠️ [Sentience] Cloud tracing requires Pro tier")
            _print_api_error_detail(response)
        elif response.status_code == 401:
            print("⚠️ [Sentience] Cloud init failed: HTTP 401 Unauthorized")
            print(" API key is invalid or expired")
            _print_api_error_detail(response)
        else:
            print(f"⚠️ [Sentience] Cloud init failed: HTTP {response.status_code}")
            try:
                error_data = response.json()
                error_msg = error_data.get("error") or error_data.get(
                    "message", "Unknown error"
                )
                print(f" Error: {error_msg}")
                if "tier" in error_msg.lower() or "subscription" in error_msg.lower():
                    print(" 💡 This may be a tier/subscription issue")
            except Exception:
                print(f" Response: {response.text[:200]}")
    except requests.exceptions.Timeout:
        print("⚠️ [Sentience] Cloud init timeout")
    except requests.exceptions.ConnectionError:
        print("⚠️ [Sentience] Cloud init connection error")
    except Exception as e:
        print(f"⚠️ [Sentience] Cloud init error: {e}")

    # Every failure path converges here; success returned above.
    print(" Falling back to local-only tracing")
    return None
def _recover_orphaned_traces(api_key: str, api_url: str = SENTIENCE_API_URL) -> None:
    """
    Attempt to upload orphaned traces from previous crashed runs.

    Scans ~/.sentience/traces/pending/ for un-uploaded trace files and
    attempts to upload them using the provided API key. Each file is
    handled independently: a failure on one does not stop the rest.

    Args:
        api_key: Sentience API key for authentication
        api_url: Sentience API base URL (defaults to SENTIENCE_API_URL)
    """
    pending_dir = Path.home() / ".sentience" / "traces" / "pending"
    if not pending_dir.exists():
        return

    # Skip test artifacts: any stem beginning with "test" (this subsumes
    # "test-", "test_", "test." etc.) is assumed to be local test data
    # that shouldn't be uploaded.
    valid_orphaned = [
        f for f in pending_dir.glob("*.jsonl") if not f.stem.startswith("test")
    ]
    if not valid_orphaned:
        return

    print(f"⚠️ [Sentience] Found {len(valid_orphaned)} un-uploaded trace(s) from previous runs")
    print(" Attempting to upload now...")

    for trace_file in valid_orphaned:
        try:
            _upload_orphaned_trace(trace_file, api_key, api_url)
        except requests.exceptions.Timeout:
            print(f"❌ Timeout uploading {trace_file.name}")
        except requests.exceptions.ConnectionError:
            print(f"❌ Connection error uploading {trace_file.name}")
        except Exception as e:
            print(f"❌ Error uploading {trace_file.name}: {e}")


def _remove_quietly(path: Path) -> None:
    """Delete *path*, ignoring any cleanup errors."""
    try:
        os.remove(path)
    except Exception:
        pass


def _upload_orphaned_trace(trace_file: Path, api_key: str, api_url: str) -> None:
    """Upload a single orphaned trace file; delete it on success or duplicate.

    The run_id is recovered from the filename (format: {run_id}.jsonl).
    Raises requests exceptions on network failure; the caller handles them.
    """
    run_id = trace_file.stem

    # Request a fresh upload URL for this run_id.
    response = requests.post(
        f"{api_url}/v1/traces/init",
        headers={"Authorization": f"Bearer {api_key}"},
        json={"run_id": run_id},
        timeout=10,
    )
    if response.status_code != 200:
        if response.status_code == 409:
            # HTTP 409: trace already exists in cloud — treat as success
            # and remove the local copy.
            print(f"✅ Trace {run_id} already exists in cloud (skipping re-upload)")
            _remove_quietly(trace_file)
            return
        if response.status_code == 422:
            # HTTP 422: likely an invalid run_id (e.g. test file) — skip silently.
            return
        print(f"❌ Failed to get upload URL for {run_id}: HTTP {response.status_code}")
        return

    data = response.json()
    upload_url = data.get("upload_url")
    if not upload_url:
        print(f"❌ Upload URL missing for {run_id}")
        return

    # Read, compress, and upload the trace.
    compressed_data = gzip.compress(trace_file.read_bytes())
    upload_response = requests.put(
        upload_url,
        data=compressed_data,
        headers={
            "Content-Type": "application/x-gzip",
            "Content-Encoding": "gzip",
        },
        timeout=60,
    )
    if upload_response.status_code == 200:
        print(f"✅ Uploaded orphaned trace: {run_id}")
        _remove_quietly(trace_file)
    else:
        print(f"❌ Failed to upload {run_id}: HTTP {upload_response.status_code}")