diff --git a/INTERVIEW.md b/INTERVIEW.md
index b9efcef..e69de29 100644
--- a/INTERVIEW.md
+++ b/INTERVIEW.md
@@ -1,26 +0,0 @@
-# TODO 7: Multiline grouping approach (excluded from LLM body, protected from patches)
-
-> What exact on-disk syntax do you want for the multiline grouping section? Please provide a concrete before/after example (including where it sits relative to `---` front matter and the `# -- SCRATCHPAD` heading).
-
-after the front matter, before the scratchpad heading. it should be before any other content in the file after the front matter
-
-> How should the end of the multiline grouping section be detected (e.g., first blank line, next heading, a closing marker, end-of-file)? Can the grouping text itself contain blank lines?
-
-use explicit opening and closing marker syntax. if markdown frontmatter satisfies the criteria e.g. support for multiline content, use that. otherwise implement this in the way which makes the most sense / is the most aligned w/ good practice.
-
-> For legacy documents that still use a single-line `Grouping approach: ...` prefix, should the tool leave that line as-is, or migrate it to the new multiline format when writing the file? If migration is desired, should it happen only when `--grouping` is provided / user is prompted, or always?
-
-leave as is
-
-> The current CLI prompt uses `input()` (single line). How do you want multiline grouping input to be entered (e.g., read until a lone `.` line, read until EOF, open $EDITOR, allow literal `\n` escapes, etc.)?
-
-yes figure out best/most simple but also useable way to support multiline input
-
-> Should the grouping section be preserved verbatim (whitespace/indentation), or normalized (trim lines, collapse spaces) before inserting into the prompt’s “Maintain the grouping approach: …” line?
-
-preserved verbatim
-
-> Do you want the grouping section to be strictly immutable during patch application (i.e., patches only apply to the body after removing the grouping block), or should we also detect and error if a patch’s SEARCH text matches inside the grouping section?
-
-strictly immutable. it should be as if it didn't exist in the body at all. so if a search block matches it and nothing else, it results in an error > retry. there should not need to be any special handling logic for these cases. it simply isn't part of the document body for the purposes of search/replace or substitute blocks.
-
diff --git a/TODO.md b/TODO.md
index 3d8fa5b..e2b2fbd 100644
--- a/TODO.md
+++ b/TODO.md
@@ -1,7 +1,7 @@
- ask model to provide snippets from the integrate text for each find/replace, to clarify what each one is intended to integrate
- on fail, only ask the model to provide just that single failed block instead of asking it to provide all blocks again. this is only possible once the model returns what integrated text each block relates to
- only ask for start and end lines of search block instead of exact text. if there are multiple matches, ask model to provide a sufficient number of lines at start or end to narrow down options to a single match. ensure that the search block which the verification prompt sees is not affected by this, by populating the SEARCH section of the block given in the verification prompt with the matching text from the file, instead of only including the start and end lines provided by the model
-- modify so that it uses tool calling + hierarchical markdown parsing to avoid needing to ever send the entire document to the model, and instead allow the model to find the relevant section(s) (could often be more than one section which should be modified even to integrate a single piece of info) to modify, and then once it has found the sections, it provides the search/replace diffs
- - ensure that the model uses arbitrarily nested md headers, to make this approach scalable instead of only e.g. using one level of headings
-- move logs outside of src/
-- put group strat into front matter
\ No newline at end of file
+
+
+
+- make sure prompts mention importance of
\ No newline at end of file
diff --git a/src/SPEC.md b/src/SPEC.md
new file mode 100644
index 0000000..e69de29
diff --git a/src/integrate_notes.py b/src/integrate_notes.py
index 41f758f..398bd9f 100644
--- a/src/integrate_notes.py
+++ b/src/integrate_notes.py
@@ -201,7 +201,10 @@ def extract_grouping_section(body: str) -> tuple[GroupingSection | None, str]:
while grouping_index < len(lines) and not lines[grouping_index].strip():
grouping_index += 1
- if grouping_index < len(lines) and lines[grouping_index].strip() == GROUPING_BLOCK_START:
+ if (
+ grouping_index < len(lines)
+ and lines[grouping_index].strip() == GROUPING_BLOCK_START
+ ):
end_index = grouping_index + 1
while end_index < len(lines) and lines[end_index].strip() != GROUPING_BLOCK_END:
end_index += 1
@@ -242,7 +245,9 @@ def _format_grouping_block(grouping_text: str) -> str:
def render_grouping_section(
- grouping_text: str, existing_section: GroupingSection | None, preserve_existing: bool
+ grouping_text: str,
+ existing_section: GroupingSection | None,
+ preserve_existing: bool,
) -> str:
if not grouping_text.strip():
raise ValueError("Grouping approach cannot be empty.")
@@ -287,7 +292,7 @@ def prompt_for_grouping() -> str:
f"{GROUPING_PREFIX} at the top of the document.\n"
"Enter multiline text and finish with a single line containing only a '.'.\n"
"Examples:\n"
- "- Grouping approach: Group points according to what problem each idea/proposal/mechanism/concept addresses/are trying to solve, which you will need to figure out yourself based on context. Do not combine multiple goals/problems into one group. Keep goals/problems specific. Ensure groups are mutually exclusive and collectively exhaustive. Avoid overlap between group's goals/problems. sub-headings should be per-mechanism/per-solution i.e. according to which \"idea\"/solution each point relates to.\n"
+        '- Grouping approach: Group points according to what problem each idea/proposal/mechanism/concept addresses/is trying to solve, which you will need to figure out yourself based on context. Do not combine multiple goals/problems into one group. Keep goals/problems specific. Ensure groups are mutually exclusive and collectively exhaustive. Avoid overlap between groups\' goals/problems. Sub-headings should be per-mechanism/per-solution, i.e. according to which "idea"/solution each point relates to.\n'
"- Group points according to what you think the most useful/interesting/relevant groupings are. Ensure similar, related and contradictory points are adjacent.\n"
"Your input:\n"
)
@@ -1391,7 +1396,9 @@ def integrate_notes(
source_body, source_scratchpad = split_document_sections(source_content)
grouping_section, working_body = extract_grouping_section(source_body)
- resolved_grouping = grouping or (grouping_section.text if grouping_section else None)
+ resolved_grouping = grouping or (
+ grouping_section.text if grouping_section else None
+ )
if not resolved_grouping:
resolved_grouping = prompt_for_grouping()
logger.info("Recorded new grouping approach from user input.")
diff --git a/src/integrate_notes_spec.py b/src/integrate_notes_spec.py
new file mode 100644
index 0000000..0bbebc7
--- /dev/null
+++ b/src/integrate_notes_spec.py
@@ -0,0 +1,287 @@
+from __future__ import annotations
+
+import argparse
+import random
+import sys
+from pathlib import Path
+from time import perf_counter
+
+from loguru import logger
+
+from spec_chunking import request_chunk_groups
+from spec_config import (
+ SCRATCHPAD_HEADING,
+ SpecConfig,
+ default_log_path,
+ load_config,
+ repo_root,
+)
+from spec_editing import request_and_apply_edits
+from spec_exploration import explore_until_checkout
+from spec_llm import create_openai_client
+from spec_logging import configure_logging
+from spec_markdown import (
+ build_document,
+ format_duration,
+ normalize_paragraphs,
+ split_document_sections,
+)
+from spec_notes import NoteRepository
+from spec_summary import SummaryService
+from spec_verification import VerificationManager, build_verification_prompt
+
+
+def parse_arguments() -> argparse.Namespace:
+ parser = argparse.ArgumentParser(
+ description="Integrate scratchpad notes into a markdown repository (SPEC flow)."
+ )
+ parser.add_argument(
+ "--source", required=False, help="Path to the root markdown document."
+ )
+ parser.add_argument(
+ "--disable-verification",
+ action="store_true",
+ help="Disable verification prompts and background verification checks.",
+ )
+ return parser.parse_args()
+
+
+def resolve_source_path(provided_path: str | None) -> Path:
+ if provided_path:
+ path = Path(provided_path).expanduser().resolve()
+ else:
+ user_input = input("Enter path to the root markdown document: ").strip()
+ if not user_input:
+ raise ValueError("Document path is required to proceed.")
+ path = Path(user_input).expanduser().resolve()
+ if not path.exists():
+ raise FileNotFoundError(f"Source document not found at {path}.")
+ if not path.is_file():
+ raise ValueError(f"Source path {path} is not a file.")
+ return path
+
+
+def _select_sample_filenames(
+ repo: NoteRepository, reachable: list[Path], config: SpecConfig
+) -> list[str]:
+ candidates = []
+ for path in reachable:
+ if repo.is_index_note(path, config.index_filename_suffix):
+ continue
+ if repo.get_word_count(path) < config.granularity_sample_min_words:
+ continue
+ candidates.append(path.name)
+
+ if not candidates:
+ return []
+
+ if len(candidates) <= config.granularity_sample_size:
+ return candidates
+
+ return random.sample(candidates, config.granularity_sample_size)
+
+
+def _ensure_scratchpad_matches(
+ source_path: Path, expected_paragraphs: list[str]
+) -> tuple[str, list[str]]:
+ content = source_path.read_text(encoding="utf-8")
+ body, scratchpad = split_document_sections(content, SCRATCHPAD_HEADING)
+ paragraphs = normalize_paragraphs(scratchpad)
+ if paragraphs != expected_paragraphs:
+ raise RuntimeError(
+ "Scratchpad changed while integration was running; aborting to avoid data loss."
+ )
+ return body, paragraphs
+
+
+def _write_updated_files(
+ source_path: Path,
+ root_body: str,
+ remaining_paragraphs: list[str],
+ updated_files: dict[Path, str],
+ repo: NoteRepository,
+ summaries: SummaryService,
+) -> None:
+ for path, content in updated_files.items():
+ if path == source_path:
+ root_body = content
+ else:
+ path.write_text(content, encoding="utf-8")
+ repo.invalidate_content(path)
+ summaries.invalidate(path)
+
+ document = build_document(root_body, SCRATCHPAD_HEADING, remaining_paragraphs)
+ source_path.write_text(document, encoding="utf-8")
+ repo.set_root_body(root_body)
+
+
+def integrate_notes_spec(source_path: Path, disable_verification: bool) -> Path:
+ config = load_config(repo_root() / "config.json")
+ source_content = source_path.read_text(encoding="utf-8")
+ source_body, source_scratchpad = split_document_sections(
+ source_content, SCRATCHPAD_HEADING
+ )
+ scratchpad_paragraphs = normalize_paragraphs(source_scratchpad)
+
+ repo = NoteRepository(source_path, source_body, source_path.parent)
+ client = create_openai_client()
+ summaries = SummaryService(repo, client, config)
+ verification_manager = (
+ None if disable_verification else VerificationManager(client, source_path)
+ )
+
+ try:
+ if not scratchpad_paragraphs:
+ logger.info(
+ "No scratchpad notes to integrate; ensuring scratchpad heading remains present."
+ )
+ source_path.write_text(
+ build_document(source_body, SCRATCHPAD_HEADING, []),
+ encoding="utf-8",
+ )
+ return source_path
+
+ reachable = repo.iter_reachable_paths()
+ sample_filenames = _select_sample_filenames(repo, reachable, config)
+ chunk_groups = request_chunk_groups(
+ client, scratchpad_paragraphs, sample_filenames, config
+ )
+
+ remaining_indices = set(range(1, len(scratchpad_paragraphs) + 1))
+ total_chunks = len(chunk_groups)
+ chunks_completed = 0
+ integration_start = perf_counter()
+ current_body = source_body
+
+ for group in chunk_groups:
+ if any(index not in remaining_indices for index in group):
+ raise RuntimeError(
+ "Chunk references paragraphs that were already integrated; aborting."
+ )
+ chunk_paragraphs = [scratchpad_paragraphs[index - 1] for index in group]
+ chunk_text = "\n\n".join(chunk_paragraphs)
+
+ expected_remaining = [
+ scratchpad_paragraphs[index - 1]
+ for index in sorted(remaining_indices)
+ ]
+ file_body, _ = _ensure_scratchpad_matches(
+ source_path, expected_remaining
+ )
+ if file_body != current_body:
+ raise RuntimeError(
+ "Root document body changed while integration was running; aborting."
+ )
+
+ repo.set_root_body(current_body)
+ reachable = repo.iter_reachable_paths()
+ summary_map = summaries.get_summaries(reachable)
+
+ root_summary = summary_map[source_path]
+ root_headings = repo.get_headings(source_path)
+ root_links = repo.get_links(source_path)
+ root_link_summaries = [
+ (path, summary_map[path])
+ for path in root_links
+ if path in summary_map
+ ]
+
+ chunk_label = f"chunk {chunks_completed + 1}/{total_chunks}"
+ checkout_paths = explore_until_checkout(
+ client,
+ chunk_text,
+ source_path,
+ root_summary,
+ root_headings,
+ root_links,
+ root_link_summaries,
+ summary_map,
+ repo,
+ config,
+ )
+
+ checked_out_contents = {
+ path: repo.get_note_content(path) for path in checkout_paths
+ }
+ edit_application = request_and_apply_edits(
+ client,
+ chunk_text,
+ checked_out_contents,
+ checkout_paths,
+ chunk_label,
+ )
+
+ for path, content in edit_application.updated_contents.items():
+ if path == source_path:
+ current_body = content
+ for path in edit_application.updated_contents:
+ if path != source_path:
+ repo.invalidate_content(path)
+
+ for index in group:
+ remaining_indices.remove(index)
+ remaining_paragraphs = [
+ scratchpad_paragraphs[index - 1]
+ for index in sorted(remaining_indices)
+ ]
+
+ _write_updated_files(
+ source_path,
+ current_body,
+ remaining_paragraphs,
+ edit_application.updated_contents,
+ repo,
+ summaries,
+ )
+
+ if verification_manager is not None:
+ verification_prompt = build_verification_prompt(
+ chunk_text,
+ edit_application.patch_replacements,
+ edit_application.duplicate_texts,
+ )
+ verification_manager.enqueue_prompt(
+ verification_prompt,
+ chunk_label,
+ chunks_completed,
+ total_chunks,
+ )
+
+ chunks_completed += 1
+ remaining_chunks = total_chunks - chunks_completed
+ if remaining_chunks > 0:
+ elapsed_seconds = perf_counter() - integration_start
+ average_duration = elapsed_seconds / chunks_completed
+ estimated_seconds_remaining = average_duration * remaining_chunks
+ logger.info(
+ f"Estimated time remaining: {format_duration(estimated_seconds_remaining)}"
+ f" for {remaining_chunks} remaining chunk(s)."
+ )
+
+ logger.info("All scratchpad notes integrated; scratchpad section cleared.")
+ return source_path
+ finally:
+ summaries.shutdown()
+ if verification_manager is not None:
+ verification_manager.shutdown()
+
+
+def main() -> None:
+ configure_logging(default_log_path())
+ try:
+ args = parse_arguments()
+ source_path = resolve_source_path(args.source)
+ integrated_path = integrate_notes_spec(
+ source_path,
+ args.disable_verification,
+ )
+ logger.info(
+ f"Integration completed. Updated document available at {integrated_path}."
+ )
+ except Exception as error:
+ logger.exception(f"Integration failed: {error}")
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/src/spec_chunking.py b/src/spec_chunking.py
new file mode 100644
index 0000000..7d0a972
--- /dev/null
+++ b/src/spec_chunking.py
@@ -0,0 +1,120 @@
+from __future__ import annotations
+
+import json
+from typing import List
+
+from loguru import logger
+
+from spec_config import MAX_CHUNKING_ATTEMPTS, SpecConfig
+from spec_llm import request_text
+from spec_markdown import count_words
+
+
+def build_chunking_prompt(
+ numbered_paragraphs: List[str], sample_filenames: List[str], config: SpecConfig
+) -> str:
+ paragraphs_block = "\n".join(numbered_paragraphs)
+ samples_block = "\n".join(f"- {name}" for name in sample_filenames)
+
+ instructions = (
+ "Group the numbered paragraphs into semantically coherent chunks. "
+ "Paragraphs in a chunk need not be contiguous. "
+ "Do not split a paragraph. "
+ f"Each chunk must be at most {config.max_chunk_words} words. "
+ "Return JSON only in the form: {\"groups\": [[1,2],[3]]}. "
+ "Include every paragraph number exactly once. "
+ "The order of groups should reflect the order you want them processed; do not sort."
+ )
+
+ return (
+ "\n"
+ f"{instructions}\n"
+ "\n\n"
+ "\n"
+ f"{paragraphs_block}\n"
+ "\n\n"
+ "\n"
+ f"{samples_block}\n"
+ ""
+ )
+
+
+def _parse_group_payload(payload: str, total_paragraphs: int) -> List[List[int]]:
+ data = json.loads(payload)
+ if not isinstance(data, dict) or "groups" not in data:
+ raise ValueError("Chunking response must be a JSON object with a 'groups' key.")
+ groups = data["groups"]
+ if not isinstance(groups, list) or not groups:
+ raise ValueError("Chunking response 'groups' must be a non-empty list.")
+
+ seen: set[int] = set()
+ parsed_groups: List[List[int]] = []
+
+ for group in groups:
+ if not isinstance(group, list) or not group:
+ raise ValueError("Each chunk group must be a non-empty list of integers.")
+ parsed_group: List[int] = []
+ for value in group:
+ if not isinstance(value, int):
+ raise ValueError("Chunk group entries must be integers.")
+ if value < 1 or value > total_paragraphs:
+ raise ValueError(
+ f"Paragraph number {value} is out of range 1..{total_paragraphs}."
+ )
+ if value in seen:
+ raise ValueError(f"Paragraph number {value} appears in multiple groups.")
+ seen.add(value)
+ parsed_group.append(value)
+ parsed_groups.append(parsed_group)
+
+ if len(seen) != total_paragraphs:
+ missing = [str(i) for i in range(1, total_paragraphs + 1) if i not in seen]
+ raise ValueError(f"Chunking response missing paragraphs: {', '.join(missing)}")
+
+ return parsed_groups
+
+
+def request_chunk_groups(
+ client,
+ paragraphs: List[str],
+ sample_filenames: List[str],
+ config: SpecConfig,
+) -> List[List[int]]:
+ numbered_paragraphs = [f"{index + 1}) {text}" for index, text in enumerate(paragraphs)]
+ feedback: str | None = None
+
+ for attempt in range(1, MAX_CHUNKING_ATTEMPTS + 1):
+ prompt = build_chunking_prompt(numbered_paragraphs, sample_filenames, config)
+ if feedback:
+ prompt += (
+ "\n\n\n"
+ f"{feedback}\n"
+ ""
+ )
+ response_text = request_text(client, prompt, f"chunking attempt {attempt}")
+ try:
+ groups = _parse_group_payload(response_text, len(paragraphs))
+ except Exception as error: # noqa: BLE001
+ feedback = f"Parsing error: {error}"
+ logger.warning(f"Chunking response invalid on attempt {attempt}: {error}")
+ continue
+
+ invalid_group = None
+ for group in groups:
+ words = sum(count_words(paragraphs[index - 1]) for index in group)
+ if words > config.max_chunk_words:
+ invalid_group = (group, words)
+ break
+ if invalid_group:
+ group, words = invalid_group
+ feedback = (
+ f"Chunk {group} has {words} words, exceeding max {config.max_chunk_words}."
+ )
+ logger.warning(
+ f"Chunking response exceeded word limit on attempt {attempt}: {feedback}"
+ )
+ continue
+
+ return groups
+
+ raise RuntimeError("Unable to obtain valid chunk grouping from the model.")
diff --git a/src/spec_config.py b/src/spec_config.py
new file mode 100644
index 0000000..b9af278
--- /dev/null
+++ b/src/spec_config.py
@@ -0,0 +1,82 @@
+from __future__ import annotations
+
+import json
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+
+SCRATCHPAD_HEADING = "# -- SCRATCHPAD"
+ENV_API_KEY = "OPENAI_API_KEY"
+
+DEFAULT_MODEL = "gpt-5.2"
+DEFAULT_REASONING = {"effort": "medium"}
+
+DEFAULT_MAX_RETRIES = 3
+RETRY_INITIAL_DELAY_SECONDS = 2.0
+RETRY_BACKOFF_FACTOR = 2.0
+
+MAX_PATCH_ATTEMPTS = 3
+MAX_TOOL_ATTEMPTS = 3
+MAX_CHUNKING_ATTEMPTS = 3
+
+MAX_CONCURRENT_VERIFICATIONS = 4
+
+LOG_FILE_ROTATION_BYTES = 2 * 1024 * 1024
+
+
+@dataclass(frozen=True)
+class SpecConfig:
+ max_exploration_rounds: int = 3
+ max_files_viewed_per_round: int = 4
+ max_files_viewed_total: int = 15
+ max_files_checked_out: int = 3
+ max_chunk_words: int = 600
+ granularity_sample_size: int = 15
+ granularity_sample_min_words: int = 300
+ summary_target_words_min: int = 75
+ summary_target_words_max: int = 100
+ index_filename_suffix: str = "index.md"
+
+
+def repo_root() -> Path:
+ return Path(__file__).resolve().parent.parent
+
+
+def default_log_path() -> Path:
+ return repo_root() / "logs" / "integrate_notes.log"
+
+
+def default_pending_prompts_path() -> Path:
+ return repo_root() / "logs" / "pending_verification_prompts.json"
+
+
+def default_summary_cache_path() -> Path:
+ return Path.home() / ".cache" / "integrate_notes" / "summary_cache.json"
+
+
+def load_config(config_path: Path) -> SpecConfig:
+ if not config_path.exists():
+ return SpecConfig()
+
+ raw = config_path.read_text(encoding="utf-8")
+ if not raw.strip():
+ return SpecConfig()
+
+ data = json.loads(raw)
+ if not isinstance(data, dict):
+ raise ValueError("config.json must contain a JSON object.")
+
+ defaults = SpecConfig()
+ overrides: dict[str, Any] = {}
+ for field_name in defaults.__dataclass_fields__:
+ if field_name not in data:
+ continue
+ value = data[field_name]
+ expected_value = getattr(defaults, field_name)
+ if not isinstance(value, type(expected_value)):
+ raise ValueError(
+ f"config.json field '{field_name}' must be {type(expected_value).__name__}."
+ )
+ overrides[field_name] = value
+
+ return SpecConfig(**{**defaults.__dict__, **overrides})
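The override semantics of `load_config` — unknown keys ignored, known keys type-checked against the dataclass defaults — can be demonstrated with a minimal stand-alone sketch (the two fields here are illustrative, not the full `SpecConfig`):

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class DemoConfig:
    max_chunk_words: int = 600
    index_filename_suffix: str = "index.md"


def load_overrides(raw: str) -> DemoConfig:
    """Apply only known fields from a JSON object, rejecting wrong types."""
    defaults = DemoConfig()
    data = json.loads(raw) if raw.strip() else {}
    overrides = {}
    for name in defaults.__dataclass_fields__:
        if name in data:
            if not isinstance(data[name], type(getattr(defaults, name))):
                raise ValueError(f"field '{name}' has the wrong type")
            overrides[name] = data[name]
    return DemoConfig(**{**defaults.__dict__, **overrides})


# Unknown keys are silently ignored; known keys replace the defaults.
print(load_overrides('{"max_chunk_words": 250, "unknown": true}'))
# → DemoConfig(max_chunk_words=250, index_filename_suffix='index.md')
```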
diff --git a/src/spec_editing.py b/src/spec_editing.py
new file mode 100644
index 0000000..423f7c3
--- /dev/null
+++ b/src/spec_editing.py
@@ -0,0 +1,433 @@
+from __future__ import annotations
+
+import re
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Dict, Iterable, List
+
+from loguru import logger
+
+from spec_config import MAX_PATCH_ATTEMPTS
+from spec_llm import parse_tool_call_arguments, request_tool_call
+
+
+EDIT_TOOL_SCHEMA = {
+ "type": "function",
+ "name": "edit_notes",
+ "description": "Provide find/replace edits for checked-out files.",
+ "strict": True,
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "action": {"type": "string", "enum": ["edit"]},
+ "edits": {
+ "type": "array",
+ "items": {
+ "type": "object",
+ "properties": {
+ "file": {"type": "string"},
+ "find": {"type": "string"},
+ "replace": {"type": "string"},
+ "is_duplicate": {"type": "boolean"},
+ },
+ "required": ["file", "find", "is_duplicate"],
+ },
+ },
+ },
+ "required": ["action", "edits"],
+ },
+}
+
+INSTRUCTIONS_PROMPT = """# Instructions
+
+- Integrate the provided notes into the checked-out files.
+- Ensure related points are adjacent.
+- Break content into relatively atomic bullet points; each bullet should express one idea.
+- Use nested bullets when a point is naturally a sub-point of another.
+- Make minor grammar edits as needed so ideas read cleanly as bullet points.
+- If text to integrate is already well-formatted, punctuated, grammatical and bullet-pointed, avoid altering its wording while integrating/inserting it.
+- De-duplicate overlapping points without losing any nuance or detail.
+- Keep wording succinct and remove filler words (e.g., "you know", "basically", "essentially", "uh").
+- Add new headings, sub-headings, or parent bullet points for new items, and reuse existing ones where appropriate.
+- Refactor existing content as needed to smoothly integrate the new notes.
+
+
+# Rules
+
+- PRESERVE/DO NOT LEAVE OUT ANY NUANCE, DETAILS, POINTS, CONCLUSIONS, IDEAS, ARGUMENTS, OR QUALIFICATIONS from the notes.
+- PRESERVE ALL EXPLANATIONS FROM THE NOTES.
+- Do not materially alter meaning.
+- If new items do not match existing items in the checked-out files, add them appropriately.
+- Preserve questions as questions; do not convert them into statements.
+- Do not guess acronym expansions if they are not specified.
+- Do not modify tone (e.g., confidence/certainty) or add hedging.
+- Do not omit any wikilinks, URLs, diagrams, ASCII art, mathematics, tables, figures, or other non-text content.
+- Move each link/URL/etc. to the section where it is most relevant based on its surrounding context and its URL text.
+ - Do not move links to a separate "resources" or "links" section.
+- Do not modify any wikilinks or URLs.
+
+
+# Formatting
+
+- Use multiple levels of markdown headings ("#", "##", "###", "####", etc.) to express hierarchy, not just top-level headings
+- Use "- " as the bullet prefix (not "* ", "- ", or anything else).
+ - Use four spaces for each level of bullet-point nesting.
+
+
+# Before finishing: check your work
+
+- Confirm every item from the provided notes is now represented in the checked-out files without loss of detail.
+- Ensure nothing from the original checked-out files was lost.
+- If anything is missing, integrate it in appropriately.
+"""
+
+
+@dataclass(frozen=True)
+class EditInstruction:
+ file_path: Path
+ find_text: str
+ replace_text: str | None
+ is_duplicate: bool
+
+
+@dataclass(frozen=True)
+class EditFailure:
+ index: int
+ file_path: Path
+ find_text: str
+ reason: str
+
+
+@dataclass(frozen=True)
+class EditApplication:
+ updated_contents: Dict[Path, str]
+ patch_replacements: List[str]
+ duplicate_texts: List[str]
+
+
+class EditParseError(RuntimeError):
+ pass
+
+
+def build_edit_prompt(
+ chunk_text: str,
+ checked_out_contents: Dict[Path, str],
+ failed_edits: List[EditFailure] | None = None,
+ failed_formatting: str | None = None,
+ previous_response: str | None = None,
+) -> str:
+ file_sections = []
+ for path, content in checked_out_contents.items():
+ file_sections.append(f"## [{path.name}]\n\n{content}")
+
+ instructions = (
+ "You are integrating the notes chunk into the checked-out files. "
+ "Return only a tool call to edit_notes with edits targeting the listed files. "
+ "Use is_duplicate=true only when the notes are already fully covered by existing text. "
+ "For edits, 'find' must be a single contiguous span copied from the file content. "
+ "For insertions, include the anchor text in both find and replace. "
+ "Do not include any commentary or additional text."
+ )
+
+    files_block = "\n\n".join(file_sections)
+
+    prompt = (
+        "<instructions>\n"
+        f"{instructions}\n\n{INSTRUCTIONS_PROMPT.strip()}\n"
+        "</instructions>\n\n"
+        "<notes>\n"
+        f"{chunk_text}\n"
+        "</notes>\n\n"
+        "<checked_out_files>\n"
+        f"{files_block}\n"
+        "</checked_out_files>"
+    )
+
+ if failed_formatting or failed_edits:
+ feedback_lines: List[str] = []
+ if failed_formatting:
+ feedback_lines.append(
+ "The previous response could not be parsed. Fix the issues below and re-emit a valid tool call."
+ )
+ feedback_lines.append(f"Error: {failed_formatting}")
+ if failed_edits:
+ feedback_lines.append(
+ "The previous edits failed to match the current file contents. Adjust only the failing edits."
+ )
+ for failure in failed_edits:
+ feedback_lines.append(
+ f"Edit {failure.index} ({failure.file_path.name}) find text must match exactly once."
+ )
+ feedback_lines.append(failure.find_text)
+ feedback_lines.append(f"Reason: {failure.reason}")
+ prompt += (
+ "\n\n\n"
+ + "\n\n".join(feedback_lines)
+ + "\n"
+ )
+
+ if previous_response:
+ prompt += (
+ "\n\n\n"
+ + previous_response
+ + "\n"
+ )
+
+ return prompt
+
+
+def parse_edit_instructions(
+ payload: dict,
+ checked_out_paths: Iterable[Path],
+) -> List[EditInstruction]:
+ action = payload.get("action")
+ if action != "edit":
+ raise EditParseError("Edit tool payload must include action='edit'.")
+
+ edits = payload.get("edits")
+ if not isinstance(edits, list) or not edits:
+ raise EditParseError("Edit tool payload must include a non-empty edits list.")
+
+ checked_out_map = {path.name.lower(): path for path in checked_out_paths}
+ instructions: List[EditInstruction] = []
+
+ for edit in edits:
+ if not isinstance(edit, dict):
+ raise EditParseError("Each edit must be an object.")
+ file_name = edit.get("file")
+ if not isinstance(file_name, str) or not file_name.strip():
+ raise EditParseError("Each edit must include a non-empty file name.")
+ path = checked_out_map.get(file_name.strip().lower())
+ if path is None:
+ raise EditParseError(
+ f"Edit file '{file_name}' is not in the checked-out file list."
+ )
+ find_text = edit.get("find")
+ if not isinstance(find_text, str) or not find_text.strip():
+ raise EditParseError("Each edit must include non-empty find text.")
+ is_duplicate = edit.get("is_duplicate")
+ if not isinstance(is_duplicate, bool):
+ raise EditParseError("Each edit must include a boolean is_duplicate flag.")
+ replace_text = edit.get("replace")
+ if is_duplicate:
+ replace_text = None
+ else:
+ if not isinstance(replace_text, str):
+ raise EditParseError(
+ "Non-duplicate edits must include a string replace value."
+ )
+ instructions.append(
+ EditInstruction(
+ file_path=path,
+ find_text=find_text,
+ replace_text=replace_text,
+ is_duplicate=is_duplicate,
+ )
+ )
+
+ return instructions
+
+
+def _normalize_line_endings(text: str) -> str:
+ return text.replace("\r\n", "\n").replace("\r", "\n")
+
+
+def _build_whitespace_pattern(text: str, allow_zero: bool) -> re.Pattern[str]:
+ if not text:
+ raise ValueError("Cannot build whitespace pattern for empty text.")
+
+ pieces: List[str] = []
+ whitespace_token = r"\s*" if allow_zero else r"\s+"
+ in_whitespace = False
+
+ for char in text:
+ if char.isspace():
+ if not in_whitespace:
+ pieces.append(whitespace_token)
+ in_whitespace = True
+ else:
+ pieces.append(re.escape(char))
+ in_whitespace = False
+
+ pattern = "".join(pieces)
+ if not pattern:
+ pattern = whitespace_token
+ return re.compile(pattern, flags=re.MULTILINE)
+
+
+def _locate_search_text(
+ body: str, search_text: str
+) -> tuple[int | None, int | None, str]:
+ attempted_descriptions: List[str] = []
+
+ index = body.find(search_text)
+ attempted_descriptions.append("exact match")
+ if index != -1:
+ next_index = body.find(search_text, index + len(search_text))
+ if next_index != -1:
+ reason = (
+ "SEARCH text matched multiple locations using exact match; "
+ "increase SEARCH text length to match a longer, more specific span."
+ )
+ return None, None, reason
+ return index, index + len(search_text), ""
+
+ trimmed_newline_search = search_text.strip("\n")
+ if trimmed_newline_search and trimmed_newline_search != search_text:
+ attempted_descriptions.append("trimmed newline boundaries")
+ index = body.find(trimmed_newline_search)
+ if index != -1:
+ next_index = body.find(
+ trimmed_newline_search, index + len(trimmed_newline_search)
+ )
+ if next_index != -1:
+ reason = (
+ "SEARCH text matched multiple locations using trimmed newline "
+ "boundaries; increase SEARCH text length to match a longer, more specific span."
+ )
+ return None, None, reason
+ return index, index + len(trimmed_newline_search), ""
+
+ trimmed_whitespace_search = search_text.strip()
+ if trimmed_whitespace_search and trimmed_whitespace_search not in {
+ search_text,
+ trimmed_newline_search,
+ }:
+ attempted_descriptions.append("trimmed outer whitespace")
+ index = body.find(trimmed_whitespace_search)
+ if index != -1:
+ next_index = body.find(
+ trimmed_whitespace_search, index + len(trimmed_whitespace_search)
+ )
+ if next_index != -1:
+ reason = (
+ "SEARCH text matched multiple locations using trimmed outer "
+ "whitespace; increase SEARCH text length to match a longer, more specific span."
+ )
+ return None, None, reason
+ return index, index + len(trimmed_whitespace_search), ""
+
+ if search_text.strip():
+ pattern_whitespace = _build_whitespace_pattern(search_text, allow_zero=False)
+ attempted_descriptions.append("normalized whitespace gaps")
+ matches = list(pattern_whitespace.finditer(body))
+ if matches:
+ if len(matches) > 1:
+ reason = (
+ "SEARCH text matched multiple locations using normalized whitespace "
+ "gaps; increase SEARCH text length to match a longer, more specific span."
+ )
+ return None, None, reason
+ match = matches[0]
+ return match.start(), match.end(), ""
+
+ pattern_relaxed = _build_whitespace_pattern(search_text, allow_zero=True)
+ attempted_descriptions.append("removed whitespace gaps")
+ matches = list(pattern_relaxed.finditer(body))
+ if matches:
+ if len(matches) > 1:
+ reason = (
+ "SEARCH text matched multiple locations using removed whitespace "
+ "gaps; increase SEARCH text length to match a longer, more specific span."
+ )
+ return None, None, reason
+ match = matches[0]
+ return match.start(), match.end(), ""
+
+ reason = "SEARCH text not found after attempts: " + ", ".join(
+ attempted_descriptions
+ )
+ return None, None, reason
+
+
+def _replace_slice(body: str, start: int, end: int, replacement: str) -> str:
+ return body[:start] + replacement + body[end:]
+
+
+def apply_edits(
+ file_contents: Dict[Path, str],
+ edits: List[EditInstruction],
+) -> tuple[EditApplication | None, List[EditFailure]]:
+ updated_contents = {
+ path: _normalize_line_endings(content)
+ for path, content in file_contents.items()
+ }
+ failures: List[EditFailure] = []
+ patch_replacements: List[str] = []
+ duplicate_texts: List[str] = []
+
+ for index, edit in enumerate(edits, start=1):
+ content = updated_contents[edit.file_path]
+ start, end, reason = _locate_search_text(content, edit.find_text)
+ if start is None or end is None:
+ failures.append(
+ EditFailure(
+ index=index,
+ file_path=edit.file_path,
+ find_text=edit.find_text,
+ reason=reason,
+ )
+ )
+ continue
+ if edit.is_duplicate:
+ duplicate_texts.append(edit.find_text)
+ continue
+ replacement = edit.replace_text or ""
+ updated_contents[edit.file_path] = _replace_slice(
+ content, start, end, replacement
+ )
+ patch_replacements.append(replacement)
+
+ if failures:
+ return None, failures
+
+ return EditApplication(updated_contents, patch_replacements, duplicate_texts), []
+
+
+def request_and_apply_edits(
+ client,
+ chunk_text: str,
+ checked_out_contents: Dict[Path, str],
+ checked_out_paths: Iterable[Path],
+ context_label: str,
+) -> EditApplication:
+ failed_edits: List[EditFailure] | None = None
+ failed_formatting: str | None = None
+ previous_response: str | None = None
+
+ for attempt in range(1, MAX_PATCH_ATTEMPTS + 1):
+ attempt_label = (
+ context_label if attempt == 1 else f"{context_label} attempt {attempt}"
+ )
+ prompt = build_edit_prompt(
+ chunk_text,
+ checked_out_contents,
+ failed_edits=failed_edits,
+ failed_formatting=failed_formatting,
+ previous_response=previous_response,
+ )
+
+ tool_call = request_tool_call(
+ client, prompt, [EDIT_TOOL_SCHEMA], f"edit {attempt_label}"
+ )
+ previous_response = tool_call.arguments
+
+ try:
+ payload = parse_tool_call_arguments(tool_call)
+ edit_instructions = parse_edit_instructions(payload, checked_out_paths)
+ except Exception as error: # noqa: BLE001
+ failed_formatting = str(error)
+ failed_edits = None
+ logger.warning(f"Edit response invalid for {attempt_label}: {error}")
+ continue
+
+ failed_formatting = None
+ application, failures = apply_edits(checked_out_contents, edit_instructions)
+ if not failures:
+ return application
+ failed_edits = failures
+ logger.info(
+ f"Retrying {context_label}; {len(failed_edits)} edit(s) failed to match."
+ )
+
+ raise RuntimeError(
+ f"Unable to apply edits for {context_label} after {MAX_PATCH_ATTEMPTS} attempt(s)."
+ )
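The fallback chain in `_locate_search_text` (exact match, trimmed newlines, trimmed outer whitespace, then regex-normalized gaps) leans on a `_build_whitespace_pattern` helper defined elsewhere in the module. A standalone sketch of the idea, assuming that helper escapes the non-whitespace tokens and joins them with `\s+` (or `\s*` when `allow_zero=True`):

```python
import re


def build_whitespace_pattern(search_text: str, allow_zero: bool):
    # Split the SEARCH text into non-whitespace runs and rejoin them with a
    # flexible whitespace gap; allow_zero also matches runs fused together.
    gap = r"\s*" if allow_zero else r"\s+"
    return re.compile(gap.join(re.escape(token) for token in search_text.split()))


def locate(body: str, search_text: str):
    # Strategy 1: exact substring match.
    index = body.find(search_text)
    if index != -1:
        return index, index + len(search_text)
    # Strategy 2: tolerate differing whitespace between tokens.
    match = build_whitespace_pattern(search_text, allow_zero=False).search(body)
    if match:
        return match.start(), match.end()
    return None


body = "def foo():\n    return  1\n"
```

Only the first and last strategies are shown here; the trimming steps in between are plain `str.strip` variants of the same find-then-check-uniqueness pattern.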
diff --git a/src/spec_exploration.py b/src/spec_exploration.py
new file mode 100644
index 0000000..c8c1c2f
--- /dev/null
+++ b/src/spec_exploration.py
@@ -0,0 +1,308 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Dict, Iterable, List, Tuple
+
+from spec_config import MAX_TOOL_ATTEMPTS, SpecConfig
+from spec_llm import parse_tool_call_arguments, request_tool_call
+from spec_notes import NoteRepository, ViewedNote
+
+
+VIEW_TOOL_SCHEMA = {
+ "type": "function",
+ "name": "view_files",
+ "description": "Request additional files to view.",
+ "strict": True,
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "action": {"type": "string", "enum": ["view"]},
+ "files": {"type": "array", "items": {"type": "string"}},
+ },
+ "required": ["action", "files"],
+ "additionalProperties": False,
+ },
+}
+
+CHECKOUT_TOOL_SCHEMA = {
+ "type": "function",
+ "name": "checkout_files",
+ "description": "Select viewed files to check out for editing.",
+ "strict": True,
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "action": {"type": "string", "enum": ["checkout"]},
+ "files": {"type": "array", "items": {"type": "string"}},
+ },
+ "required": ["action", "files"],
+ "additionalProperties": False,
+ },
+}
+
+
+@dataclass(frozen=True)
+class ExplorationState:
+ viewed: Dict[Path, ViewedNote]
+ available: Dict[Path, str]
+
+
+class ExplorationError(RuntimeError):
+ pass
+
+
+def format_viewed_note(note: ViewedNote) -> str:
+ headings = "\n".join(f"- {heading}" for heading in note.headings)
+ links = "\n".join(
+ f"- [[{path.name}]] — {summary}" for path, summary in note.link_summaries
+ )
+ return (
+ f"## [{note.path.name}]\n\n"
+ f"**Summary:** {note.summary}\n\n"
+ f"**Headings:**\n{headings}\n\n"
+ f"**Links to:**\n{links}"
+ )
+
+
+def format_available_note(path: Path, summary: str) -> str:
+ return f"- [[{path.name}]] — {summary}"
+
+
+def build_exploration_prompt(
+ chunk_text: str,
+ viewed_notes: Iterable[ViewedNote],
+ available_notes: Iterable[Tuple[Path, str]],
+ remaining_rounds: int,
+ config: SpecConfig,
+ feedback: str | None = None,
+) -> str:
+ viewed_blocks = [format_viewed_note(note) for note in viewed_notes]
+ available_blocks = [format_available_note(path, summary) for path, summary in available_notes]
+
+ instructions = (
+ "You are exploring notes to decide which files to view next or to check out. "
+ "Respond with a tool call to view_files selecting up to "
+ f"{config.max_files_viewed_per_round} AVAILABLE files, or call checkout_files "
+ f"to select up to {config.max_files_checked_out} VIEWED files for editing. "
+ "Only choose files from the provided lists."
+ )
+
+ viewed_section = "\n\n".join(viewed_blocks)
+ available_section = "\n".join(available_blocks)
+
+ prompt = (
+ "<instructions>\n"
+ f"{instructions}\n"
+ "</instructions>\n\n"
+ "<chunk>\n"
+ f"{chunk_text}\n"
+ "</chunk>\n\n"
+ "<viewed_files>\n"
+ f"{viewed_section}\n"
+ "</viewed_files>\n\n"
+ "<available_files>\n"
+ f"{available_section}\n"
+ "</available_files>\n\n"
+ f"<remaining_rounds>{remaining_rounds}</remaining_rounds>"
+ )
+ if feedback:
+ prompt += f"\n\n<feedback>\n{feedback}\n</feedback>"
+ return prompt
+
+
+def _normalize_file_name(value: str) -> str:
+ trimmed = value.strip()
+ if trimmed.startswith("[[") and trimmed.endswith("]]"):
+ trimmed = trimmed[2:-2].strip()
+ if "|" in trimmed:
+ trimmed = trimmed.split("|", 1)[0].strip()
+ if "#" in trimmed:
+ trimmed = trimmed.split("#", 1)[0].strip()
+ if not trimmed:
+ raise ExplorationError("File reference cannot be empty.")
+ if not trimmed.lower().endswith(".md"):
+ trimmed = f"{trimmed}.md"
+ return trimmed
+
+
+def _dedupe_preserve_order(values: Iterable[str]) -> List[str]:
+ seen: set[str] = set()
+ result: List[str] = []
+ for value in values:
+ key = value.lower()
+ if key in seen:
+ continue
+ seen.add(key)
+ result.append(value)
+ return result
+
+
+def _parse_file_list(payload: dict, action: str) -> List[str]:
+ if payload.get("action") != action:
+ raise ExplorationError(f"Tool payload must include action='{action}'.")
+ files = payload.get("files")
+ if not isinstance(files, list):
+ raise ExplorationError("Tool payload must include a files list.")
+ file_names: List[str] = []
+ for value in files:
+ if not isinstance(value, str) or not value.strip():
+ raise ExplorationError("Each file entry must be a non-empty string.")
+ file_names.append(_normalize_file_name(value))
+ return _dedupe_preserve_order(file_names)
+
+
+def _resolve_requested_paths(
+ names: Iterable[str],
+ mapping: Dict[str, Path],
+ label: str,
+) -> List[Path]:
+ resolved: List[Path] = []
+ for name in names:
+ key = name.lower()
+ path = mapping.get(key)
+ if path is None:
+ raise ExplorationError(f"Requested {label} file '{name}' is not available.")
+ resolved.append(path)
+ return resolved
+
+
+def explore_until_checkout(
+ client,
+ chunk_text: str,
+ root_path: Path,
+ root_summary: str,
+ root_headings: List[str],
+ root_links: List[Path],
+ root_link_summaries: List[tuple[Path, str]],
+ summary_map: Dict[Path, str],
+ repo: NoteRepository,
+ config: SpecConfig,
+) -> List[Path]:
+ viewed: Dict[Path, ViewedNote] = {}
+ available: Dict[Path, str] = {}
+
+ viewed[root_path] = ViewedNote(
+ path=root_path,
+ summary=root_summary,
+ headings=root_headings,
+ links=root_links,
+ link_summaries=root_link_summaries,
+ )
+ for path in root_links:
+ if path not in viewed and path in summary_map:
+ available[path] = summary_map[path]
+
+ rounds_left = config.max_exploration_rounds
+ total_viewed_limit = config.max_files_viewed_total
+
+ while rounds_left > 0:
+ needs_checkout = len(viewed) >= total_viewed_limit or not available
+ feedback = None
+ attempts_left = MAX_TOOL_ATTEMPTS
+
+ while attempts_left > 0:
+ prompt = build_exploration_prompt(
+ chunk_text,
+ viewed.values(),
+ available.items(),
+ rounds_left,
+ config,
+ feedback=feedback,
+ )
+ tools = [CHECKOUT_TOOL_SCHEMA] if needs_checkout else [VIEW_TOOL_SCHEMA, CHECKOUT_TOOL_SCHEMA]
+ tool_call = request_tool_call(
+ client,
+ prompt,
+ tools,
+ f"exploration round {config.max_exploration_rounds - rounds_left + 1}",
+ )
+ payload = parse_tool_call_arguments(tool_call)
+ try:
+ if tool_call.name == "checkout_files":
+ requested = _parse_file_list(payload, "checkout")
+ if len(requested) > config.max_files_checked_out:
+ raise ExplorationError(
+ "Checkout request exceeds max files allowed."
+ )
+ view_map = {path.name.lower(): path for path in viewed.keys()}
+ checkout_paths = _resolve_requested_paths(
+ requested, view_map, "viewed"
+ )
+ if not checkout_paths:
+ raise ExplorationError("Checkout request must include at least one file.")
+ return checkout_paths
+
+ if needs_checkout:
+ raise ExplorationError(
+ "No additional files are available to view; you must checkout."
+ )
+ requested = _parse_file_list(payload, "view")
+ if len(requested) > config.max_files_viewed_per_round:
+ raise ExplorationError("View request exceeds max files allowed.")
+ available_map = {path.name.lower(): path for path in available.keys()}
+ requested_paths = _resolve_requested_paths(
+ requested, available_map, "available"
+ )
+ except ExplorationError as error:
+ feedback = str(error)
+ attempts_left -= 1
+ if attempts_left == 0:
+ raise
+ continue
+
+ for path in requested_paths:
+ summary = available.pop(path)
+ headings = repo.get_headings(path)
+ links = repo.get_links(path)
+ link_summaries = [
+ (link_path, summary_map[link_path])
+ for link_path in links
+ if link_path in summary_map
+ ]
+ viewed[path] = ViewedNote(
+ path=path,
+ summary=summary,
+ headings=headings,
+ links=links,
+ link_summaries=link_summaries,
+ )
+ for link_path in links:
+ if link_path not in viewed and link_path in summary_map:
+ available[link_path] = summary_map[link_path]
+
+ if len(viewed) >= total_viewed_limit:
+ break
+
+ rounds_left -= 1
+ break
+
+ feedback = None
+ attempts_left = MAX_TOOL_ATTEMPTS
+ while attempts_left > 0:
+ prompt = build_exploration_prompt(
+ chunk_text,
+ viewed.values(),
+ available.items(),
+ rounds_left,
+ config,
+ feedback=feedback,
+ )
+ tool_call = request_tool_call(
+ client,
+ prompt,
+ [CHECKOUT_TOOL_SCHEMA],
+ "exploration checkout",
+ )
+ payload = parse_tool_call_arguments(tool_call)
+ try:
+ requested = _parse_file_list(payload, "checkout")
+ if len(requested) > config.max_files_checked_out:
+ raise ExplorationError("Checkout request exceeds max files allowed.")
+ view_map = {path.name.lower(): path for path in viewed.keys()}
+ checkout_paths = _resolve_requested_paths(requested, view_map, "viewed")
+ if not checkout_paths:
+ raise ExplorationError("Checkout request must include at least one file.")
+ return checkout_paths
+ except ExplorationError as error:
+ feedback = str(error)
+ attempts_left -= 1
+ if attempts_left == 0:
+ raise
+
+ raise ExplorationError("Unable to select checkout files.")
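For reference, the link normalization in `_normalize_file_name` can be exercised in isolation; this sketch copies the logic above, swapping `ExplorationError` for a plain `ValueError` so it runs without the module's imports:

```python
def normalize_file_name(value: str) -> str:
    # Mirrors _normalize_file_name: unwrap [[...]], drop any |alias or
    # #heading suffix, then ensure a .md extension.
    trimmed = value.strip()
    if trimmed.startswith("[[") and trimmed.endswith("]]"):
        trimmed = trimmed[2:-2].strip()
    if "|" in trimmed:
        trimmed = trimmed.split("|", 1)[0].strip()
    if "#" in trimmed:
        trimmed = trimmed.split("#", 1)[0].strip()
    if not trimmed:
        raise ValueError("File reference cannot be empty.")
    if not trimmed.lower().endswith(".md"):
        trimmed = f"{trimmed}.md"
    return trimmed
```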
diff --git a/src/spec_llm.py b/src/spec_llm.py
new file mode 100644
index 0000000..40cb049
--- /dev/null
+++ b/src/spec_llm.py
@@ -0,0 +1,111 @@
+from __future__ import annotations
+
+import json
+import os
+from time import sleep
+from typing import Iterable
+
+from loguru import logger
+from openai import OpenAI
+from openai.types.responses import ResponseFunctionToolCall
+
+from spec_config import (
+ DEFAULT_MAX_RETRIES,
+ DEFAULT_MODEL,
+ DEFAULT_REASONING,
+ ENV_API_KEY,
+ RETRY_BACKOFF_FACTOR,
+ RETRY_INITIAL_DELAY_SECONDS,
+)
+
+
+def create_openai_client() -> OpenAI:
+ api_key = os.getenv(ENV_API_KEY)
+ if not api_key:
+ raise RuntimeError(
+ f"Environment variable {ENV_API_KEY} is required for GPT access."
+ )
+ return OpenAI(api_key=api_key)
+
+
+def execute_with_retry(
+ operation,
+ description: str,
+ max_attempts: int = DEFAULT_MAX_RETRIES,
+ initial_delay_seconds: float = RETRY_INITIAL_DELAY_SECONDS,
+ backoff_factor: float = RETRY_BACKOFF_FACTOR,
+):
+ attempt = 1
+ delay = initial_delay_seconds
+ while True:
+ try:
+ return operation()
+ except Exception as error:
+ if attempt >= max_attempts:
+ logger.exception(
+ f"OpenAI {description} failed after {max_attempts} attempt(s): {error}"
+ )
+ raise
+ logger.warning(
+ f"OpenAI {description} attempt {attempt} failed: {error}. Retrying in {delay:.1f}s."
+ )
+ sleep(delay)
+ attempt += 1
+ delay *= backoff_factor
+
+
+def request_text(client: OpenAI, prompt: str, context_label: str) -> str:
+ def perform_request() -> str:
+ response = client.responses.create(
+ model=DEFAULT_MODEL,
+ reasoning=DEFAULT_REASONING,
+ input=prompt,
+ )
+ if response.error:
+ raise RuntimeError(f"OpenAI error for {context_label}: {response.error}")
+ output_text = response.output_text
+ if not output_text.strip():
+ raise RuntimeError(f"Received empty response for {context_label}.")
+ return output_text.strip()
+
+ return execute_with_retry(perform_request, context_label)
+
+
+def request_tool_call(
+ client: OpenAI, prompt: str, tools: Iterable[dict], context_label: str
+) -> ResponseFunctionToolCall:
+ def perform_request() -> ResponseFunctionToolCall:
+ response = client.responses.create(
+ model=DEFAULT_MODEL,
+ reasoning=DEFAULT_REASONING,
+ input=prompt,
+ tools=list(tools),
+ tool_choice="required",
+ parallel_tool_calls=False,
+ )
+ if response.error:
+ raise RuntimeError(f"OpenAI error for {context_label}: {response.error}")
+ tool_calls = [item for item in response.output if item.type == "function_call"]
+ if not tool_calls:
+ raise RuntimeError(f"No tool call returned for {context_label}.")
+ if len(tool_calls) > 1:
+ raise RuntimeError(
+ f"Expected a single tool call for {context_label}, got {len(tool_calls)}."
+ )
+ return tool_calls[0]
+
+ return execute_with_retry(perform_request, context_label)
+
+
+def parse_tool_call_arguments(call: ResponseFunctionToolCall) -> dict:
+ if not call.arguments:
+ raise RuntimeError(f"Tool call {call.name} missing arguments.")
+ try:
+ payload = json.loads(call.arguments)
+ except json.JSONDecodeError as error:
+ raise RuntimeError(
+ f"Tool call {call.name} arguments are not valid JSON: {error}"
+ ) from error
+ if not isinstance(payload, dict):
+ raise RuntimeError(f"Tool call {call.name} arguments must be a JSON object.")
+ return payload
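`execute_with_retry` pairs an attempt cap with geometric backoff. A dependency-free sketch of the same loop, with the sleep injected so the delays can be observed (the `sleep_fn` parameter is an addition for illustration, not part of the module above):

```python
def retry(operation, max_attempts=3, initial_delay=1.0, backoff=2.0, sleep_fn=None):
    # Same loop shape as execute_with_retry: re-raise on the final attempt,
    # otherwise wait and grow the delay geometrically.
    attempt = 1
    delay = initial_delay
    while True:
        try:
            return operation()
        except Exception:
            if attempt >= max_attempts:
                raise
            if sleep_fn is not None:
                sleep_fn(delay)
            attempt += 1
            delay *= backoff


calls = {"count": 0}
delays = []


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = retry(flaky, sleep_fn=delays.append)
```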
diff --git a/src/spec_logging.py b/src/spec_logging.py
new file mode 100644
index 0000000..aeba1bb
--- /dev/null
+++ b/src/spec_logging.py
@@ -0,0 +1,24 @@
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+
+from loguru import logger
+
+from spec_config import LOG_FILE_ROTATION_BYTES
+
+
+def configure_logging(log_path: Path) -> None:
+ logger.remove()
+ logger.add(sys.stderr, level="INFO", enqueue=False)
+ try:
+ log_path.parent.mkdir(parents=True, exist_ok=True)
+ except OSError as error:
+ raise RuntimeError(f"Failed to prepare log directory {log_path.parent}: {error}") from error
+ logger.add(
+ log_path,
+ level="DEBUG",
+ rotation=LOG_FILE_ROTATION_BYTES,
+ enqueue=False,
+ encoding="utf-8",
+ )
diff --git a/src/spec_markdown.py b/src/spec_markdown.py
new file mode 100644
index 0000000..5584488
--- /dev/null
+++ b/src/spec_markdown.py
@@ -0,0 +1,74 @@
+from __future__ import annotations
+
+import re
+from typing import List, Tuple
+
+
+WIKILINK_PATTERN = re.compile(r"\[\[([^\]|#]+)(?:#[^\]|]+)?(?:\|[^\]]+)?\]\]")
+HEADING_PATTERN = re.compile(r"^(#{1,6})\s+(.+?)\s*$")
+
+
+def split_document_sections(content: str, scratchpad_heading: str) -> Tuple[str, str]:
+ if scratchpad_heading not in content:
+ raise ValueError(f"Document must contain the heading '{scratchpad_heading}'.")
+ heading_index = content.index(scratchpad_heading)
+ body = content[:heading_index].rstrip()
+ scratchpad = content[heading_index + len(scratchpad_heading) :].lstrip("\n")
+ return body, scratchpad
+
+
+def normalize_paragraphs(text: str) -> List[str]:
+ stripped_text = text.strip()
+ if not stripped_text:
+ return []
+ return [
+ block.strip() for block in re.split(r"\n\s*\n", stripped_text) if block.strip()
+ ]
+
+
+def count_words(text: str) -> int:
+ return len(text.split())
+
+
+def extract_headings(content: str) -> List[str]:
+ headings: List[str] = []
+ for line in content.splitlines():
+ match = HEADING_PATTERN.match(line)
+ if match:
+ hashes, title = match.groups()
+ headings.append(f"{hashes} {title.strip()}")
+ return headings
+
+
+def extract_wikilinks(content: str) -> List[str]:
+ targets: List[str] = []
+ for match in WIKILINK_PATTERN.finditer(content):
+ target = match.group(1).strip()
+ if target:
+ targets.append(target)
+ return targets
+
+
+def build_document(body: str, scratchpad_heading: str, scratchpad_paragraphs: List[str]) -> str:
+ trimmed_body = body.rstrip()
+ parts = [trimmed_body, scratchpad_heading]
+ if scratchpad_paragraphs:
+ scratchpad_text = "\n\n".join(scratchpad_paragraphs).rstrip()
+ parts.append(scratchpad_text)
+ document = "\n\n".join(part for part in parts if part)
+ if not document.endswith("\n"):
+ document += "\n"
+ return document
+
+
+def format_duration(seconds: float) -> str:
+ remaining_seconds = max(0, int(round(seconds)))
+ hours, remainder = divmod(remaining_seconds, 3600)
+ minutes, seconds = divmod(remainder, 60)
+ parts: List[str] = []
+ if hours:
+ parts.append(f"{hours}h")
+ if hours or minutes:
+ parts.append(f"{minutes}m")
+ parts.append(f"{seconds}s")
+ return " ".join(parts)
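`WIKILINK_PATTERN` accepts plain links, alias links, and heading anchors while capturing only the target name. A quick check of what it extracts:

```python
import re

# Same pattern as spec_markdown: the capture group stops at ']', '|', or '#',
# and the two optional groups swallow a heading anchor and an alias.
WIKILINK_PATTERN = re.compile(r"\[\[([^\]|#]+)(?:#[^\]|]+)?(?:\|[^\]]+)?\]\]")

text = "See [[Alpha]], [[Beta|the beta note]] and [[Gamma#Details]]."
targets = [match.group(1).strip() for match in WIKILINK_PATTERN.finditer(text)]
```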
diff --git a/src/spec_notes.py b/src/spec_notes.py
new file mode 100644
index 0000000..6ceb34d
--- /dev/null
+++ b/src/spec_notes.py
@@ -0,0 +1,87 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Dict, List, Optional, Set
+
+from spec_markdown import count_words, extract_headings, extract_wikilinks
+
+
+@dataclass(frozen=True)
+class ViewedNote:
+ path: Path
+ summary: str
+ headings: List[str]
+ links: List[Path]
+ link_summaries: List[tuple[Path, str]]
+
+
+class NoteRepository:
+ def __init__(self, root_path: Path, root_body: str, notes_dir: Path) -> None:
+ self._root_path = root_path
+ self._root_body = root_body
+ self._notes_dir = notes_dir
+ self._content_cache: Dict[Path, str] = {}
+ self._file_index = self._build_file_index()
+
+ def _build_file_index(self) -> Dict[str, Path]:
+ mapping: Dict[str, Path] = {}
+ for path in self._notes_dir.iterdir():
+ if path.is_file() and path.suffix.lower() == ".md":
+ mapping[path.name.lower()] = path
+ return mapping
+
+ def resolve_link(self, link_text: str) -> Optional[Path]:
+ target = link_text.strip()
+ if not target:
+ return None
+ if not target.lower().endswith(".md"):
+ target = f"{target}.md"
+ return self._file_index.get(target.lower())
+
+ def get_note_content(self, path: Path) -> str:
+ if path == self._root_path:
+ return self._root_body
+ cached = self._content_cache.get(path)
+ if cached is not None:
+ return cached
+ content = path.read_text(encoding="utf-8")
+ self._content_cache[path] = content
+ return content
+
+ def get_headings(self, path: Path) -> List[str]:
+ return extract_headings(self.get_note_content(path))
+
+ def get_links(self, path: Path) -> List[Path]:
+ links: List[Path] = []
+ for target in extract_wikilinks(self.get_note_content(path)):
+ resolved = self.resolve_link(target)
+ if resolved is not None:
+ links.append(resolved)
+ return links
+
+ def get_word_count(self, path: Path) -> int:
+ return count_words(self.get_note_content(path))
+
+ def is_index_note(self, path: Path, index_suffix: str) -> bool:
+ return path.name.lower().endswith(index_suffix.lower())
+
+ def iter_reachable_paths(self) -> List[Path]:
+ visited: Set[Path] = set()
+ stack: List[Path] = [self._root_path]
+ while stack:
+ path = stack.pop()
+ if path in visited:
+ continue
+ visited.add(path)
+ for link in self.get_links(path):
+ if link not in visited:
+ stack.append(link)
+ return list(visited)
+
+ def set_root_body(self, body: str) -> None:
+ self._root_body = body
+ self._content_cache.pop(self._root_path, None)
+
+ def invalidate_content(self, path: Path) -> None:
+ self._content_cache.pop(path, None)
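`iter_reachable_paths` is an iterative depth-first traversal that tolerates link cycles. The same shape over a plain dict graph (a standalone sketch, not the repository class itself):

```python
def reachable(root, links):
    # Iterative DFS matching iter_reachable_paths: visit each node once,
    # pushing unseen link targets onto the stack.
    visited = set()
    stack = [root]
    while stack:
        node = stack.pop()
        if node in visited:
            continue
        visited.add(node)
        for target in links.get(node, []):
            if target not in visited:
                stack.append(target)
    return visited


# c.md links back to root.md, so the graph contains a cycle.
graph = {"root.md": ["a.md", "b.md"], "a.md": ["c.md"], "c.md": ["root.md"]}
```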
diff --git a/src/spec_summary.py b/src/spec_summary.py
new file mode 100644
index 0000000..eca8597
--- /dev/null
+++ b/src/spec_summary.py
@@ -0,0 +1,237 @@
+from __future__ import annotations
+
+import hashlib
+import json
+import os
+from concurrent.futures import Future, ThreadPoolExecutor
+from dataclasses import dataclass
+from pathlib import Path
+from threading import Lock
+from typing import Dict, Iterable, List
+
+from spec_config import SpecConfig, default_summary_cache_path
+from spec_llm import request_text
+from spec_markdown import extract_wikilinks
+from spec_notes import NoteRepository
+
+
+@dataclass(frozen=True)
+class SummaryRecord:
+ content_hash: str
+ summary: str
+
+
+class SummaryCache:
+ def __init__(self, cache_path: Path) -> None:
+ self._path = cache_path
+ self._lock = Lock()
+ self._data: Dict[str, SummaryRecord] = {}
+ self._load()
+
+ def _load(self) -> None:
+ if not self._path.exists():
+ return
+ raw = self._path.read_text(encoding="utf-8")
+ if not raw.strip():
+ return
+ data = json.loads(raw)
+ if not isinstance(data, dict):
+ raise RuntimeError("Summary cache must contain a JSON object.")
+ for key, value in data.items():
+ if not isinstance(value, dict):
+ continue
+ content_hash = value.get("content_hash")
+ summary = value.get("summary")
+ if isinstance(content_hash, str) and isinstance(summary, str):
+ self._data[key] = SummaryRecord(content_hash, summary)
+
+ def get(self, path: Path, content_hash: str) -> str | None:
+ record = self._data.get(str(path))
+ if record and record.content_hash == content_hash:
+ return record.summary
+ return None
+
+ def set(self, path: Path, content_hash: str, summary: str) -> None:
+ with self._lock:
+ self._data[str(path)] = SummaryRecord(content_hash, summary)
+ self._save_locked()
+
+ def invalidate(self, path: Path) -> None:
+ with self._lock:
+ if self._data.pop(str(path), None) is not None:
+ self._save_locked()
+
+ def _save_locked(self) -> None:
+ self._path.parent.mkdir(parents=True, exist_ok=True)
+ payload = {
+ key: {"content_hash": record.content_hash, "summary": record.summary}
+ for key, record in self._data.items()
+ }
+ self._path.write_text(
+ json.dumps(payload, ensure_ascii=True, indent=2), encoding="utf-8"
+ )
+
+
+def _hash_content(content: str) -> str:
+ return hashlib.sha256(content.encode("utf-8")).hexdigest()
+
+
+def _default_summary_workers() -> int:
+ cpu_count = os.cpu_count() or 4
+ return max(4, min(32, cpu_count * 4))
+
+
+class SummaryService:
+ def __init__(
+ self,
+ repo: NoteRepository,
+ client,
+ config: SpecConfig,
+ cache_path: Path | None = None,
+ ) -> None:
+ self._repo = repo
+ self._client = client
+ self._config = config
+ self._cache = SummaryCache(cache_path or default_summary_cache_path())
+ self._executor = ThreadPoolExecutor(max_workers=_default_summary_workers())
+ self._lock = Lock()
+ self._inflight: Dict[Path, Future[str]] = {}
+
+ def shutdown(self) -> None:
+ self._executor.shutdown(wait=True)
+
+ def invalidate(self, path: Path) -> None:
+ with self._lock:
+ self._inflight.pop(path, None)
+ self._cache.invalidate(path)
+
+ def get_summaries(self, paths: Iterable[Path]) -> Dict[Path, str]:
+ unique_paths = list(dict.fromkeys(paths))
+ standard_paths: List[Path] = []
+ index_paths: List[Path] = []
+ for path in unique_paths:
+ if self._repo.is_index_note(path, self._config.index_filename_suffix):
+ index_paths.append(path)
+ else:
+ standard_paths.append(path)
+
+ futures = {path: self._ensure_future(path) for path in standard_paths}
+ summaries: Dict[Path, str] = {
+ path: future.result() for path, future in futures.items()
+ }
+
+ for path in index_paths:
+ summaries[path] = self._compute_index_summary(path, stack=[])
+
+ return summaries
+
+ def get_summary(self, path: Path) -> str:
+ if self._repo.is_index_note(path, self._config.index_filename_suffix):
+ return self._compute_index_summary(path, stack=[])
+ return self._ensure_future(path).result()
+
+ def _ensure_future(self, path: Path) -> Future[str]:
+ if self._repo.is_index_note(path, self._config.index_filename_suffix):
+ raise RuntimeError(
+ f"Index note summaries must be computed synchronously: {path.name}."
+ )
+ with self._lock:
+ existing = self._inflight.get(path)
+ if existing is not None:
+ return existing
+ future: Future[str] = self._executor.submit(
+ self._compute_standard_summary, path
+ )
+ self._inflight[path] = future
+ return future
+
+ def _compute_standard_summary(self, path: Path) -> str:
+ try:
+ return self._compute_standard_summary_inner(path)
+ finally:
+ with self._lock:
+ self._inflight.pop(path, None)
+
+ def _compute_standard_summary_inner(self, path: Path) -> str:
+ content = self._repo.get_note_content(path)
+ content_hash = _hash_content(content)
+ cached = self._cache.get(path, content_hash)
+ if cached is not None:
+ return cached
+ summary = self._summarize_standard_note(path, content)
+ self._cache.set(path, content_hash, summary)
+ return summary
+
+ def _summarize_standard_note(self, path: Path, content: str) -> str:
+ prompt = (
+ "Generate a {min_words}-{max_words} word summary of this note's content.\n"
+ "Focus on: main topics, key claims, what questions it answers.\n\n"
+ "<note>\n{content}\n</note>"
+ ).format(
+ min_words=self._config.summary_target_words_min,
+ max_words=self._config.summary_target_words_max,
+ content=content,
+ )
+ return request_text(self._client, prompt, f"summary {path.name}")
+
+ def _compute_index_summary(self, path: Path, stack: List[Path]) -> str:
+ if path in stack:
+ cycle = " -> ".join(item.name for item in stack + [path])
+ raise RuntimeError(f"Cycle detected while summarizing index notes: {cycle}")
+
+ content = self._repo.get_note_content(path)
+ content_hash = _hash_content(content)
+ cached = self._cache.get(path, content_hash)
+ if cached is not None:
+ return cached
+
+ stack.append(path)
+ try:
+ summary = self._summarize_index_note(path, content, stack)
+ finally:
+ stack.pop()
+
+ self._cache.set(path, content_hash, summary)
+ return summary
+
+ def _summarize_index_note(self, path: Path, content: str, stack: List[Path]) -> str:
+ linked_paths: List[Path] = []
+ seen: set[Path] = set()
+ for target in extract_wikilinks(content):
+ resolved = self._repo.resolve_link(target)
+ if resolved is not None and resolved not in seen:
+ seen.add(resolved)
+ linked_paths.append(resolved)
+
+ standard_paths: List[Path] = []
+ index_paths: List[Path] = []
+ for linked_path in linked_paths:
+ if self._repo.is_index_note(linked_path, self._config.index_filename_suffix):
+ index_paths.append(linked_path)
+ else:
+ standard_paths.append(linked_path)
+
+ futures = {linked_path: self._ensure_future(linked_path) for linked_path in standard_paths}
+
+ summaries: List[str] = []
+ for linked_path in index_paths:
+ summaries.append(self._compute_index_summary(linked_path, stack))
+ for linked_path, future in futures.items():
+ summaries.append(future.result())
+
+ joined_summaries = "\n\n".join(summaries)
+ prompt = (
+ "Generate a summary based on these summaries of linked notes:\n"
+ "{summaries}\n\n"
+ "Synthesize into {min_words}-{max_words} words describing what this index covers."
+ ).format(
+ summaries=joined_summaries,
+ min_words=self._config.summary_target_words_min,
+ max_words=self._config.summary_target_words_max,
+ )
+ return request_text(self._client, prompt, f"summary {path.name}")
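`SummaryCache` keys each record on a SHA-256 of the note body, so a summary is regenerated only when the content actually changes. The idea in miniature (a sketch independent of the cache's on-disk JSON format):

```python
import hashlib


def hash_content(content: str) -> str:
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


cache = {}  # path -> (content_hash, summary)


def get_or_summarize(path, content, summarize):
    # Reuse the cached summary only while the stored hash still matches.
    digest = hash_content(content)
    record = cache.get(path)
    if record is not None and record[0] == digest:
        return record[1]
    summary = summarize(content)
    cache[path] = (digest, summary)
    return summary


calls = []


def fake_summarize(content):
    calls.append(content)
    return f"summary of {len(content)} chars"


first = get_or_summarize("note.md", "hello world", fake_summarize)
second = get_or_summarize("note.md", "hello world", fake_summarize)  # cache hit
third = get_or_summarize("note.md", "changed", fake_summarize)  # hash differs
```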
diff --git a/src/spec_verification.py b/src/spec_verification.py
new file mode 100644
index 0000000..5dc267e
--- /dev/null
+++ b/src/spec_verification.py
@@ -0,0 +1,327 @@
+from __future__ import annotations
+
+import json
+import shutil
+import subprocess
+from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass
+from pathlib import Path
+from threading import Event, Lock, Thread
+from typing import Any, List, Sequence
+from uuid import uuid4
+
+from loguru import logger
+
+from spec_config import MAX_CONCURRENT_VERIFICATIONS, default_pending_prompts_path
+from spec_llm import request_text
+
+
+NOTIFY_SEND_PATH = shutil.which("notify-send")
+_NOTIFY_SEND_UNAVAILABLE_WARNING_EMITTED = False
+
+
+def notify_missing_verification(
+ chunk_index: int, total_chunks: int, assessment: str
+) -> None:
+ global _NOTIFY_SEND_UNAVAILABLE_WARNING_EMITTED
+ title = "Integration verification missing content"
+ body = f"Chunk {chunk_index + 1}/{total_chunks}: {assessment}"
+ if NOTIFY_SEND_PATH:
+ try:
+ subprocess.run(
+ [
+ NOTIFY_SEND_PATH,
+ "--app-name=IntegrateNotes",
+ title,
+ body,
+ ],
+ check=True,
+ )
+ except Exception as error:
+ logger.warning(
+ f"notify-send failed for verification chunk {chunk_index + 1}: {error}"
+ )
+ else:
+ if not _NOTIFY_SEND_UNAVAILABLE_WARNING_EMITTED:
+ logger.warning(
+ "notify-send not available; desktop alerts for verification issues disabled."
+ )
+ _NOTIFY_SEND_UNAVAILABLE_WARNING_EMITTED = True
+
+
+@dataclass(frozen=True)
+class DuplicateEvidence:
+ body_text: str
+
+
+class VerificationManager:
+ def __init__(self, client, target_file: Path) -> None:
+ self.client = client
+ self.pending_path = default_pending_prompts_path()
+ self.lock = Lock()
+ self.active_lock = Lock()
+ self.active_ids: set[str] = set()
+ self.executor = ThreadPoolExecutor(max_workers=MAX_CONCURRENT_VERIFICATIONS)
+ self.new_prompt_event = Event()
+ self.stop_requested = False
+ self.tracked_file_name = Path(target_file).resolve().name
+ self.worker = Thread(
+ target=self._run,
+ name="VerificationManager",
+ daemon=True,
+ )
+ self.worker.start()
+
+ def enqueue_prompt(
+ self,
+ prompt: str,
+ context_label: str | None,
+ chunk_index: int | None,
+ total_chunks: int | None,
+ ) -> None:
+ if not isinstance(prompt, str) or not prompt.strip():
+ raise ValueError("Verification prompt must be a non-empty string.")
+
+ entry = {
+ "id": str(uuid4()),
+ "prompt": prompt,
+ "context_label": context_label,
+ "chunk_index": chunk_index,
+ "total_chunks": total_chunks,
+ "file_name": self.tracked_file_name,
+ }
+ with self.lock:
+ entries = self._read_entries_locked()
+ entries.append(entry)
+ self._write_entries_locked(entries)
+ self.new_prompt_event.set()
+
+ def shutdown(self) -> None:
+ self.stop_requested = True
+ self.new_prompt_event.set()
+ if self.worker.is_alive():
+ self.worker.join()
+ self.executor.shutdown(wait=True)
+
+ def _run(self) -> None:
+ while True:
+ try:
+ self._dispatch_pending()
+ except Exception as error:
+ logger.exception(
+ f"Verification dispatcher encountered an error: {error}"
+ )
+ if self.stop_requested and not self._has_pending_work():
+ break
+ self.new_prompt_event.wait(timeout=0.5)
+ self.new_prompt_event.clear()
+
+ def _dispatch_pending(self) -> None:
+ with self.lock:
+ all_entries = self._read_entries_locked()
+ entries = self._entries_for_current_file_locked(all_entries)
+
+ for entry in entries:
+ entry_id = entry.get("id")
+ if not entry_id:
+ continue
+ with self.active_lock:
+ if entry_id in self.active_ids:
+ continue
+ self.active_ids.add(entry_id)
+
+ future = self.executor.submit(self._send_prompt, entry)
+ future.add_done_callback(
+ lambda fut, data=entry: self._handle_result(data, fut)
+ )
+
+ def _send_prompt(self, entry: dict[str, Any]) -> str:
+ context_label = entry.get("context_label") or "verification"
+ prompt = entry["prompt"]
+ return request_text(self.client, prompt, f"verification {context_label}")
+
+ def _handle_result(self, entry: dict[str, Any], future) -> None:
+ entry_id = entry.get("id")
+ try:
+ assessment = future.result()
+ except Exception as error: # noqa: BLE001
+ context_label = entry.get("context_label") or "verification"
+ logger.exception("Verification for %s failed: %s", context_label, error)
+ if entry_id:
+ # Leave the entry in the pending file so it can be retried on a later pass.
+ with self.active_lock:
+ self.active_ids.discard(entry_id)
+ self.new_prompt_event.set()
+ return
+
+ self._log_assessment(entry, assessment)
+
+ if entry_id:
+ self._remove_entry(entry_id)
+ with self.active_lock:
+ self.active_ids.discard(entry_id)
+
+ self.new_prompt_event.set()
+
+ def _log_assessment(self, entry: dict[str, Any], assessment: str) -> None:
+ chunk_index = entry.get("chunk_index")
+ total_chunks = entry.get("total_chunks")
+ context_label = entry.get("context_label") or "verification"
+ file_name = entry.get("file_name")
+
+ if not file_name:
+ raise RuntimeError(
+ "Verification entry missing required file_name; pending prompts file may be corrupted."
+ )
+
+ base_header = f'Verification "{file_name}"'
+
+ if (
+ isinstance(chunk_index, int)
+ and isinstance(total_chunks, int)
+ and 0 <= chunk_index < total_chunks
+ ):
+ if "MISSING" in assessment:
+ notify_missing_verification(chunk_index, total_chunks, assessment)
+ chunk_header = f"{base_header}:"
+ if assessment.startswith(chunk_header):
+ logger.info(assessment)
+ else:
+ logger.info(f"{chunk_header}\n{assessment}")
+ else:
+ if context_label != "verification":
+ header = f"{base_header} ({context_label}):"
+ else:
+ header = f"{base_header}:"
+ if assessment.startswith(header):
+ logger.info(assessment)
+ else:
+ logger.info(f"{header}\n{assessment}")
+
+ def _remove_entry(self, entry_id: str) -> None:
+ with self.lock:
+ entries = self._read_entries_locked()
+ remaining = [item for item in entries if item.get("id") != entry_id]
+ self._write_entries_locked(remaining)
+
+ def _read_entries_locked(self) -> list[dict[str, Any]]:
+ if not self.pending_path.exists():
+ return []
+ raw = self.pending_path.read_text(encoding="utf-8")
+ if not raw.strip():
+ return []
+ try:
+ data = json.loads(raw)
+ except json.JSONDecodeError as error:
+ raise RuntimeError(
+ f"Pending verification prompts file {self.pending_path} is corrupted: {error}"
+ ) from error
+ if not isinstance(data, list):
+ raise RuntimeError(
+ f"Pending verification prompts file {self.pending_path} must contain a list."
+ )
+ return data
+
+ def _write_entries_locked(self, entries: list[dict[str, Any]]) -> None:
+ self.pending_path.parent.mkdir(parents=True, exist_ok=True)
+ payload = json.dumps(entries, ensure_ascii=True, indent=2)
+ self.pending_path.write_text(payload, encoding="utf-8")
+
+ def _has_pending_work(self) -> bool:
+ with self.lock:
+ entries = self._read_entries_locked()
+ has_entries = bool(self._entries_for_current_file_locked(entries))
+ with self.active_lock:
+ has_active = bool(self.active_ids)
+ return has_entries or has_active
+
+ def _entries_for_current_file_locked(
+ self, entries: list[dict[str, Any]]
+ ) -> list[dict[str, Any]]:
+ invalid_entries: list[dict[str, Any]] = []
+ relevant_entries: list[dict[str, Any]] = []
+
+ for entry in entries:
+ file_name = entry.get("file_name")
+ entry_id = entry.get("id")
+ if not file_name or not entry_id:
+ invalid_entries.append(entry)
+ continue
+ if file_name == self.tracked_file_name:
+ relevant_entries.append(entry)
+
+ if invalid_entries:
+ invalid_count = len(invalid_entries)
+ suffix = "y" if invalid_count == 1 else "ies"
+ logger.warning(
+ f"Removed {invalid_count} invalid verification prompt entr{suffix} missing file metadata or IDs."
+ )
+ cleaned_entries = [
+ entry for entry in entries if entry not in invalid_entries
+ ]
+ self._write_entries_locked(cleaned_entries)
+
+ return relevant_entries
+
+
+def build_verification_prompt(
+ chunk_text: str,
+ patch_replacements: Sequence[str],
+ duplicate_texts: Sequence[str],
+) -> str:
+ response_instructions = (
+ "Report whether any note content is missing or materially altered."
+ " Respond with 'OK -' and a concise single paragraph if everything is covered,"
+ " or 'MISSING -' followed by details of every omission."
+ " Separate omissions with two newlines and, for each omission, provide the following:\n"
+ ' Notes:"..."\n'
+ ' Body:"..."\n'
+ ' Explanation: "..."\n'
+ ' Proposed Fix: "..."\n'
+ "Quote the exact text from the notes chunk containing the missing detail and quote the exact passage from the patch replacements or duplicate evidence that should cover it (or state Body:\"\" if nothing is relevant)."
+ " Explain precisely what information is still missing or altered, without omitting any nuance."
+ )
+
+ replacements_block = "\n\n".join(
+ f"[Patch {index} Replacement]\n{replacement_text}"
+ for index, replacement_text in enumerate(patch_replacements, start=1)
+ )
+
+ duplications_block = "\n\n".join(
+ f"[Duplicate {index} Evidence]\nBody:\n{body_text}"
+ for index, body_text in enumerate(duplicate_texts, start=1)
+ )
+
+ sections = [
+ (
+ "You are verifying that every idea/point/concept/argument/detail/url/[[wikilink]]/diagram etc. "
+ "from the provided notes chunk has been integrated into the document body."
+ " Use the patch replacements to understand what will be inserted or rewritten."
+ " Duplicate evidence is existing body text claimed to already cover notes."
+ " If duplicate evidence does not fully cover the notes text, treat the missing detail as missing."
+ ),
+ f"\n{chunk_text}\n",
+ f"\n{replacements_block}\n",
+ f"\n{duplications_block}\n",
+ f"\n{response_instructions}\n",
+ ]
+ return "\n\n\n\n\n".join(sections)
+
+
+def format_verification_assessment(assessment: str) -> str:
+ return (
+ assessment.replace(" - Notes:", "\nNotes:")
+ .replace(" Body:", "\nBody:")
+ .replace(" Explanation:", "\nExplanation:")
+ .replace(" Proposed Fix:", "\nProposed Fix:")
+ )