From fbb74d7652a3aaaf145d900acaabd5d967903150 Mon Sep 17 00:00:00 2001 From: Alex Fedotyev <61838744+alex-fedotyev@users.noreply.github.com> Date: Sat, 9 May 2026 02:57:43 +0000 Subject: [PATCH 1/2] memory/memu_bridge: route memU embeddings through MEMU_EMBEDDING_BASE_URL Symptom: memory_recall returns "No relevant memories found" for any query that should match a memory written after 2026-05-06. memU's own logs reported route_category and recall succeeding, but vector search at recall time was effectively disabled. Root cause: memu_bridge gated the embedding LLM profile, the "rag" retrieve method, and the categorize_items step exclusively on self.config.openai_api_key. The 2026-05-05 sidecar work landed an Ollama service in docker-compose.yml and set MEMU_EMBEDDING_BASE_URL on the agent container, but the in-process bridge code never read the env var. With openai_api_key cleared (the sidecar was supposed to replace it) the bridge took the no-embed branch: - _categorize_no_embed replaced categorize_items, persisting every new item with embedding_json=NULL. - retrieve_config["method"] resolved to "llm" instead of "rag", so recall fanned out chat queries against the categories instead of doing a cosine search over item vectors. - _has_embeddings returned False, suppressing vector lookups upstream of recall too. 5,100 of 6,888 memu_memory_items rows on the running container had embedding_json=NULL; all 879 with embeddings dated to 2026-05-05 when the OpenAI key was still active. RCA in notes/lessons/2026-05-09-memu-embeddings-not-wired.md. Fix: resolve the embedding endpoint as env -> YAML -> OpenAI in one place, set a single embeddings_configured flag, and key every downstream behavior off that flag. Concretely: - nerve/config.py: MemoryConfig gains embedding_base_url, embedding_api_key, and llm_concurrency fields with env-var-aware defaults. llm_concurrency clamps to >= 1 since 0 deadlocks the semaphore wrapper. - nerve/memory/memu_bridge.py: - _initialize() reads MEMU_EMBEDDING_BASE_URL, MEMU_EMBEDDING_API_KEY, and MEMU_EMBED_MODEL with YAML config fallback. When base_url is set, registers the embedding profile against that endpoint with api_key="placeholder" if not provided (the OpenAI SDK requires a non-empty string; Ollama and TEI ignore it). - embeddings_configured = (env or YAML base URL set) OR (openai key set). _categorize_no_embed only takes the no-embed path when neither provider is configured. retrieve method is "rag" when configured, "llm" otherwise. memory_extract_llm_profile follows the same flag. - _has_embeddings checks all three sources (env, YAML, OpenAI). - Bounded asyncio.Semaphore wraps memU's chat calls so the per-memory-type fan-out (4-way gather in memU's extract_items pipeline) doesn't blow the Anthropic rate limit on lower API tiers. Configurable via memory.llm_concurrency, default 1. Re-instrumentation reuses the same Semaphore so callers already queued don't lose their slot. - SDK retries enabled at max_retries=4 (was 0). With concurrency bounded, retries actually drain the queue instead of stacking. - nerve/bootstrap.py: _build_docker_compose now writes the embeddings service block (Ollama + nomic-embed-text), the MEMU_EMBEDDING_* env vars on the nerve service, depends_on: embeddings: condition: service_healthy, the ~/.nerve/claude bind mount for persisted Claude Code state, the path-aligned ${HOME}/nerve-workspace and ${HOME}/projects mounts, and /var/run/docker.sock for direct daemon access. The entrypoint creates /root/* symlinks pointing at HOST_HOME so hardcoded /root/nerve-workspace and /root/projects paths still resolve. Brings nerve init regeneration in line with the live host docker-compose.yml. Tests: 18 new tests in test_memu_bridge.py covering the new MemoryConfig fields, llm_concurrency clamping, and the semaphore wrapper (serialization at concurrency=1, peak respect at concurrency=3, instance reuse across resets). 4 updates in test_bootstrap.py for the host-aligned mount assertions and the NERVE_DOCKER unset that lets the test pass when run inside the agent container. Notes: the original 2026-05-05 stash also added a "nerve-services = nerve.services:main" console-script entry to pyproject.toml and assorted comments about a docker-mcp sidecar. nerve/services lives only on the abandoned alex/docker-mcp-spike branch, so installing with that entry breaks pip install -e. The entry and the sidecar comments are dropped here. The engine-SDK-resume-guard half of the original stash already shipped on f39e62b and is excluded. --- nerve/bootstrap.py | 226 ++++++++++++++++++++++++++++++++++-- nerve/config.py | 22 ++++ nerve/memory/memu_bridge.py | 168 +++++++++++++++++++-------- tests/test_bootstrap.py | 15 ++- tests/test_memu_bridge.py | 164 ++++++++++++++++++++++++++ 5 files changed, 533 insertions(+), 62 deletions(-) diff --git a/nerve/bootstrap.py b/nerve/bootstrap.py index 8ec567f..c67fe49 100644 --- a/nerve/bootstrap.py +++ b/nerve/bootstrap.py @@ -1847,6 +1847,19 @@ def _wrap_text(text: str, width: int = 51) -> list[str]: && curl -fsSL "https://github.com/steipete/gogcli/releases/download/v${GOG_VERSION}/gogcli_${GOG_VERSION}_linux_${ARCH}.tar.gz" \\ | tar xz -C /usr/local/bin gog +# Install Docker CLI (client only) so the agent can talk to the host +# docker daemon through the bind-mounted /var/run/docker.sock. Lets +# Bash run "docker compose up" / "docker run" against the host from +# inside the agent without needing a separate MCP sidecar. +RUN install -m 0755 -d /etc/apt/keyrings \\ + && curl -fsSL https://download.docker.com/linux/debian/gpg \\ + | gpg --dearmor -o /etc/apt/keyrings/docker.gpg \\ + && chmod a+r /etc/apt/keyrings/docker.gpg \\ + && echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/debian $(. /etc/os-release && echo $VERSION_CODENAME) stable" \\ + > /etc/apt/sources.list.d/docker.list \\ + && apt-get update && apt-get install -y --no-install-recommends docker-ce-cli docker-compose-plugin \\ + && rm -rf /var/lib/apt/lists/* + RUN mkdir -p /root/.nerve /root/nerve-workspace ENV NERVE_DOCKER=1 @@ -1870,29 +1883,94 @@ def _wrap_text(text: str, width: int = 51) -> list[str]: ENTRYPOINT ["/docker-entrypoint.sh"] """ +def _host_aligned_path(path: str) -> str: + """Return a YAML-safe representation of ``path`` that resolves to + the same absolute path on the host and inside the container. + + Compose substitutes ``${HOME}`` from the user's shell env at + runtime, so a ``~/foo`` workspace becomes ``${HOME}/foo`` and + expands to the host's ``$HOME``. Absolute paths are returned + untouched. Path alignment is what lets the agent pass paths to + the bind-mounted host docker daemon when launching siblings: the + same string resolves to the same files inside and out, so + ``docker run -v $PWD:$PWD ...`` from inside the agent gives the + sibling container the right host directory. + """ + if not path: + return path + if path.startswith("~/"): + return "${HOME}/" + path[2:] + if path == "~": + return "${HOME}" + return path + + def _build_docker_compose( workspace_path: str = "~/nerve-workspace", + projects_path: str = "~/projects", extra_mounts: list[str] | None = None, + docker_socket: bool = True, ) -> str: """Build docker-compose.yml content with host bind-mounts. Args: - workspace_path: Host path for the workspace (e.g. ~/nerve-workspace). - extra_mounts: Additional host:container mount pairs (e.g. ["~/code:/code"]). + workspace_path: Host path for the workspace (default ~/nerve-workspace). + projects_path: Host path for the projects directory containing + git checkouts and worktrees (default ~/projects). Mounted + with path alignment so the agent can pass the same paths to + the host daemon when starting sibling containers. + extra_mounts: Additional host:container mount pairs. + docker_socket: When True, mount the host docker socket + (``/var/run/docker.sock``) into the agent container so the + agent can run ``docker`` and ``docker compose`` directly via + Bash. Required for Grafana per-PR runs and HyperDX + ``make dev`` orchestration. Works with OrbStack and Docker + Desktop on macOS (both expose a compatibility symlink at + this path) and with stock dockerd on Linux. Disable only if + you have a reason to keep the agent isolated from the host + daemon. + + Note on the previous sidecar pattern: + Earlier revisions of this file shipped a ``docker-mcp`` service + that ran ``supercorp/supergateway`` wrapping ``ckreiling/mcp- + server-docker`` to expose Docker daemon verbs as MCP tools. That + approach (a) couldn't orchestrate ``docker compose`` (no compose + verbs in the underlying server) and (b) suffered a chronic + protocol-version drift between supergateway's hardcoded + ``MCP-Protocol-Version`` allowlist and the version Claude Code + sends in headers. Mounting the socket directly solves both + problems in five lines. The agent already had unfettered daemon + access through the sidecar's MCP tools, so the blast radius is + unchanged. """ - # Required mounts (always present) + # Host-aligned paths: same absolute string inside and outside the + # container, so the agent can pass them to the bind-mounted host + # docker daemon when mounting them into siblings. + workspace_aligned = _host_aligned_path(workspace_path) + projects_aligned = _host_aligned_path(projects_path) + + # Required mounts. ~/.nerve stays at /root/.nerve because it is + # agent-only state and never passed through to siblings. + # ~/.nerve/claude:/root/.claude persists Claude Code's in-container + # state (config + per-conversation .jsonl files under projects/) + # across container restarts. Without this mount the .jsonl files + # are wiped on every recreate and the Nerve DB's stale + # sdk_session_id rows fail every --resume with "No conversation + # found" exit 1. The path is siloed under ~/.nerve so the agent's + # CLI is isolated from the host user's personal ~/.claude (where + # macOS stores OAuth tokens via the system Keychain; auth still + # comes from config.local.yaml, not from this directory). volumes = [ ".:/nerve", "~/.nerve:/root/.nerve", - f"{workspace_path}:/root/nerve-workspace", + "~/.nerve/claude:/root/.claude", + f"{workspace_aligned}:{workspace_aligned}", + f"{projects_aligned}:{projects_aligned}", ] # Optional auth mounts — only include if the host directory exists. # Docker would create missing dirs as root-owned empties, which # confuses the tools and pollutes the host filesystem. - # Note: ~/.claude is NOT mounted — macOS stores OAuth tokens in the - # system Keychain, not on disk. The entrypoint exports ANTHROPIC_API_KEY - # from config.local.yaml instead, which the claude CLI picks up. _optional_mounts = [ ("~/.config/gh", "/root/.config/gh", "gh CLI auth"), ("~/.config/gog", "/root/.config/gog", "gog CLI auth"), @@ -1902,17 +1980,57 @@ def _build_docker_compose( if os.path.isdir(expanded): volumes.append(f"{host_path}:{container_path}") + if docker_socket: + # Direct daemon access for the agent. With this in place, + # `docker` and `docker compose` work from Bash inside the + # agent. The path-aligned ${{HOME}}/projects mount above means + # the daemon resolves bind-mount paths identically inside and + # out, so `cd ~/projects/worktrees// && make dev` + # works from the agent and creates containers visible on the + # host. OrbStack and Docker Desktop both expose the daemon at + # this exact path on macOS via a compatibility symlink. + volumes.append("/var/run/docker.sock:/var/run/docker.sock") + if extra_mounts: volumes.extend(extra_mounts) - # Build YAML by hand to keep formatting clean vol_lines = "\n".join(f" - {v}" for v in volumes) - return f"""services: - nerve: + # In-agent service ports. The agent publishes ranges for dev + # servers it runs itself (docs preview, vite, storybook). Sibling + # containers (grafana, hyperdx-*) get their own host ports + # allocated when launched directly by the host daemon, so they are + # not listed here. + in_agent_port_ranges = [ + ("docs", 3000, 3019), + ("vite", 5173, 5189), + ("storybook", 6006, 6019), + ] + port_lines = [' - "8900:8900"'] + for label, lo, hi in in_agent_port_ranges: + port_lines.append(f' - "{lo}-{hi}:{lo}-{hi}" # {label}') + + nerve_block = f""" nerve: build: . ports: - - "8900:8900" +{chr(10).join(port_lines)} + environment: + # Path alignment: the entrypoint creates /root/* symlinks pointing + # at HOST_HOME so legacy paths still resolve, and any path passed + # to the host docker daemon resolves identically inside and out. + HOST_HOME: ${{HOME}} + # Route memU embeddings at the local Ollama sidecar (defined + # below). The OpenAI SDK respects this base URL when it talks to + # /embeddings, so memU recall + memorize work without OpenAI + # auth or network egress. Empty disables the override; memu_bridge + # then falls back to api.openai.com if openai_api_key is set, + # otherwise embeddings are simply skipped. + MEMU_EMBEDDING_BASE_URL: http://embeddings:11434/v1 + MEMU_EMBEDDING_API_KEY: placeholder + MEMU_EMBED_MODEL: nomic-embed-text + depends_on: + embeddings: + condition: service_healthy volumes: {vol_lines} restart: unless-stopped @@ -1920,7 +2038,56 @@ def _build_docker_compose( tty: true env_file: - path: .env - required: false + required: false""" + + embeddings_block = """ # Self-hosted OpenAI-compatible embeddings service. + # Ollama serves nomic-embed-text (768-dim) at /v1/embeddings, the + # same wire format the OpenAI SDK speaks. This replaces the OpenAI + # /embeddings calls memU made for routing + recall, removing the + # quota / 401 single point of failure. Native ARM64 image; no + # emulation overhead on Apple Silicon. The first start of this + # service downloads the model (~270 MB) and caches it under + # ~/.nerve/ollama; subsequent starts are instant. nomic-embed-text + # returns 768-dim vectors (vs OpenAI ada-002's 1536), so any + # existing memu.sqlite embeddings get rebuilt on next memorize. + embeddings: + image: ollama/ollama:latest + volumes: + - ~/.nerve/ollama:/root/.ollama + expose: + - "11434" + restart: unless-stopped + # Pull the embedding model on first start, then run the server. + # `ollama serve` blocks; we pull in the background, wait until the + # API responds, then `wait` keeps the server in the foreground. + entrypoint: ["/bin/sh", "-c"] + command: + - | + set -e + /bin/ollama serve & + pid=$$! + until /bin/ollama list >/dev/null 2>&1; do sleep 1; done + if ! /bin/ollama list | awk '{print $$1}' | grep -q '^nomic-embed-text'; then + echo "Pulling nomic-embed-text..." + /bin/ollama pull nomic-embed-text + fi + wait $$pid + healthcheck: + # Ready means: server up AND embedding model loaded. We grep for + # the model name so we don't mark healthy before the first-run + # model pull finishes. + test: + - CMD-SHELL + - 'ollama list 2>/dev/null | grep -q "^nomic-embed-text"' + interval: 10s + timeout: 5s + retries: 60 + start_period: 30s""" + + return f"""services: +{nerve_block} + +{embeddings_block} """ _DOCKER_ENTRYPOINT_TEMPLATE = """#!/bin/bash @@ -1937,6 +2104,33 @@ def _build_docker_compose( cd web && npm ci --quiet && npm run build && cd .. fi +# --- Path alignment --- +# HOST_HOME comes from compose (set to ${HOME} on the host). For each +# host-aligned mount point, drop a symlink at the legacy /root/* path +# so anything that hard-codes /root/nerve-workspace or /root/projects +# keeps working and resolves to the same files the host docker daemon +# sees. Idempotent: if the symlink already points where we want, skip. +if [ -n "${HOST_HOME:-}" ]; then + for _name in nerve-workspace projects; do + _src="$HOST_HOME/$_name" + _dst="/root/$_name" + if [ ! -d "$_src" ]; then + continue + fi + if [ -L "$_dst" ]; then + # Already a symlink; trust it. + continue + fi + if [ -d "$_dst" ] && [ -z "$(ls -A "$_dst" 2>/dev/null)" ]; then + # Empty leftover dir from the Dockerfile mkdir or a prior + # bind mount that no longer exists. Replace with the symlink. + rmdir "$_dst" && ln -s "$_src" "$_dst" + elif [ ! -e "$_dst" ]; then + ln -s "$_src" "$_dst" + fi + done +fi + # --- Credential resolution (priority waterfall) --- # Export credentials from config.local.yaml so tools (claude CLI, gh CLI) # can authenticate inside Docker. macOS stores tokens in the Keychain @@ -1960,6 +2154,14 @@ def _build_docker_compose( [ -n "$_gh" ] && export GH_TOKEN="$_gh" fi +# Ensure the persisted Claude Code state dir exists and is writable +# before any tool that touches /root/.claude runs. The bind mount in +# docker-compose creates it as a host-owned empty dir on first boot; +# we need it owned by root with 0700 so the CLI can drop its config +# file and projects/ tree there without ENOENT or EACCES. +mkdir -p /root/.claude +chmod 700 /root/.claude + # Clean up stale PID file from previous container runs rm -f ~/.nerve/nerve.pid diff --git a/nerve/config.py b/nerve/config.py index f970b74..56ef8b9 100644 --- a/nerve/config.py +++ b/nerve/config.py @@ -287,6 +287,25 @@ class MemoryConfig: memorize_model: str = "claude-sonnet-4-6" # Extraction & preprocessing fast_model: str = "claude-haiku-4-5-20251001" # Category summaries, date resolution embed_model: str = "" + # Optional override for the OpenAI-compatible /embeddings endpoint + # memU calls. When set, takes precedence over the default + # https://api.openai.com/v1. Point it at a self-hosted sidecar + # (Ollama, TEI, LocalAI, etc.) to avoid the OpenAI quota / 401 + # single point of failure. The env vars MEMU_EMBEDDING_BASE_URL, + # MEMU_EMBEDDING_API_KEY, and MEMU_EMBED_MODEL override these + # config values at runtime, which is convenient for the docker + # compose path where the sidecar URL is known to the entrypoint, + # not the YAML config. + embedding_base_url: str = "" + embedding_api_key: str = "" + # Cap on concurrent LLM chat calls during memorize / recall. memU + # fans out per memory_type (profile, event, knowledge, behavior) + # and asyncio.gathers the results, which on lower Anthropic API + # tiers reliably blows the per-minute rate limit. Bounding + # concurrency at 1 serializes the bursts; the SDK's exponential + # backoff handles the rest. Bump to 2-4 if your API tier can + # absorb the parallel load. + llm_concurrency: int = 1 sqlite_dsn: str = "" semantic_dedup_threshold: float = 0.85 # Cosine similarity threshold for semantic dedup knowledge_filter: bool = False # Post-extraction LLM filter for generic knowledge (extra API call) @@ -302,6 +321,9 @@ def from_dict(cls, d: dict) -> MemoryConfig: memorize_model=d.get("memorize_model", "claude-sonnet-4-6"), fast_model=d.get("fast_model", "claude-haiku-4-5-20251001"), embed_model=d.get("embed_model", ""), + embedding_base_url=d.get("embedding_base_url", ""), + embedding_api_key=d.get("embedding_api_key", ""), + llm_concurrency=max(1, int(d.get("llm_concurrency", 1))), sqlite_dsn=d.get("sqlite_dsn", default_dsn), semantic_dedup_threshold=float(d.get("semantic_dedup_threshold", 0.85)), knowledge_filter=bool(d.get("knowledge_filter", False)), diff --git a/nerve/memory/memu_bridge.py b/nerve/memory/memu_bridge.py index c79ef71..4a6973b 100644 --- a/nerve/memory/memu_bridge.py +++ b/nerve/memory/memu_bridge.py @@ -11,6 +11,7 @@ import gc import json import logging +import os import time from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field @@ -417,6 +418,11 @@ def __init__(self, config: NerveConfig, audit_db: Any = None): # Debounce tracking for file re-indexing (path -> asyncio.Task) self._reindex_tasks: dict[str, asyncio.Task] = {} self._anthropic_client: Any | None = None # Lazy sync Anthropic + # Bounded concurrency for memU's LLM chat calls. Created lazily + # in _instrument_llm_timeouts() so it binds to the active event + # loop, then reused across _reset_llm_clients() so callers + # already queued don't double-acquire after a reset. + self._llm_semaphore: asyncio.Semaphore | None = None async def _audit(self, action: str, target_type: str, target_id: str | None = None, source: str = "bridge", details: dict | None = None) -> None: @@ -914,13 +920,48 @@ async def initialize(self) -> bool: }, } - if self.config.openai_api_key: + # Embedding profile resolution: env vars first, then YAML config, + # then OpenAI fallback. The env-var path is what docker compose + # uses to point memU at the local Ollama / TEI sidecar without + # rewriting config.yaml on every container start. Setting an + # explicit base URL implies a self-hosted endpoint; api_key is + # optional (Ollama / TEI ignore it but the OpenAI SDK requires + # a non-empty string). + embedding_base_url = ( + os.environ.get("MEMU_EMBEDDING_BASE_URL") + or self.config.memory.embedding_base_url + ) + embedding_api_key = ( + os.environ.get("MEMU_EMBEDDING_API_KEY") + or self.config.memory.embedding_api_key + ) + embed_model = ( + os.environ.get("MEMU_EMBED_MODEL") + or self.config.memory.embed_model + ) + + if embedding_base_url: + llm_profiles["embedding"] = { + "base_url": embedding_base_url, + "api_key": embedding_api_key or "placeholder", + "embed_model": embed_model, + "client_backend": "sdk", + } + embeddings_configured = True + logger.info( + "memU embeddings via self-hosted endpoint %s (model=%s)", + embedding_base_url, embed_model or "", + ) + elif self.config.openai_api_key: llm_profiles["embedding"] = { "base_url": "https://api.openai.com/v1", "api_key": self.config.openai_api_key, - "embed_model": self.config.memory.embed_model, + "embed_model": embed_model, "client_backend": "sdk", } + embeddings_configured = True + else: + embeddings_configured = False resources_dir = Path("~/.nerve/memu-resources").expanduser() resources_dir.mkdir(parents=True, exist_ok=True) @@ -971,8 +1012,8 @@ async def initialize(self) -> bool: # work goes through Anthropic — use Haiku for extraction # too to avoid saturating the rate-limit budget. "memory_extract_llm_profile": ( - fast_profile if not self.config.openai_api_key - else memorize_profile + memorize_profile if embeddings_configured + else fast_profile ), "category_update_llm_profile": fast_profile, # Pass Nerve's configured categories to memU so the LLM @@ -999,14 +1040,14 @@ async def initialize(self) -> bool: }, }, retrieve_config={ - "method": "llm" if not self.config.openai_api_key else "rag", + "method": "rag" if embeddings_configured else "llm", "route_intention": False, "sufficiency_check": False, "resource": {"enabled": False}, # Use Haiku for LLM-based ranking — cheaper and avoids # sharing Sonnet's rate-limit budget with the main agent. **({"llm_ranking_llm_profile": fast_profile} - if not self.config.openai_api_key else {}), + if not embeddings_configured else {}), }, ) self._available = True @@ -1048,7 +1089,7 @@ async def initialize(self) -> bool: # memorize pipeline's "categorize_items" step with one that # stores items and resources with embedding=None. This # avoids KeyError on the missing "embedding" LLM profile. - if not self.config.openai_api_key: + if not embeddings_configured: from memu.workflow.step import WorkflowStep as _WfStep _svc = self._service @@ -1365,7 +1406,7 @@ def _inject_bedrock_clients(self) -> None: logger.info("Injected Bedrock LLM client for profile '%s' (model=%s)", name, model) def _instrument_llm_timeouts(self) -> None: - """Configure per-call timeouts on LLM clients (two layers). + """Configure per-call timeouts and bounded concurrency on LLM clients. Layer 1: httpx-level timeout on the AsyncOpenAI transport. This catches unresponsive API calls at the socket level and raises @@ -1376,14 +1417,33 @@ def _instrument_llm_timeouts(self) -> None: (e.g. the coroutine is stuck in Python code, not I/O). It works because LLMClientWrapper.chat() delegates to self._client.chat() which resolves the instance attribute we set. + + Layer 3: asyncio.Semaphore wrapper for bounded concurrency. + memU fans out per-memory-type chat calls via asyncio.gather + (memu/app/memorize.py:_generate_entries_from_text and similar). + On lower Anthropic API tiers, a 4-way fan-out reliably hits + rate_limit_error and the SDK retry/backoff can't catch up + before the pipeline gives up. The semaphore caps simultaneous + chat() calls across all profiles at memory.llm_concurrency, + which serializes the bursts. The slot is held only while the + actual chat is running (queueing time is unbounded), so the + timeout in Layer 2 measures real call duration, not queue wait. """ import httpx as _httpx + # Lazily create the semaphore. Reused across _reset_llm_clients() + # so any callers already waiting in the queue don't lose their + # spot when clients get re-instrumented. + if self._llm_semaphore is None: + concurrency = max(1, int(self.config.memory.llm_concurrency)) + self._llm_semaphore = asyncio.Semaphore(concurrency) + logger.info("memU LLM concurrency capped at %d", concurrency) + for profile in ("memorize", "fast", "default"): try: client = self._service._get_llm_base_client(profile) - # --- Layer 1: httpx timeout + disable SDK retries --- + # --- Layer 1: httpx timeout + bounded SDK retries --- # (Bedrock clients use their own timeout; skip Layer 1 for them) if not isinstance(client, _BedrockLLMClient): inner = getattr(client, "client", None) # OpenAISDKClient.client = AsyncOpenAI @@ -1392,11 +1452,16 @@ def _instrument_llm_timeouts(self) -> None: self._LLM_CALL_TIMEOUT, connect=10.0, ) - # The OpenAI SDK defaults to max_retries=2 and 600s timeout. - # With our 120s asyncio.wait_for wrapper, SDK retries just - # waste time inside a doomed coroutine. Disable them so the - # httpx timeout fires cleanly and propagates immediately. - inner.max_retries = 0 + # SDK default max_retries=2, with exponential backoff + # that respects the API's Retry-After header on 429. + # We keep retries enabled so rate-limit responses + # recover automatically. The Layer 3 semaphore caps + # concurrency, so burst pressure stays low and the + # retries actually drain the queue. Bumped to 4 to + # cover the worst-case Anthropic minute-window roll. + # Total retry budget ~15s in the worst case (1+2+4+8s), + # well inside the Layer 2 wait_for(120s) bound. + inner.max_retries = 4 # --- Layer 2: asyncio.wait_for wrapper --- if not callable(getattr(client, "chat", None)): @@ -1408,44 +1473,47 @@ def _instrument_llm_timeouts(self) -> None: async def _timeout_chat( prompt, *, max_tokens=None, system_prompt=None, temperature=0.2, - _orig=original_chat, _prof=profile, + _orig=original_chat, _prof=profile, _sem=self._llm_semaphore, ): # Anthropic API requires max_tokens >= 1; memU sometimes # omits it. Default to 4096 to prevent 400 errors. if max_tokens is None: max_tokens = 4096 - t0 = time.monotonic() - try: - return await asyncio.wait_for( - _orig( - prompt, - max_tokens=max_tokens, - system_prompt=system_prompt, - temperature=temperature, - ), - timeout=self._LLM_CALL_TIMEOUT, - ) - except asyncio.TimeoutError: - elapsed = time.monotonic() - t0 - in_flight = len(self._metrics.in_flight) - # Dump httpx connection pool state for diagnosis - pool_info = "unknown" + # Acquire the concurrency slot first so the timeout + # only measures actual call duration, not queue wait. + async with _sem: + t0 = time.monotonic() try: - base = self._service._llm_clients.get(_prof) - sdk = getattr(base, "client", None) - transport = getattr(sdk, "_client", None) - pool = getattr(transport, "_pool", None) or getattr(transport, "_transport", None) - if pool: - pool_info = repr(pool) - except Exception: - pass - logger.error( - "memU LLM HUNG [%s]: no response after %.0fs " - "(prompt=%d chars, in_flight=%d, pool=%s)", - _prof, elapsed, len(prompt), - in_flight, pool_info, - ) - raise + return await asyncio.wait_for( + _orig( + prompt, + max_tokens=max_tokens, + system_prompt=system_prompt, + temperature=temperature, + ), + timeout=self._LLM_CALL_TIMEOUT, + ) + except asyncio.TimeoutError: + elapsed = time.monotonic() - t0 + in_flight = len(self._metrics.in_flight) + # Dump httpx connection pool state for diagnosis + pool_info = "unknown" + try: + base = self._service._llm_clients.get(_prof) + sdk = getattr(base, "client", None) + transport = getattr(sdk, "_client", None) + pool = getattr(transport, "_pool", None) or getattr(transport, "_transport", None) + if pool: + pool_info = repr(pool) + except Exception: + pass + logger.error( + "memU LLM HUNG [%s]: no response after %.0fs " + "(prompt=%d chars, in_flight=%d, pool=%s)", + _prof, elapsed, len(prompt), + in_flight, pool_info, + ) + raise _timeout_chat._nerve_timeout_wrapped = True # type: ignore[attr-defined] client.chat = _timeout_chat # type: ignore[method-assign] @@ -2448,5 +2516,9 @@ def available(self) -> bool: @property def _has_embeddings(self) -> bool: - """Whether an embedding provider (e.g. OpenAI) is configured.""" - return bool(self.config.openai_api_key) + """Whether an embedding provider (OpenAI or self-hosted) is configured.""" + return bool( + os.environ.get("MEMU_EMBEDDING_BASE_URL") + or self.config.memory.embedding_base_url + or self.config.openai_api_key + ) diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py index 9172fbe..5825e0a 100644 --- a/tests/test_bootstrap.py +++ b/tests/test_bootstrap.py @@ -433,6 +433,10 @@ def test_no_docker_env_defaults_to_server(self, tmp_path: Path) -> None: "ANTHROPIC_API_KEY": "sk-ant-api03-test", "NERVE_MODE": "personal", "NERVE_WORKSPACE": str(tmp_path / "ws"), + # Explicitly clear NERVE_DOCKER so the test runs correctly + # when executed inside a Docker container where NERVE_DOCKER=1 + # is already in the environment. + "NERVE_DOCKER": "", } with patch.dict(os.environ, env, clear=False): choices = run_non_interactive(tmp_path) @@ -478,7 +482,13 @@ def test_compose_bind_mounts(self) -> None: assert "~/.nerve:/root/.nerve" in volumes assert "~/.config/gh:/root/.config/gh" in volumes assert "~/.config/gog:/root/.config/gog" in volumes - assert "~/my-workspace:/root/nerve-workspace" in volumes + # Workspace and projects are mounted host-aligned: same path + # inside and outside the container so the agent can pass paths + # through to the host docker daemon (via the mounted socket) + # without translation. Legacy /root/* paths are restored via + # symlinks in the entrypoint. + assert "${HOME}/my-workspace:${HOME}/my-workspace" in volumes + assert "${HOME}/projects:${HOME}/projects" in volumes # ~/.claude is NOT mounted (macOS Keychain, not filesystem) assert "~/.claude:/root/.claude" not in volumes # No named volumes section @@ -494,7 +504,8 @@ def test_compose_skips_missing_auth_dirs(self) -> None: # Required mounts still present assert ".:/nerve" in volumes assert "~/.nerve:/root/.nerve" in volumes - assert "~/ws:/root/nerve-workspace" in volumes + assert "${HOME}/ws:${HOME}/ws" in volumes + assert "${HOME}/projects:${HOME}/projects" in volumes # Optional auth mounts absent assert "~/.config/gh:/root/.config/gh" not in volumes assert "~/.config/gog:/root/.config/gog" not in volumes diff --git a/tests/test_memu_bridge.py b/tests/test_memu_bridge.py index 396c065..4573e8c 100644 --- a/tests/test_memu_bridge.py +++ b/tests/test_memu_bridge.py @@ -354,6 +354,170 @@ def test_semantic_dedup_threshold_from_dict_default(self): config = MemoryConfig.from_dict({}) assert config.semantic_dedup_threshold == 0.85 + # --- Self-hosted embedding endpoint --- + + def test_embedding_base_url_default_empty(self): + config = MemoryConfig() + assert config.embedding_base_url == "" + assert config.embedding_api_key == "" + + def test_embedding_base_url_from_dict(self): + config = MemoryConfig.from_dict({ + "embedding_base_url": "http://embeddings:11434/v1", + "embedding_api_key": "secret", + }) + assert config.embedding_base_url == "http://embeddings:11434/v1" + assert config.embedding_api_key == "secret" + + def test_embedding_base_url_from_dict_default(self): + config = MemoryConfig.from_dict({}) + assert config.embedding_base_url == "" + assert config.embedding_api_key == "" + + # --- LLM concurrency cap --- + + def test_llm_concurrency_default_is_one(self): + config = MemoryConfig() + assert config.llm_concurrency == 1 + + def test_llm_concurrency_from_dict(self): + config = MemoryConfig.from_dict({"llm_concurrency": 4}) + assert config.llm_concurrency == 4 + + def test_llm_concurrency_from_dict_default_one(self): + config = MemoryConfig.from_dict({}) + assert config.llm_concurrency == 1 + + def test_llm_concurrency_zero_clamped_to_one(self): + # Zero would deadlock the wrapper. Clamp to 1. + config = MemoryConfig.from_dict({"llm_concurrency": 0}) + assert config.llm_concurrency == 1 + + def test_llm_concurrency_negative_clamped_to_one(self): + config = MemoryConfig.from_dict({"llm_concurrency": -3}) + assert config.llm_concurrency == 1 + + +class TestLlmConcurrencyWrapper: + """Verify _instrument_llm_timeouts wraps chat calls with the configured semaphore.""" + + @pytest.mark.asyncio + async def test_semaphore_serializes_concurrent_chat_calls(self, tmp_path): + """With llm_concurrency=1, four asyncio.gather'd chat calls run sequentially.""" + config = _make_config(tmp_path) + config.memory.llm_concurrency = 1 + bridge = MemUBridge(config) + + # Simulate memU's LLM clients with a slow-but-trackable chat method + in_flight = 0 + max_in_flight = 0 + call_order: list[int] = [] + + class _FakeBaseClient: + async def chat(self, prompt, *, max_tokens=None, system_prompt=None, temperature=0.2): + nonlocal in_flight, max_in_flight + in_flight += 1 + max_in_flight = max(max_in_flight, in_flight) + # Yield control so other gather'd tasks can race here + await asyncio.sleep(0.01) + call_order.append(int(prompt)) + in_flight -= 1 + return "ok" + + fake_inner_clients = { + "memorize": _FakeBaseClient(), + "fast": _FakeBaseClient(), + "default": _FakeBaseClient(), + } + # _instrument_llm_timeouts iterates ("memorize","fast","default") and + # calls _service._get_llm_base_client(profile). Stub the service. + bridge._service = MagicMock() + bridge._service._get_llm_base_client = lambda p: fake_inner_clients[p] + bridge._service._llm_clients = fake_inner_clients + + bridge._instrument_llm_timeouts() + + # Fan out 4 simultaneous calls on the memorize profile (matches the + # memU/extract_items pattern: one gather per memory_type). + wrapped = fake_inner_clients["memorize"].chat + await asyncio.gather(*(wrapped(str(i)) for i in range(4))) + + assert max_in_flight == 1, f"semaphore should serialize, got max_in_flight={max_in_flight}" + assert sorted(call_order) == [0, 1, 2, 3] + + @pytest.mark.asyncio + async def test_semaphore_respects_higher_concurrency(self, tmp_path): + """With llm_concurrency=3, four gather'd calls peak at 3 in-flight, not 4.""" + config = _make_config(tmp_path) + config.memory.llm_concurrency = 3 + bridge = MemUBridge(config) + + in_flight = 0 + max_in_flight = 0 + + class _FakeBaseClient: + async def chat(self, prompt, *, max_tokens=None, system_prompt=None, temperature=0.2): + nonlocal in_flight, max_in_flight + in_flight += 1 + max_in_flight = max(max_in_flight, in_flight) + await asyncio.sleep(0.02) + in_flight -= 1 + return "ok" + + fake_inner_clients = { + "memorize": _FakeBaseClient(), + "fast": _FakeBaseClient(), + "default": _FakeBaseClient(), + } + bridge._service = MagicMock() + bridge._service._get_llm_base_client = lambda p: fake_inner_clients[p] + bridge._service._llm_clients = fake_inner_clients + + bridge._instrument_llm_timeouts() + + wrapped = fake_inner_clients["memorize"].chat + await asyncio.gather(*(wrapped(str(i)) for i in range(4))) + + assert max_in_flight == 3, f"expected peak 3, got {max_in_flight}" + + @pytest.mark.asyncio + async def test_semaphore_survives_reset_and_reuses_same_instance(self, tmp_path): + """Re-running _instrument_llm_timeouts must not recreate the semaphore. + + _reset_llm_clients evicts caches and re-instruments. If we created a + fresh semaphore each time, callers waiting on the old one would lose + their slot and a parallel re-instrumentation could double the + effective concurrency. + """ + config = _make_config(tmp_path) + config.memory.llm_concurrency = 1 + bridge = MemUBridge(config) + + class _FakeBaseClient: + async def chat(self, *a, **kw): + return "ok" + + fake = {p: _FakeBaseClient() for p in ("memorize", "fast", "default")} + bridge._service = MagicMock() + bridge._service._get_llm_base_client = lambda p: fake[p] + bridge._service._llm_clients = fake + + bridge._instrument_llm_timeouts() + first_semaphore = bridge._llm_semaphore + assert first_semaphore is not None + + # Simulate a reset: drop the timeout-wrapper marker so re-instrumentation + # actually rewraps the clients. + for p in fake: + fake[p] = _FakeBaseClient() + bridge._service._get_llm_base_client = lambda p: fake[p] + bridge._service._llm_clients = fake + + bridge._instrument_llm_timeouts() + assert bridge._llm_semaphore is first_semaphore, ( + "semaphore must be the same instance across resets" + ) + class TestKnowledgeCustomPrompts: """Test that custom knowledge extraction prompts are defined correctly.""" From db3f58879ef842fd82d1bfe4803cb2ed36cc8948 Mon Sep 17 00:00:00 2001 From: Alex Fedotyev <61838744+alex-fedotyev@users.noreply.github.com> Date: Sat, 9 May 2026 02:57:57 +0000 Subject: [PATCH 2/2] scripts: backfill_memu_embeddings.py for the post-fix catch-up memU rows written between 2026-05-06 and the embeddings-fix landing have embedding_json=NULL because the bridge wasn't reading the MEMU_EMBEDDING_BASE_URL env var. The fix only applies to new writes; existing rows need a one-time backfill to make recall work against historical memories. Walks memu_memory_items (text source: summary) and memu_resources (text source: caption) for rows with NULL or empty embedding_json, batches them 32 at a time, posts to {MEMU_EMBEDDING_BASE_URL}/embeddings with the OpenAI-compatible payload Ollama and the OpenAI API both accept, and writes the resulting vectors back. Skips rows with NULL or empty text since there's nothing to embed and the endpoint rejects empty input. Idempotent: the WHERE clause filters on the NULL state, so re-runs only touch rows that still need work. Single transaction per batch, so an interrupt loses at most one batch. Flags: - --dry-run: count pending rows without embedding or writing. - --limit N: stop after N rows per table for incremental runs. - --table: backfill only memu_memory_items or memu_resources. - --batch-size, --db, --verbose for ops control. Validated against the running container's memu.sqlite: - --dry-run: 6,009 memu_memory_items + 329 memu_resources pending. - --limit 100 --table memu_memory_items: wrote 100 768-dim vectors in 1.5s. Re-running --dry-run reports 5,909, confirming idempotency. memu_memory_categories doesn't need backfill: those embeddings were populated on 2026-05-05 and never wiped. --- scripts/backfill_memu_embeddings.py | 328 ++++++++++++++++++++++++++++ 1 file changed, 328 insertions(+) create mode 100755 scripts/backfill_memu_embeddings.py diff --git a/scripts/backfill_memu_embeddings.py b/scripts/backfill_memu_embeddings.py new file mode 100755 index 0000000..dbee755 --- /dev/null +++ b/scripts/backfill_memu_embeddings.py @@ -0,0 +1,328 @@ +#!/usr/bin/env python3 +"""Backfill missing embedding_json values in the memU sqlite store. + +Why this exists +--------------- +memU writes ``embedding_json = NULL`` whenever the embedding profile +is unconfigured at memorize time. From 2026-05-06 to 2026-05-09 the +docker compose env var ``MEMU_EMBEDDING_BASE_URL`` was set on the +agent container, but the in-process code in ``nerve.memory.memu_bridge`` +only consulted ``self.config.openai_api_key`` when deciding whether +to register the embedding profile, so memU saw no embedding provider +and stored every new memory with a NULL vector. The result: vector +search at recall time was disabled for those rows, and queries that +should have hit recent memories returned "No relevant memories +found." See ``notes/lessons/2026-05-09-memu-embeddings-not-wired.md`` +for the full RCA. + +The companion change in this PR teaches ``memu_bridge`` to read +``MEMU_EMBEDDING_BASE_URL`` first; this script catches up the rows +that were already written with NULL. + +Targets +------- +Two tables get backfilled: + +- ``memu_memory_items``: text source is the ``summary`` column. +- ``memu_resources``: text source is the ``caption`` column. Rows + with NULL ``caption`` are skipped (nothing to embed). + +``memu_memory_categories`` does NOT need backfill: those embeddings +were already populated on 2026-05-05 when the categories were first +created and never wiped. + +Endpoint +-------- +The script reads the same env vars memU does: + +- ``MEMU_EMBEDDING_BASE_URL`` (e.g. ``http://embeddings:11434/v1``) +- ``MEMU_EMBEDDING_API_KEY`` (Ollama ignores this; OpenAI requires it) +- ``MEMU_EMBED_MODEL`` (e.g. ``nomic-embed-text``) + +It POSTs to ``{base_url}/embeddings`` with the OpenAI-compatible +payload ``{"model": ..., "input": [text1, text2, ...]}``. Ollama and +OpenAI both accept this format. + +Idempotent +---------- +The script only selects rows where ``embedding_json IS NULL OR +embedding_json = ''``. Re-running it picks up exactly the rows that +still need work (e.g. if a previous run was interrupted or hit a +transient HTTP error). + +Usage +----- +:: + + # See what would happen, no DB writes: + python3 scripts/backfill_memu_embeddings.py --dry-run + + # Backfill the first 100 rows (incremental): + python3 scripts/backfill_memu_embeddings.py --limit 100 + + # Backfill everything: + python3 scripts/backfill_memu_embeddings.py + + # Custom DB path: + python3 scripts/backfill_memu_embeddings.py \\ + --db /path/to/memu.sqlite + + # Backfill only one table: + python3 scripts/backfill_memu_embeddings.py --table memu_memory_items +""" + +from __future__ import annotations + +import argparse +import json +import logging +import os +import sqlite3 +import sys +import time +from pathlib import Path +from typing import Iterable + +import httpx + +logger = logging.getLogger("backfill_memu_embeddings") + +# Per-table backfill spec: (table name, text source column, log label) +TABLES = [ + ("memu_memory_items", "summary", "memory items"), + ("memu_resources", "caption", "resources"), +] + +DEFAULT_BATCH_SIZE = 32 +DEFAULT_DB = Path("~/.nerve/memu.sqlite").expanduser() +DEFAULT_BASE_URL = "http://embeddings:11434/v1" +DEFAULT_MODEL = "nomic-embed-text" +DEFAULT_API_KEY = "placeholder" +DEFAULT_TIMEOUT = 60.0 # seconds; nomic-embed on CPU is plenty fast within this + + +def _resolve_endpoint() -> tuple[str, str, str]: + base_url = os.environ.get("MEMU_EMBEDDING_BASE_URL", DEFAULT_BASE_URL).rstrip("/") + api_key = os.environ.get("MEMU_EMBEDDING_API_KEY", DEFAULT_API_KEY) or DEFAULT_API_KEY + model = os.environ.get("MEMU_EMBED_MODEL", DEFAULT_MODEL) or DEFAULT_MODEL + return base_url, api_key, model + + +def _fetch_pending( + cur: sqlite3.Cursor, + table: str, + text_col: str, + limit: int | None, +) -> list[tuple[str, str]]: + """Return rows that still need embeddings, as ``(id, text)`` tuples. + + Filters out rows where the text source is NULL or empty since + there's nothing to embed for those, and the embedding endpoint + rejects empty strings. + """ + sql = ( + f"SELECT id, {text_col} FROM {table} " + f"WHERE (embedding_json IS NULL OR embedding_json = '') " + f" AND {text_col} IS NOT NULL " + f" AND {text_col} != '' " + ) + if limit is not None: + sql += f"LIMIT {int(limit)}" + return list(cur.execute(sql)) + + +def _embed_batch( + client: httpx.Client, + base_url: str, + api_key: str, + model: str, + texts: list[str], +) -> list[list[float]]: + """POST a batch to ``/embeddings`` and return a list of vectors. + + Raises on non-2xx HTTP status. The caller controls retry policy. + """ + response = client.post( + f"{base_url}/embeddings", + headers={"Authorization": f"Bearer {api_key}"}, + json={"model": model, "input": texts}, + ) + response.raise_for_status() + payload = response.json() + data = payload.get("data") or [] + if len(data) != len(texts): + raise RuntimeError( + f"Embedding endpoint returned {len(data)} vectors for " + f"{len(texts)} inputs (model={model})" + ) + # Order is documented to match input order; sort by index defensively. + by_index = sorted(data, key=lambda d: d.get("index", 0)) + return [d["embedding"] for d in by_index] + + +def _backfill_table( + conn: sqlite3.Connection, + client: httpx.Client, + base_url: str, + api_key: str, + model: str, + table: str, + text_col: str, + label: str, + batch_size: int, + limit: int | None, + dry_run: bool, +) -> int: + """Backfill one table. Returns the number of rows written.""" + cur = conn.cursor() + pending = _fetch_pending(cur, table, text_col, limit) + total = len(pending) + if total == 0: + logger.info("%s: nothing to backfill", label) + return 0 + + # Also report the truly-empty-text count so dry-run is honest. + dropped = cur.execute( + f"SELECT COUNT(*) FROM {table} " + f"WHERE (embedding_json IS NULL OR embedding_json = '') " + f" AND ({text_col} IS NULL OR {text_col} = '')" + ).fetchone()[0] + if dropped: + logger.info( + "%s: skipping %d rows with NULL/empty %s (nothing to embed)", + label, dropped, text_col, + ) + + logger.info("%s: %d rows pending (batch=%d)", label, total, batch_size) + if dry_run: + return 0 + + written = 0 + start = time.monotonic() + for offset in range(0, total, batch_size): + chunk = pending[offset : offset + batch_size] + ids = [row[0] for row in chunk] + texts = [row[1] for row in chunk] + try: + vectors = _embed_batch(client, base_url, api_key, model, texts) + except (httpx.HTTPError, RuntimeError) as exc: + logger.error( + "%s: batch %d-%d failed (%s); skipping and continuing", + label, offset, offset + len(chunk), exc, + ) + continue + + # Single transaction per batch so an interrupt loses at most + # one batch of work. + with conn: + for row_id, vector in zip(ids, vectors): + conn.execute( + f"UPDATE {table} SET embedding_json = ? WHERE id = ?", + (json.dumps(vector), row_id), + ) + written += len(chunk) + + if written % 100 < batch_size: + elapsed = time.monotonic() - start + rate = written / elapsed if elapsed > 0 else 0 + logger.info( + "%s: %d/%d (%.1f rows/s, ~%.0fs remaining)", + label, written, total, rate, + (total - written) / rate if rate > 0 else 0, + ) + + elapsed = time.monotonic() - start + logger.info( + "%s: wrote %d embeddings in %.1fs", + label, written, elapsed, + ) + return written + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description=__doc__.split("\n")[0]) + parser.add_argument( + "--db", + type=Path, + default=DEFAULT_DB, + help=f"Path to memu.sqlite (default: {DEFAULT_DB})", + ) + parser.add_argument( + "--batch-size", + type=int, + default=DEFAULT_BATCH_SIZE, + help=f"Rows per embedding request (default: {DEFAULT_BATCH_SIZE})", + ) + parser.add_argument( + "--limit", + type=int, + default=None, + help="Stop after this many rows per table (default: no limit)", + ) + parser.add_argument( + "--table", + choices=[t[0] for t in TABLES], + default=None, + help="Only backfill this table (default: all)", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Count pending rows; do not embed or write", + ) + parser.add_argument( + "--verbose", + "-v", + action="store_true", + help="Enable DEBUG logging", + ) + args = parser.parse_args(argv) + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + ) + + base_url, api_key, model = _resolve_endpoint() + logger.info( + "endpoint: %s, model: %s%s", + base_url, model, + " (DRY RUN)" if args.dry_run else "", + ) + + db_path = args.db.expanduser() + if not db_path.exists(): + logger.error("memu.sqlite not found at %s", db_path) + return 2 + + targets: Iterable[tuple[str, str, str]] = ( + [t for t in TABLES if t[0] == args.table] if args.table else TABLES + ) + + conn = sqlite3.connect(str(db_path)) + try: + with httpx.Client(timeout=DEFAULT_TIMEOUT) as client: + grand_total = 0 + for table, text_col, label in targets: + grand_total += _backfill_table( + conn, + client, + base_url, + api_key, + model, + table, + text_col, + label, + args.batch_size, + args.limit, + args.dry_run, + ) + logger.info("%s%d rows", "WOULD WRITE " if args.dry_run else "wrote ", grand_total) + finally: + conn.close() + + return 0 + + +if __name__ == "__main__": + sys.exit(main())