-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathforge_agent.py
More file actions
2177 lines (1961 loc) · 101 KB
/
forge_agent.py
File metadata and controls
2177 lines (1961 loc) · 101 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Nova Forge Agent — the core tool-use loop replacing Claude Code.
ForgeAgent sends a prompt + tools to any LLM via ModelRouter, executes
tool calls with hook enforcement and PathSandbox checks, and loops until
the model stops requesting tools or max_turns is reached.
"""
from __future__ import annotations
import asyncio
import fnmatch
import json
import logging
import os
import random
import re
import subprocess
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from config import ModelConfig, get_model_config, get_provider, get_prompt_budget
from forge_guards import PathSandbox, RiskClassifier, RiskLevel, SandboxViolation, AutonomyManager
from forge_hooks import HookSystem, HookResult
from model_router import ModelRouter, ModelResponse, ToolCall, StreamDelta
logger = logging.getLogger(__name__)
MAX_API_RETRIES = 3
# ── Result dataclass ─────────────────────────────────────────────────────────
@dataclass
class AgentResult:
    """Outcome of a ForgeAgent.run() invocation.

    Returned by ForgeAgent.run(); `error` is None on success (a sentinel
    string such as "paused" otherwise, per the run loop).
    """
    output: str = ""                    # final text produced by the model
    turns: int = 0                      # number of agent loop turns consumed
    artifacts: dict[str, Any] = field(default_factory=dict)  # files/data produced during the run
    tool_calls_made: int = 0            # total tool invocations across all turns
    error: str | None = None            # None on success; error/sentinel string otherwise
    model_id: str = ""                  # which model completed this
    tokens_in: int = 0                  # total input tokens
    tokens_out: int = 0                 # total output tokens
    escalated: bool = False             # was model escalated?
    self_corrections: int = 0           # number of self-correction turns taken
@dataclass
class AgentEvent:
    """Structured event emitted during agent execution.

    Delivered to the `on_event` callback supplied to ForgeAgent; `kind`
    discriminates which of the remaining fields are meaningful.
    """
    kind: str  # turn_start, model_response, tool_start, tool_end, compact, error, stream_delta, model_escalation
    turn: int = 0                        # agent loop turn this event belongs to
    tool_name: str = ""                  # set for tool_start/tool_end events
    tool_args: dict = field(default_factory=dict)  # arguments passed to the tool
    file_path: str = ""                  # file the tool touched, if any
    file_action: str = ""                # read, write, edit, run, search
    tokens_in: int = 0                   # input tokens for model_response events
    tokens_out: int = 0                  # output tokens for model_response events
    duration_ms: int = 0                 # wall-clock duration of the operation
    error: str = ""                      # populated for error events
    delta: Any = None                    # StreamDelta for stream_delta events
# ── Tool definitions (common format for all providers) ───────────────────────
BUILT_IN_TOOLS: list[dict] = [
{
"name": "read_file",
"description": (
"Read the contents of a file. Returns line-numbered content with a metadata header.\n\n"
"- You MUST read a file before editing it with edit_file.\n"
"- Use offset and limit for large files (e.g., offset=100, limit=50 to read lines 100-150).\n"
"- For searching file contents, prefer grep over reading entire files.\n"
"- Returns: metadata header (type, lines, size) + numbered content."
),
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path (relative to project root or absolute)"},
"offset": {"type": "integer", "description": "Line number to start reading from (1-based, optional)"},
"limit": {"type": "integer", "description": "Maximum number of lines to return (optional, default: all)"},
},
"required": ["path"],
},
},
{
"name": "write_file",
"description": (
"Create a new file or completely overwrite an existing file with the given content.\n\n"
"- PREFER edit_file for modifying existing files — it is safer and more precise.\n"
"- Never overwrite a file you haven't read first.\n"
"- Creates parent directories automatically.\n"
"- Runs a syntax check after writing (.py, .json, .yaml) and reports the result."
),
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path (relative to project root or absolute)"},
"content": {"type": "string", "description": "Full file content to write"},
},
"required": ["path", "content"],
},
},
{
"name": "append_file",
"description": (
"Append content to the end of an existing file, or create it if it does not exist.\n\n"
"- Use write_file FIRST to create the file with the initial section.\n"
"- Then call append_file one or more times to add remaining sections.\n"
"- For large files (~80+ lines), use: write_file (first ~80 lines) + append_file (rest).\n"
"- Runs syntax check after appending (.py, .json, .yaml)."
),
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"},
"content": {"type": "string", "description": "Content to append"},
},
"required": ["path", "content"],
},
},
{
"name": "edit_file",
"description": (
"Replace an exact string in a file with new content. Performs a precise single replacement.\n\n"
"- ALWAYS call read_file first to understand the current content.\n"
"- old_string MUST appear exactly once — if it appears multiple times, include more surrounding\n"
" context to make it unique.\n"
"- For renaming a variable everywhere, use search_replace_all instead.\n"
"- Runs a syntax check after editing (.py, .json, .yaml) and reports the result."
),
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"},
"old_string": {"type": "string", "description": "Exact text to find (must appear exactly once)"},
"new_string": {"type": "string", "description": "Replacement text"},
},
"required": ["path", "old_string", "new_string"],
},
},
{
"name": "bash",
"description": (
"Execute a shell command in the project directory and return stdout + stderr.\n\n"
"- Do NOT use for cat/head/tail — use read_file instead.\n"
"- Do NOT use for grep/find/ls — use grep, glob_files, list_directory instead.\n"
"- Always check exit codes in the output (non-zero = failure).\n"
"- Long-running commands have a 120-second timeout."
),
"parameters": {
"type": "object",
"properties": {
"command": {"type": "string", "description": "Shell command to execute"},
"cwd": {"type": "string", "description": "Working directory (default: project root)"},
},
"required": ["command"],
},
},
{
"name": "glob_files",
"description": (
"Find files matching a glob pattern. Use INSTEAD of bash find or ls.\n\n"
"- Patterns: '**/*.py' (all Python files), 'src/**/*.ts' (TypeScript in src/).\n"
"- Returns relative paths sorted by modification time (newest first).\n"
"- Use path parameter to restrict search to a subdirectory.\n"
"- Faster and safer than running bash find commands."
),
"parameters": {
"type": "object",
"properties": {
"pattern": {"type": "string", "description": "Glob pattern (e.g. '**/*.py', 'src/**/*.ts')"},
"path": {"type": "string", "description": "Base directory to search (default: project root)"},
},
"required": ["pattern"],
},
},
{
"name": "grep",
"description": (
"Search file contents by regex pattern. Use INSTEAD of bash grep.\n\n"
"- Returns line numbers, match count summary, and matching lines.\n"
"- Supports standard regex: '\\bfoo\\b', 'def \\w+', 'import.*from'.\n"
"- Searches .py, .js, .ts, .json, .yaml, .yml, .md, .txt, .html, .css, .sh files.\n"
"- Compact output: first 50 matches shown, with total count."
),
"parameters": {
"type": "object",
"properties": {
"pattern": {"type": "string", "description": "Regex pattern to search for"},
"path": {"type": "string", "description": "File or directory to search (default: project root)"},
},
"required": ["pattern"],
},
},
{
"name": "think",
"description": (
"Use this tool to reason through a problem step by step before taking action. "
"The output is not shown to the user — this is your private scratchpad.\n\n"
"Use it when:\n"
"- Planning multi-step changes\n"
"- Debugging complex issues\n"
"- Weighing multiple approaches\n"
"- Before writing significant code"
),
"parameters": {
"type": "object",
"properties": {
"reasoning": {"type": "string", "description": "Your step-by-step reasoning"},
},
"required": ["reasoning"],
},
},
{
"name": "list_directory",
"description": (
"List contents of a directory with file types, sizes, and item counts.\n\n"
"Use this instead of bash ls. Returns structured output with metadata.\n"
"Subdirectories show item counts. Files show size and modification time."
),
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "Directory path (default: project root)"},
},
"required": [],
},
},
{
"name": "search_replace_all",
"description": (
"Replace ALL occurrences of a string in a file. Use for renaming variables, "
"updating imports, or bulk replacements.\n\n"
"For single precise edits, use edit_file instead.\n"
"Returns the count of replacements made."
),
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"},
"old_string": {"type": "string", "description": "String to find (all occurrences)"},
"new_string": {"type": "string", "description": "Replacement string"},
},
"required": ["path", "old_string", "new_string"],
},
},
{
"name": "remember",
"description": (
"Save a note to project memory that persists across sessions.\n\n"
"Use for: patterns discovered, conventions confirmed, user preferences, "
"solutions to recurring problems. Do NOT save session-specific context."
),
"parameters": {
"type": "object",
"properties": {
"note": {"type": "string", "description": "What to remember"},
"category": {"type": "string", "description": "Category: pattern | preference | solution | convention"},
},
"required": ["note"],
},
},
{
"name": "claim_file",
"description": (
"Claim exclusive write access to a file. Other agents cannot modify "
"files you've claimed. Call BEFORE writing to prevent conflicts."
),
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path relative to project root"},
},
"required": ["path"],
},
},
{
"name": "replace_lines",
"description": (
"Replace a range of lines in a file by line number. Perfect for structural edits:\n"
"wrapping a block in an if-statement, re-indenting code, replacing multi-line sections.\n\n"
"- You MUST read_file first to see current line numbers.\n"
"- Lines are 1-based and inclusive (start_line=5, end_line=10 replaces lines 5 through 10).\n"
"- new_content replaces ALL specified lines — include proper indentation.\n"
"- Runs a syntax check after editing (.py, .json, .yaml)."
),
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"},
"start_line": {"type": "integer", "description": "First line to replace (1-based, inclusive)"},
"end_line": {"type": "integer", "description": "Last line to replace (1-based, inclusive)"},
"new_content": {"type": "string", "description": "Replacement text (replaces all lines from start to end)"},
},
"required": ["path", "start_line", "end_line", "new_content"],
},
},
{
"name": "check_context",
"description": (
"Check what other agents have done: files claimed/written, "
"announcements (endpoints, exports), module dependencies."
),
"parameters": {
"type": "object",
"properties": {
"focus": {"type": "string", "description": "Optional filter: 'api', 'frontend', 'imports'"},
},
"required": [],
},
},
]
# ── Slim tool definitions for 32K models ─────────────────────────────────────
# 8 essential tools with 1-line descriptions (~2,800 chars vs 7,312 for full set)
# Trimmed tool set for small-context (32K) models: nine tools with terse
# descriptions so the schema costs far fewer prompt tokens per turn.
# Note the coordination tools (claim_file, check_context) are NOT here —
# get_tools_for_model appends them from BUILT_IN_TOOLS when needed.
SLIM_TOOLS: list[dict] = [
    {"name": "read_file", "description": "Read a file. Args: path, offset (opt), limit (opt).",
     "parameters": {"type": "object", "properties": {
         "path": {"type": "string"}, "offset": {"type": "integer"}, "limit": {"type": "integer"}
     }, "required": ["path"]}},
    {"name": "write_file", "description": "Create/overwrite a file. Max ~80 lines per call.",
     "parameters": {"type": "object", "properties": {
         "path": {"type": "string"}, "content": {"type": "string"}
     }, "required": ["path", "content"]}},
    {"name": "append_file", "description": "Append to file (or create). Use after write_file for large files.",
     "parameters": {"type": "object", "properties": {
         "path": {"type": "string"}, "content": {"type": "string"}
     }, "required": ["path", "content"]}},
    {"name": "edit_file", "description": "Replace old_string with new_string in a file. old_string must be unique.",
     "parameters": {"type": "object", "properties": {
         "path": {"type": "string"}, "old_string": {"type": "string"}, "new_string": {"type": "string"}
     }, "required": ["path", "old_string", "new_string"]}},
    {"name": "bash", "description": "Run a shell command.",
     "parameters": {"type": "object", "properties": {
         "command": {"type": "string"}
     }, "required": ["command"]}},
    {"name": "glob_files", "description": "Find files by pattern (e.g. '**/*.py').",
     "parameters": {"type": "object", "properties": {
         "pattern": {"type": "string"}
     }, "required": ["pattern"]}},
    {"name": "grep", "description": "Search file contents by regex.",
     "parameters": {"type": "object", "properties": {
         "pattern": {"type": "string"}, "path": {"type": "string"}
     }, "required": ["pattern"]}},
    {"name": "list_directory", "description": "List files in a directory.",
     "parameters": {"type": "object", "properties": {
         "path": {"type": "string", "description": "Directory path (default: project root)"}
     }, "required": []}},
    {"name": "replace_lines", "description": "Replace a range of lines in a file by line number. Great for structural edits like wrapping a block in an if-statement, re-indenting, or replacing multi-line sections. Must read_file first.",
     "parameters": {"type": "object", "properties": {
         "path": {"type": "string", "description": "File path"},
         "start_line": {"type": "integer", "description": "First line to replace (1-based, inclusive)"},
         "end_line": {"type": "integer", "description": "Last line to replace (1-based, inclusive)"},
         "new_content": {"type": "string", "description": "Replacement text (replaces all lines from start to end)"}
     }, "required": ["path", "start_line", "end_line", "new_content"]}},
]
class ConvergenceTracker:
    """Detects when an agent is stuck in a read-edit loop with diminishing returns.

    Bytes written are accumulated per turn. Convergence is signalled once a
    full sliding window of completed turns shows either:
      - zero bytes written in every windowed turn, or
      - an average write volume below a small fraction of the very first
        write (the work has decayed to noise).
    """

    def __init__(self, window: int = 5, min_change_ratio: float = 0.02):
        self._window = window                    # turns considered per check
        self._min_change_ratio = min_change_ratio  # avg/initial cutoff
        self._turn_writes: list[int] = []        # bytes written, one entry per finished turn
        self._initial_write: int = 0             # size of the first nonzero write
        self._current_turn_bytes: int = 0        # accumulator for the open turn

    def record_write(self, bytes_changed: int) -> None:
        """Record bytes written by a tool call in the current turn."""
        # Negative deltas are clamped — only forward progress counts.
        self._current_turn_bytes += bytes_changed if bytes_changed > 0 else 0
        if bytes_changed > 0 and self._initial_write == 0:
            # Remember the first real write as the baseline for the ratio test.
            self._initial_write = bytes_changed

    def end_turn(self) -> None:
        """Flush the open turn's byte count into history and reset it."""
        self._turn_writes.append(self._current_turn_bytes)
        self._current_turn_bytes = 0

    def should_stop(self) -> bool:
        """Return True once the agent has converged (no meaningful recent writes)."""
        if len(self._turn_writes) < self._window:
            return False  # not enough history yet
        recent_window = self._turn_writes[-self._window:]
        # Case 1: the whole window wrote nothing at all.
        if not any(recent_window):
            return True
        # Case 2: average write shrank to a negligible fraction of the baseline.
        if self._initial_write:
            mean_bytes = sum(recent_window) / len(recent_window)
            return mean_bytes < self._initial_write * self._min_change_ratio
        return False
