-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
127 lines (96 loc) · 4.14 KB
/
main.py
File metadata and controls
127 lines (96 loc) · 4.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# main.py
from __future__ import annotations
import time
from typing import Any, Dict, List
from src.config import cfg
from src.sandbox import Sandbox
from src.llm_client import load_llm, ask_next_action
from src.vision import capture_screen, draw_preview
from src.guards import validate_xy, should_stop_on_repeat
from src.actions import execute_action
# Trim conversation history to avoid exceeding context length
def trim_history(history, keep_last=6):
if len(history) <= keep_last:
return history
return history[-keep_last:]
def main() -> None:
sandbox = Sandbox(cfg)
sandbox.start()
# Launch VNC viewer window (optional/noVNC)
if getattr(cfg, "OPEN_VNC_VIEWER", True):
sandbox.launch_vnc_viewer()
llm = load_llm()
print("[DEBUG] cfg.N_CTX =", cfg.N_CTX)
print("Agent ready. Type 'exit' or 'quit' to quit.")
# Main loop: runs until user exits
while True:
objective = input("\nEnter command (or 'quit'): ").strip()
# Recognize user exit
if not objective:
print("Command cannot be empty. Try again.")
continue
low = objective.lower()
if low in ("exit", "quit", "q"):
print("Agent shutting down.")
break
history: List[Dict[str, Any]] = []
step = 1
while True:
print(f"\n==================== STEP {step} ====================")
time.sleep(cfg.WAIT_BEFORE_SCREENSHOT_SEC)
# Capture current screenshot
img = capture_screen(sandbox, cfg.SCREENSHOT_PATH)
out: Dict[str, Any] | None = None
# Ask the model for the next action
for attempt in range(cfg.MODEL_RETRY + 1):
out = ask_next_action(llm, objective, cfg.SCREENSHOT_PATH, trim_history(history))
action = (out.get("action") or "NOOP").upper()
# Done statement
if action == "BITTI":
print("[MODEL] BITTI -> task completed, ending loop.")
break
# Normal coordinate-based actions like CLICK
if action in ("CLICK", "DOUBLE_CLICK", "RIGHT_CLICK"):
x = float(out.get("x", 0.5))
y = float(out.get("y", 0.5))
ok, reason = validate_xy(x, y)
if ok:
break
print(f"[WARN] Invalid coordinates ({reason}), retrying.")
history.append({"action": "INVALID_COORDS", "raw": out})
out = None
continue
# Other action types are accepted directly
break
# If out is None, the model could not produce a valid action
if out is None:
print("[ERROR] Model could not produce a valid action, ending loop for this command.")
break
print("[MODEL]", out)
# Repeat guard: stop if the same action is repeated
stop, why = should_stop_on_repeat(history, out)
if stop:
print(f"[STOP] {why} -> ending loop for this command.")
break
# If model returned BITTI (done)
if (out.get("action") or "").upper() == "BITTI":
print("Model says task is complete. Waiting for new command.")
break
# Draw preview (optional)
action = (out.get("action") or "").upper()
if action in ("CLICK", "DOUBLE_CLICK", "RIGHT_CLICK"):
preview_path = cfg.PREVIEW_PATH_TEMPLATE.format(i=step)
draw_preview(img, float(out["x"]), float(out["y"]), preview_path)
# Execute the action
execute_action(sandbox, out)
history.append(out)
step += 1
# Safety: stop if step count is too high
if step > cfg.MAX_STEPS:
print("[STOP] MAX_STEPS exceeded, ending loop for this command.")
break
# Loop for a single objective ends here,
# then ask the user for a new command
print("Ready for the next command.")
if __name__ == "__main__":
main()