-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathcomputer.py
More file actions
262 lines (224 loc) · 9.82 KB
/
computer.py
File metadata and controls
262 lines (224 loc) · 9.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
"""
Gemini Computer Tool - Maps Gemini actions to Kernel Computer Controls API.
Based on Google's computer-use-preview reference implementation.
"""
import asyncio
import base64
from kernel import Kernel
from .types import (
GeminiAction,
GeminiFunctionArgs,
PREDEFINED_COMPUTER_USE_FUNCTIONS,
DEFAULT_SCREEN_SIZE,
COORDINATE_SCALE,
ToolResult,
ScreenSize,
)
TYPING_DELAY_MS = 12
SCREENSHOT_DELAY_SECS = 0.5
PX_PER_NOTCH = 60
MAX_NOTCHES_PER_ACTION = 17
class ComputerTool:
def __init__(
self,
kernel: Kernel,
session_id: str,
screen_size: ScreenSize = DEFAULT_SCREEN_SIZE,
):
self.kernel = kernel
self.session_id = session_id
self.screen_size = screen_size
def denormalize_x(self, x: int) -> int:
return int((x / COORDINATE_SCALE) * self.screen_size.width)
def denormalize_y(self, y: int) -> int:
return int((y / COORDINATE_SCALE) * self.screen_size.height)
async def screenshot(self) -> ToolResult:
try:
await asyncio.sleep(SCREENSHOT_DELAY_SECS)
response = self.kernel.browsers.computer.capture_screenshot(self.session_id)
screenshot_bytes = response.read()
return ToolResult(
base64_image=base64.b64encode(screenshot_bytes).decode("utf-8"),
url="about:blank",
)
except Exception as e:
return ToolResult(error=f"Failed to take screenshot: {e}", url="about:blank")
async def execute_action(
self, action_name: str, args: GeminiFunctionArgs
) -> ToolResult:
# Check if this is a known computer use function
if action_name not in [a.value for a in PREDEFINED_COMPUTER_USE_FUNCTIONS]:
return ToolResult(error=f"Unknown action: {action_name}")
try:
if action_name == GeminiAction.OPEN_WEB_BROWSER:
# Browser is already open in Kernel, just return screenshot
pass
elif action_name == GeminiAction.CLICK_AT:
if "x" not in args or "y" not in args:
return ToolResult(error="click_at requires x and y coordinates")
x = self.denormalize_x(args["x"])
y = self.denormalize_y(args["y"])
self.kernel.browsers.computer.click_mouse(
self.session_id,
x=x,
y=y,
button="left",
click_type="click",
num_clicks=1,
)
elif action_name == GeminiAction.HOVER_AT:
if "x" not in args or "y" not in args:
return ToolResult(error="hover_at requires x and y coordinates")
x = self.denormalize_x(args["x"])
y = self.denormalize_y(args["y"])
self.kernel.browsers.computer.move_mouse(
self.session_id, x=x, y=y
)
elif action_name == GeminiAction.TYPE_TEXT_AT:
if "x" not in args or "y" not in args:
return ToolResult(error="type_text_at requires x and y coordinates")
if "text" not in args:
return ToolResult(error="type_text_at requires text")
x = self.denormalize_x(args["x"])
y = self.denormalize_y(args["y"])
# Click at the location first
self.kernel.browsers.computer.click_mouse(
self.session_id,
x=x,
y=y,
button="left",
click_type="click",
num_clicks=1,
)
# Clear existing text if requested (default: true)
if args.get("clear_before_typing", True):
self.kernel.browsers.computer.press_key(
self.session_id, keys=["ctrl+a"]
)
await asyncio.sleep(0.05)
# Type the text
self.kernel.browsers.computer.type_text(
self.session_id,
text=args["text"],
delay=TYPING_DELAY_MS,
)
# Press enter if requested
if args.get("press_enter", False):
await asyncio.sleep(0.1)
self.kernel.browsers.computer.press_key(
self.session_id, keys=["Return"]
)
elif action_name == GeminiAction.SCROLL_DOCUMENT:
if "direction" not in args:
return ToolResult(error="scroll_document requires direction")
center_x = self.screen_size.width // 2
center_y = self.screen_size.height // 2
magnitude_px = args.get("magnitude", 400)
doc_notches = min(MAX_NOTCHES_PER_ACTION, max(1, round(magnitude_px / PX_PER_NOTCH)))
direction = args["direction"]
delta_x = delta_y = 0
if direction == "down":
delta_y = doc_notches
elif direction == "up":
delta_y = -doc_notches
elif direction == "right":
delta_x = doc_notches
elif direction == "left":
delta_x = -doc_notches
self.kernel.browsers.computer.scroll(
self.session_id,
x=center_x,
y=center_y,
delta_x=delta_x,
delta_y=delta_y,
)
elif action_name == GeminiAction.SCROLL_AT:
if "x" not in args or "y" not in args:
return ToolResult(error="scroll_at requires x and y coordinates")
if "direction" not in args:
return ToolResult(error="scroll_at requires direction")
x = self.denormalize_x(args["x"])
y = self.denormalize_y(args["y"])
magnitude_px = args.get("magnitude", 400)
notches = min(MAX_NOTCHES_PER_ACTION, max(1, round(magnitude_px / PX_PER_NOTCH)))
direction = args["direction"]
delta_x = delta_y = 0
if direction == "down":
delta_y = notches
elif direction == "up":
delta_y = -notches
elif direction == "right":
delta_x = notches
elif direction == "left":
delta_x = -notches
self.kernel.browsers.computer.scroll(
self.session_id,
x=x,
y=y,
delta_x=delta_x,
delta_y=delta_y,
)
elif action_name == GeminiAction.WAIT_5_SECONDS:
await asyncio.sleep(5)
elif action_name == GeminiAction.GO_BACK:
self.kernel.browsers.computer.press_key(
self.session_id, keys=["alt+Left"]
)
await asyncio.sleep(1)
elif action_name == GeminiAction.GO_FORWARD:
self.kernel.browsers.computer.press_key(
self.session_id, keys=["alt+Right"]
)
await asyncio.sleep(1)
elif action_name == GeminiAction.SEARCH:
# Focus URL bar (Ctrl+L) - equivalent to clicking search
self.kernel.browsers.computer.press_key(
self.session_id, keys=["ctrl+l"]
)
elif action_name == GeminiAction.NAVIGATE:
if "url" not in args:
return ToolResult(error="navigate requires url")
# Focus URL bar and type the URL
self.kernel.browsers.computer.press_key(
self.session_id, keys=["ctrl+l"]
)
await asyncio.sleep(0.1)
self.kernel.browsers.computer.type_text(
self.session_id,
text=args["url"],
delay=TYPING_DELAY_MS,
)
await asyncio.sleep(0.1)
self.kernel.browsers.computer.press_key(
self.session_id, keys=["Return"]
)
await asyncio.sleep(1.5) # Wait for navigation
elif action_name == GeminiAction.KEY_COMBINATION:
if "keys" not in args:
return ToolResult(error="key_combination requires keys")
# Gemini sends keys as "key1+key2+key3"
self.kernel.browsers.computer.press_key(
self.session_id, keys=[args["keys"]]
)
elif action_name == GeminiAction.DRAG_AND_DROP:
required = ["x", "y", "destination_x", "destination_y"]
if not all(k in args for k in required):
return ToolResult(
error="drag_and_drop requires x, y, destination_x, and destination_y"
)
start_x = self.denormalize_x(args["x"])
start_y = self.denormalize_y(args["y"])
end_x = self.denormalize_x(args["destination_x"])
end_y = self.denormalize_y(args["destination_y"])
self.kernel.browsers.computer.drag_mouse(
self.session_id,
path=[[start_x, start_y], [end_x, end_y]],
button="left",
)
else:
return ToolResult(error=f"Unhandled action: {action_name}")
# Wait a moment for the action to complete, then take a screenshot
await asyncio.sleep(SCREENSHOT_DELAY_SECS)
return await self.screenshot()
except Exception as e:
return ToolResult(error=f"Action failed: {e}", url="about:blank")