def get_tools_for_model(context_window: int, has_build_context: bool = False) -> list[dict]:
    """Return the appropriate tool set based on model context window size.

    32K models get SLIM_TOOLS (8 essential tools, saves ~1,600 tokens/turn),
    plus the multi-agent coordination tools when has_build_context is set
    (SLIM_TOOLS does not include them). Larger models get full
    BUILT_IN_TOOLS — the `think` tool helps them plan complex files
    (tested: removing it drops Pro from A to B).

    Args:
        context_window: Model context window size in tokens.
        has_build_context: True when running under multi-agent coordination.

    Returns:
        A fresh list of tool definition dicts (callers may mutate it without
        affecting the module-level constants).
    """
    if context_window <= 32_000:
        if has_build_context:
            # Coordination tools live only in the full set; graft them onto slim.
            return SLIM_TOOLS + [t for t in BUILT_IN_TOOLS if t["name"] in ("claim_file", "check_context")]
        return list(SLIM_TOOLS)
    # Bug fix: the original branched on has_build_context here but both arms
    # returned list(BUILT_IN_TOOLS) — the full set already contains
    # claim_file/check_context, so one return suffices.
    return list(BUILT_IN_TOOLS)
# ── ForgeAgent ───────────────────────────────────────────────────────────────
class ForgeAgent:
"""Core agent runtime: prompt → LLM → tool calls → execute → loop.
Replaces Claude Code's closed-source agent with ~200 lines of Python
that works with any LLM supporting function calling.
"""
    def __init__(
        self,
        model_config: ModelConfig,
        project_root: Path | str = ".",
        hooks: HookSystem | None = None,
        sandbox: PathSandbox | None = None,
        tools: list[dict] | None = None,
        max_turns: int = 25,
        agent_id: str = "forge-agent",
        wire_v11_hooks: bool = True,
        on_event: Any = None,
        streaming: bool = True,
        escalation_model: str | None = None,
        build_context: Any = None,
        cancellation: Any = None,
        soft_max_turns: int | None = None,
    ) -> None:
        """Initialize the agent runtime and (optionally) auto-wire V11 hooks.

        Args:
            model_config: Model selection/config; model_id determines provider.
            project_root: Root directory; resolved and used as the sandbox base.
            hooks: Existing HookSystem to reuse, or None for a fresh one.
            sandbox: PathSandbox override; default allows project root + tempdir.
            tools: Tool schema list; defaults to BUILT_IN_TOOLS when None
                (None check is deliberate — an empty list means "no tools").
            max_turns: Nominal turn budget for the run loop.
            agent_id: Identifier used for this agent instance.
            wire_v11_hooks: When True, _wire_v11_hooks() runs at the end of init.
            on_event: Callback receiving AgentEvent objects — presumably a
                callable; not invoked here. TODO confirm expected signature.
            streaming: Whether streaming responses are requested.
            escalation_model: Optional model id to escalate to mid-run.
            build_context: BuildContext for multi-agent coordination (opaque here).
            cancellation: BuildCancellation for cooperative Ctrl-C pause (opaque here).
            soft_max_turns: Softer turn cap; falls back to max_turns when falsy.
        """
        self.model_config = model_config
        self.project_root = Path(project_root).resolve()
        self.router = ModelRouter()
        self.hooks = hooks or HookSystem()
        # Sandbox defaults to project root plus the system temp directory.
        self.sandbox = sandbox or PathSandbox(self.project_root, extra_allowed=[Path(tempfile.gettempdir())])
        self.risk_classifier = RiskClassifier()
        # `is not None` (not `or`): an explicitly-passed empty list is honored.
        self.tools = tools if tools is not None else BUILT_IN_TOOLS
        self.max_turns = max_turns
        self.soft_max_turns = soft_max_turns or max_turns
        self.agent_id = agent_id
        self.provider = get_provider(model_config.model_id)
        self._hook_state = None
        self._files_read: set[str] = set()
        self.on_event = on_event
        self.streaming = streaming
        self.escalation_model = escalation_model
        self._escalated = False
        self.autonomy_manager: AutonomyManager | None = None
        self.build_context = build_context  # BuildContext for multi-agent coordination
        self._claimed_files: set[str] = set()  # Tracks files already claimed (suppresses duplicate events)
        self._cancellation = cancellation  # BuildCancellation for cooperative Ctrl-C pause
        # Readonly mode: blocks bash write patterns (set by caller for read-only tasks)
        self._is_readonly: bool = False
        # Circuit breaker: per-tool failure tracking
        self._tool_failures: dict[str, int] = {}
        self._disabled_tools: set[str] = set()
        self.TOOL_CIRCUIT_THRESHOLD = 3
        # Self-correction: verify own output after completion
        self.auto_verify = True
        self._awaiting_read_proof = False
        self._self_correction_count = 0
        # Acceptance criteria: bash commands that prove the code works
        self._acceptance_criteria: list[str] = []
        # Adaptive turn budgets (set by caller via compute_turn_budget)
        self._verify_budget: int = 3
        self._verify_turns_used: int = 0
        self._in_verify_phase: bool = False
        self._escalation_turns: int = max(8, max_turns // 2)
        self._convergence: ConvergenceTracker = ConvergenceTracker()
        # Auto-wire V11 hooks into the active HookSystem (provided or default).
        # Must stay last: it reads self.hooks and self.project_root set above.
        if wire_v11_hooks:
            self._wire_v11_hooks()
def _wire_v11_hooks(self) -> None:
"""Auto-wire the 12 V11 hook implementations into the HookSystem."""
try:
from forge_hooks_impl import wire_all_hooks
autonomy_file = self.project_root / ".forge" / "state" / "autonomy.json"
am = None
if autonomy_file.exists():
am = AutonomyManager(autonomy_file)
self._hook_state = wire_all_hooks(
self.hooks,
project_root=self.project_root,
autonomy_manager=am,
)
if am is not None:
self.autonomy_manager = am
logger.debug("V11 hooks auto-wired for project: %s", self.project_root.name)
except ImportError:
logger.debug("forge_hooks_impl not available — running without V11 hooks")
except Exception as exc:
logger.warning("Failed to wire V11 hooks: %s", exc)
async def run(
self,
prompt: str,
system: str = "",
context: dict[str, Any] | None = None,
) -> AgentResult:
"""Execute the agent loop.
Args:
prompt: User instruction for the agent.
system: Optional system message prepended to conversation.
context: Optional context dict injected into the first user message.
"""
messages = self._build_initial_messages(prompt, system, context)
artifacts: dict[str, Any] = {}
self._last_artifacts = artifacts # Expose for crash recovery (ref stays current)
total_tool_calls = 0
_total_in = 0
_total_out = 0
# Reset circuit breaker state for each run
self._tool_failures = {}
self._disabled_tools = set()
self.auto_verify = True
self._awaiting_read_proof = False
self._self_correction_count = 0
# Note: _acceptance_criteria is NOT reset here — it's set by the caller
# before run() and should persist across retries within the same task.
_syntax_error_files: list[str] = []
# Reset verify phase and convergence tracker for each run
self._verify_turns_used = 0
self._in_verify_phase = False
self._convergence = ConvergenceTracker()
turn = 0
hard_limit = max(self.max_turns + 4, int(self.max_turns * 1.3)) # tight safety cap
while turn < hard_limit:
# Check for cooperative cancellation (Ctrl-C pause)
if self._cancellation and self._cancellation.is_paused():
return AgentResult(
output="Build paused by user",
turns=turn,
artifacts=artifacts,
tool_calls_made=total_tool_calls,
error="paused",
model_id=self.model_config.model_id,
tokens_in=_total_in,
tokens_out=_total_out,
)
# Convergence detection: if stuck in read-only loops, disable writes
# But only after using at least 40% of turn budget — early turns need room for multi-step creation
past_early_phase = turn >= self.max_turns * 0.4
if artifacts and past_early_phase and self._convergence.should_stop():
WRITE_TOOLS = {"write_file", "append_file", "edit_file", "search_replace_all", "replace_lines"}
self._disabled_tools.update(WRITE_TOOLS)
logger.info("Convergence detected at turn %d — write tools disabled", turn)
# Verify phase budget: count ALL turns in verify phase
if self._in_verify_phase:
self._verify_turns_used += 1
if self._verify_turns_used >= self._verify_budget:
WRITE_TOOLS = {"write_file", "append_file", "edit_file", "search_replace_all", "replace_lines"}
self._disabled_tools.update(WRITE_TOOLS)
logger.info("Verify budget exhausted (%d turns) — write tools disabled",
self._verify_turns_used)
# Soft turn limit warning — emit event only (convergence detector handles wrap-up)
if turn == self.soft_max_turns - 1:
if self.on_event:
try:
self.on_event(AgentEvent(kind="turn_limit_warning", turn=turn + 1))
except Exception:
pass
# Emit turn_start event
if self.on_event:
try:
self.on_event(AgentEvent(kind="turn_start", turn=turn + 1))
except Exception:
pass
# Filter disabled tools (circuit breaker)
active_tools = [t for t in self.tools if t["name"] not in self._disabled_tools]
# Call the model with retry logic for transient errors
response = None
last_error = None
for attempt in range(MAX_API_RETRIES):
try:
if self.streaming:
response = await self.router.stream_send(
messages, active_tools, self.model_config,
on_delta=self._on_stream_delta,
)
else:
response = await self.router.send(messages, active_tools, self.model_config)
break # Success
except Exception as exc:
last_error = exc
error_str = str(exc).lower()
# Retry on transient errors
if any(code in error_str for code in ("429", "500", "502", "503", "throttl", "rate")):
logger.warning(
"Transient error (attempt %d/%d): %s",
attempt + 1, MAX_API_RETRIES, exc,
)
if self.on_event:
try:
self.on_event(AgentEvent(kind="error", error=f"Retry {attempt + 1}: {exc}"))
except Exception:
pass
if attempt < MAX_API_RETRIES - 1:
delay = min(2 ** attempt + random.uniform(0, 1), 30)
logger.warning("Retrying in %.1fs", delay)
await asyncio.sleep(delay)
continue
# Context overflow — compact and retry once
if "context" in error_str and (
"length" in error_str or "exceed" in error_str or "too long" in error_str
):
pre_tokens = self._estimate_tokens(messages)
budget = get_prompt_budget(self.model_config.context_window)
messages = self._compact_messages(messages, budget)
post_tokens = self._estimate_tokens(messages)
if post_tokens >= pre_tokens:
logger.error("Compaction did not reduce context (%d→%d) — giving up", pre_tokens, post_tokens)
break
logger.warning("Context overflow — compacted %d→%d tokens, retrying", pre_tokens, post_tokens)
continue
# Non-transient error — fail immediately
break
if response is None:
logger.error("Model call failed on turn %d after %d attempts: %s", turn, MAX_API_RETRIES, last_error)
return AgentResult(
output=f"Model error after {MAX_API_RETRIES} attempts: {last_error}",
turns=turn + 1,
artifacts=artifacts,
tool_calls_made=total_tool_calls,
error=str(last_error),
model_id=self.model_config.model_id,
tokens_in=_total_in,
tokens_out=_total_out,
)
# Track tokens
_resp_in = response.usage.get("input_tokens", 0)
_resp_out = response.usage.get("output_tokens", 0)
_total_in += _resp_in
_total_out += _resp_out
# Emit model_response event
if self.on_event:
try:
self.on_event(AgentEvent(
kind="model_response", turn=turn + 1,
tokens_in=_resp_in,
tokens_out=_resp_out,
))
except Exception:
pass
tool_calls = self.router.extract_tool_calls(response)
# Detect output truncation — model hit max_tokens mid-response
if response.stop_reason in ("max_tokens", "length") and tool_calls:
# Tool calls may have truncated content — treat as malformed
adapter = self.router.route(self.model_config.model_id)
messages.append(adapter.format_assistant_message(response))
messages.append({"role": "user", "content": (
"Your response was TRUNCATED (hit output token limit). "
"The tool call content is incomplete. Write SHORTER content: "
"max ~80 lines per write_file call. "
"Use write_file for the first ~80 lines, then append_file for the rest."
)})
turn += 1
continue
# Clear read-proof flag when model uses tools (it's doing work)
if tool_calls and self._awaiting_read_proof:
self._awaiting_read_proof = False
# Self-correction for malformed tool calls
if tool_calls:
valid_calls = []
malformed = []
for call in tool_calls:
if not isinstance(call.args, dict):
malformed.append(call)
elif "_raw" in call.args or "_truncated" in call.args:
malformed.append(call)
else:
valid_calls.append(call)
if malformed and not valid_calls:
# All calls malformed — inject error and let model retry
has_truncated = any("_truncated" in (c.args if isinstance(c.args, dict) else {}) or "_raw" in (c.args if isinstance(c.args, dict) else {}) for c in malformed)
if has_truncated:
error_msg = (
"Tool call truncated — output hit token limit. "
"Write SHORTER content: max ~80 lines per write_file. "
"Use write_file for first 80 lines, then append_file for the rest. "
)
else:
error_msg = "Your tool calls had invalid arguments. Please retry with valid JSON. Errors: "
for mc in malformed:
error_msg += f"{mc.name}(args={mc.args!r}) — args must be a JSON object. "
adapter = self.router.route(self.model_config.model_id)
messages.append(adapter.format_assistant_message(response))
messages.append({"role": "user", "content": error_msg})
turn += 1
continue # Let the model retry this turn
tool_calls = valid_calls # Use only valid calls
# No tool calls → agent is done (or needs verify phase)
if not tool_calls:
self._convergence.end_turn() # Record zero-write turn
# Don't enter verify too early — wait until at least 40% of turn budget used
# This prevents premature verify after a single write_file in multi-write tasks
past_verify_threshold = turn >= self.max_turns * 0.4
# Early stop nudge: if model stops calling tools but hasn't used enough turns,
# remind it to keep writing (the file is likely incomplete)
if (not past_verify_threshold
and total_tool_calls > 0
and artifacts
and not self._in_verify_phase
and response.text and response.text.strip()):
adapter = self.router.route(self.model_config.model_id)
messages.append(adapter.format_assistant_message(response))
messages.append({"role": "user", "content": (
"The file may be incomplete. If the file needs more content, "
"use append_file to add the remaining sections. "
"If the file is truly complete, say 'done'."
)})
turn += 1
continue
# Enter verify phase: first time model says "done" with artifacts
if (self.auto_verify
and not self._in_verify_phase
and past_verify_threshold
and total_tool_calls > 0
and artifacts
and response.text and response.text.strip()):
self._in_verify_phase = True
self._self_correction_count += 1
# Include acceptance criteria if available
acceptance_block = ""
if self._acceptance_criteria:
criteria_lines = "\n".join(f" {i+1}. {c}" for i, c in enumerate(self._acceptance_criteria))
acceptance_block = (
"\n\nAfter reading back files, run these acceptance tests via bash to verify "
"the code actually works (not just compiles):\n"
f"{criteria_lines}\n"
"If any test fails, read the relevant code, fix the bug, and re-run the test. "
"Do NOT skip failing tests — fix them."
)
verify_prompt = (
"Before finishing, read back the files you created or modified. "
"Check: syntax correctness, imports match exports, no TODO/stub placeholders. "
"If you find issues, fix them now. If everything looks correct, "
"confirm by saying 'Verified — all files are correct.'"
f"{acceptance_block}"
)
adapter = self.router.route(self.model_config.model_id)
messages.append(adapter.format_assistant_message(response))
messages.append({"role": "user", "content": verify_prompt})
self.auto_verify = False # Don't re-enter
self._awaiting_read_proof = True
turn += 1
continue
# Single nudge: model said "verified" without reading any files
if (self._awaiting_read_proof
and self._self_correction_count < 2
and artifacts):
self._awaiting_read_proof = False
self._self_correction_count += 1
file_list = ", ".join(
p.rsplit("/", 1)[-1] for p in list(artifacts.keys())[:6]
)
force_read = (
f"You said files are correct but did NOT call read_file to verify. "
f"You MUST call read_file on each file you created: {file_list}. "
f"Read them now and check for syntax errors, wrong comment styles "
f"(e.g. // in Python files), and incomplete code."
)
adapter = self.router.route(self.model_config.model_id)
messages.append(adapter.format_assistant_message(response))
messages.append({"role": "user", "content": force_read})
turn += 1
continue
# Agent is done — clear flags and return
self._awaiting_read_proof = False
await self.hooks.on_stop(project=self.project_root.name)
return AgentResult(
output=response.text,
turns=turn + 1,
artifacts=artifacts,
tool_calls_made=total_tool_calls,
model_id=self.model_config.model_id,
tokens_in=_total_in,
tokens_out=_total_out,
self_corrections=self._self_correction_count,
)
# Append assistant message to history
adapter = self.router.route(self.model_config.model_id)
messages.append(adapter.format_assistant_message(response))
# Execute each tool call
for call in tool_calls:
# Check for cooperative cancellation between tool calls
if self._cancellation and self._cancellation.is_paused():
return AgentResult(
output="Build paused by user (mid-turn)",
turns=turn + 1,
artifacts=artifacts,
tool_calls_made=total_tool_calls,
error="paused",
model_id=self.model_config.model_id,
tokens_in=_total_in,
tokens_out=_total_out,
)
total_tool_calls += 1
# Emit tool_start event
_tool_file = call.args.get("path", call.args.get("file_path", call.args.get("pattern", ""))) if isinstance(call.args, dict) else ""
if self.on_event:
try:
self.on_event(AgentEvent(
kind="tool_start", turn=turn + 1,
tool_name=call.name,
tool_args=call.args if isinstance(call.args, dict) else {},
file_path=str(_tool_file),
))
except Exception:
pass
_tool_t0 = time.monotonic()
result_str = await self._execute_tool_call(call, artifacts)
# Tool retry on failure: retry once after 1s delay
_is_error = result_str and ("ERROR" in result_str[:80] or "BLOCKED" in result_str[:80])
if _is_error and call.name not in self._disabled_tools:
self._tool_failures[call.name] = self._tool_failures.get(call.name, 0) + 1
if self._tool_failures[call.name] < self.TOOL_CIRCUIT_THRESHOLD:
# Retry once
await asyncio.sleep(1)
retry_str = await self._execute_tool_call(call, artifacts)
if retry_str and "ERROR" not in retry_str[:80]:
result_str = retry_str # Retry succeeded
self._tool_failures[call.name] = max(0, self._tool_failures[call.name] - 1)
# Circuit breaker: disable tool after threshold failures
if self._tool_failures.get(call.name, 0) >= self.TOOL_CIRCUIT_THRESHOLD:
self._disabled_tools.add(call.name)
result_str += f"\n\nTool '{call.name}' disabled due to repeated failures."
logger.warning("Circuit breaker: tool '%s' disabled after %d failures",
call.name, self.TOOL_CIRCUIT_THRESHOLD)
_tool_dur = int((time.monotonic() - _tool_t0) * 1000)
# Determine file action
_fa = {"read_file": "read", "write_file": "write", "append_file": "append",
"edit_file": "edit", "bash": "run", "glob_files": "search",
"grep": "search", "search_replace_all": "edit",
"replace_lines": "edit"}.get(call.name, "")
_tool_err = ""
if result_str and ("ERROR" in result_str[:80] or "BLOCKED" in result_str[:80]):
_tool_err = result_str[:200]
# Emit tool_end event
if self.on_event:
try:
self.on_event(AgentEvent(
kind="tool_end", turn=turn + 1,
tool_name=call.name,
file_path=str(_tool_file),
file_action=_fa,
duration_ms=_tool_dur,
error=_tool_err,
))
except Exception:
pass
messages.append(
adapter.format_tool_result(call.id, result_str)
)
# Track writes for convergence detection
if call.name in ("write_file", "append_file", "edit_file", "search_replace_all", "replace_lines"):
_content = call.args.get("content", call.args.get("new_string", "")) if isinstance(call.args, dict) else ""
self._convergence.record_write(len(_content))
# Track syntax errors for post-turn fix injection
if call.name in ("write_file", "append_file", "edit_file", "replace_lines"):
if result_str and ("Syntax issue" in result_str or "HTML ERROR" in result_str or "CSS ERROR" in result_str):
_file = call.args.get("path", "unknown") if isinstance(call.args, dict) else "unknown"
_syntax_error_files.append(str(_file))
# End convergence tracking turn (records zero if no writes)
self._convergence.end_turn()
# RC1: If syntax errors were detected this turn, force the agent to fix them
if _syntax_error_files:
_deduped = list(dict.fromkeys(_syntax_error_files))
fix_msg = (
f"SYNTAX ERROR detected in: {', '.join(_deduped)}. "
f"You MUST fix these errors NOW. Read each file with read_file, "
f"find the syntax error, and fix it with edit_file. "
f"Do NOT proceed until all syntax errors are resolved."
)
messages.append({"role": "user", "content": fix_msg})
_syntax_error_files.clear()
turn += 1
continue # Force another turn to fix
# Context compaction — threshold from budget (60% for 32K, 75% for 200K, 80% for 1M+)
estimated_tokens = self._estimate_tokens(messages)
budget = get_prompt_budget(self.model_config.context_window)
if estimated_tokens > self.model_config.context_window * budget["compaction_threshold"]:
messages = self._compact_messages(messages, budget)
logger.info(
"Compacted context: %d tokens → %d tokens",
estimated_tokens,
self._estimate_tokens(messages),
)
turn += 1
# Escalation: if hard limit hit and we have an escalation target, retry with smarter model
if self.escalation_model and not self._escalated:
self._escalated = True
new_config = get_model_config(self.escalation_model, max_tokens=self.model_config.max_tokens)
if self.on_event:
try:
self.on_event(AgentEvent(
kind="model_escalation",
error=f"Escalating: {self.model_config.short_name} -> {new_config.short_name}",
))
except Exception:
pass
old_config = self.model_config
old_provider = self.provider
old_max = self.max_turns
old_soft = self.soft_max_turns
old_verify = self._verify_budget
self.model_config = new_config
self.provider = get_provider(new_config.model_id)
# Reduced budget for escalation — not a fresh full run
self.max_turns = self._escalation_turns
self.soft_max_turns = self._escalation_turns
self._verify_budget = max(1, self._escalation_turns // 5)
# Gather artifacts summary so escalated model knows about prior work
artifact_summary = ""
if artifacts:
artifact_lines = [f"- {fpath} ({info.get('action', 'written') if isinstance(info, dict) else 'written'})"
for fpath, info in list(artifacts.items())[:15]]
artifact_summary = (
"\n\n## IMPORTANT: Prior Work Already Completed\n"
"These files have ALREADY been written to disk by a previous model attempt. "
"Read them before modifying — do NOT overwrite correct work.\n"
+ "\n".join(artifact_lines) + "\n"
"Start by reading the existing files to understand what was done, "
"then complete any remaining work."
)
try:
escalated_result = await self.run(prompt=prompt + artifact_summary, system=system, context=context)
escalated_result.escalated = True
escalated_result.tokens_in += _total_in
escalated_result.tokens_out += _total_out
# Merge original artifacts into escalation result
escalated_result.artifacts = {**artifacts, **escalated_result.artifacts}
return escalated_result
finally:
self.model_config = old_config
self.provider = old_provider
self.max_turns = old_max