-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcustom_config_example.py
More file actions
240 lines (190 loc) · 7.27 KB
/
custom_config_example.py
File metadata and controls
240 lines (190 loc) · 7.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
#!/usr/bin/env python3
"""
PlannerExecutorAgent example with custom configuration.
This example demonstrates various configuration options:
- Snapshot escalation (enable/disable, custom step sizes)
- Retry configuration (timeouts, max attempts)
- Vision fallback settings
Usage:
export OPENAI_API_KEY="sk-..."
python custom_config_example.py
"""
from __future__ import annotations
import asyncio
import os
from predicate import AsyncPredicateBrowser
from predicate.agent_runtime import AgentRuntime
from predicate.agents import (
PlannerExecutorAgent,
PlannerExecutorConfig,
RetryConfig,
SnapshotEscalationConfig,
)
from predicate.agents.browser_agent import VisionFallbackConfig
from predicate.backends.playwright_backend import PlaywrightBackend
from predicate.llm_provider import OpenAIProvider
async def example_default_config() -> None:
"""Default configuration: escalation enabled, step=30."""
print("\n--- Example 1: Default Config ---")
print("Escalation: 60 -> 90 -> 120 -> 150 -> 180 -> 200")
config = PlannerExecutorConfig()
print(f" snapshot.enabled: {config.snapshot.enabled}")
print(f" snapshot.limit_base: {config.snapshot.limit_base}")
print(f" snapshot.limit_step: {config.snapshot.limit_step}")
print(f" snapshot.limit_max: {config.snapshot.limit_max}")
async def example_disabled_escalation() -> None:
"""Disable escalation: always use limit_base."""
print("\n--- Example 2: Disabled Escalation ---")
print("Escalation: disabled (always 60)")
config = PlannerExecutorConfig(
snapshot=SnapshotEscalationConfig(enabled=False),
)
print(f" snapshot.enabled: {config.snapshot.enabled}")
print(f" snapshot.limit_base: {config.snapshot.limit_base}")
async def example_custom_step_size() -> None:
"""Custom step size for faster escalation."""
print("\n--- Example 3: Custom Step Size ---")
print("Escalation: 60 -> 110 -> 160 -> 200 (step=50)")
config = PlannerExecutorConfig(
snapshot=SnapshotEscalationConfig(
limit_step=50, # Larger steps = fewer iterations
),
)
print(f" snapshot.limit_step: {config.snapshot.limit_step}")
async def example_custom_limits() -> None:
"""Custom base and max limits."""
print("\n--- Example 4: Custom Limits ---")
print("Escalation: 100 -> 125 -> 150 -> 175 -> 200 -> 225 -> 250")
config = PlannerExecutorConfig(
snapshot=SnapshotEscalationConfig(
limit_base=100, # Start higher
limit_step=25, # Smaller increments
limit_max=250, # Higher maximum
),
)
print(f" snapshot.limit_base: {config.snapshot.limit_base}")
print(f" snapshot.limit_step: {config.snapshot.limit_step}")
print(f" snapshot.limit_max: {config.snapshot.limit_max}")
async def example_retry_config() -> None:
"""Custom retry configuration."""
print("\n--- Example 5: Retry Config ---")
config = PlannerExecutorConfig(
retry=RetryConfig(
verify_timeout_s=15.0, # Longer timeout for slow pages
verify_poll_s=0.3, # Faster polling
verify_max_attempts=10, # More verification attempts
executor_repair_attempts=3, # More repair attempts
max_replans=2, # Allow 2 replans on failure
),
)
print(f" retry.verify_timeout_s: {config.retry.verify_timeout_s}")
print(f" retry.verify_max_attempts: {config.retry.verify_max_attempts}")
print(f" retry.max_replans: {config.retry.max_replans}")
async def example_vision_fallback() -> None:
"""Vision fallback configuration."""
print("\n--- Example 6: Vision Fallback ---")
config = PlannerExecutorConfig(
vision=VisionFallbackConfig(
enabled=True,
max_vision_calls=5, # Up to 5 vision calls per run
trigger_requires_vision=True, # Trigger on require_vision status
trigger_canvas_or_low_actionables=True, # Trigger on canvas pages
),
)
print(f" vision.enabled: {config.vision.enabled}")
print(f" vision.max_vision_calls: {config.vision.max_vision_calls}")
async def example_full_custom() -> None:
"""Full custom configuration with all options."""
print("\n--- Example 7: Full Custom Config ---")
config = PlannerExecutorConfig(
# Snapshot escalation
snapshot=SnapshotEscalationConfig(
enabled=True,
limit_base=80,
limit_step=40,
limit_max=240,
),
# Retry settings
retry=RetryConfig(
verify_timeout_s=12.0,
verify_poll_s=0.4,
verify_max_attempts=6,
max_replans=2,
),
# Vision fallback
vision=VisionFallbackConfig(
enabled=True,
max_vision_calls=3,
),
# Planner settings
planner_max_tokens=3000,
planner_temperature=0.0,
# Executor settings
executor_max_tokens=128,
executor_temperature=0.0,
# Tracing
trace_screenshots=True,
trace_screenshot_format="jpeg",
trace_screenshot_quality=85,
)
print(" Full config created successfully!")
print(f" Escalation: {config.snapshot.limit_base} -> ... -> {config.snapshot.limit_max}")
print(f" Max replans: {config.retry.max_replans}")
print(f" Vision enabled: {config.vision.enabled}")
async def example_run_with_config() -> None:
"""Run agent with custom config."""
print("\n--- Example 8: Run Agent with Custom Config ---")
openai_key = os.getenv("OPENAI_API_KEY")
if not openai_key:
print(" Skipping (no OPENAI_API_KEY)")
return
predicate_api_key = os.getenv("PREDICATE_API_KEY")
# Create config optimized for reliability
config = PlannerExecutorConfig(
snapshot=SnapshotEscalationConfig(
enabled=True,
limit_base=60,
limit_step=30,
limit_max=180,
),
retry=RetryConfig(
verify_timeout_s=10.0,
max_replans=1,
),
)
planner = OpenAIProvider(model="gpt-4o")
executor = OpenAIProvider(model="gpt-4o-mini")
agent = PlannerExecutorAgent(
planner=planner,
executor=executor,
config=config,
)
async with AsyncPredicateBrowser(
api_key=predicate_api_key,
headless=True,
) as browser:
page = await browser.new_page()
await page.goto("https://example.com")
backend = PlaywrightBackend(page)
runtime = AgentRuntime(backend=backend)
result = await agent.run(
runtime=runtime,
task="Verify example.com is loaded",
)
print(f" Success: {result.success}")
print(f" Steps: {result.steps_completed}/{result.steps_total}")
async def main() -> None:
print("PlannerExecutorAgent Configuration Examples")
print("=" * 50)
await example_default_config()
await example_disabled_escalation()
await example_custom_step_size()
await example_custom_limits()
await example_retry_config()
await example_vision_fallback()
await example_full_custom()
await example_run_with_config()
print("\n" + "=" * 50)
print("Done!")
if __name__ == "__main__":
asyncio.run(main())