From 0609c67b4fd8b15707d161ffba67f759c3507ca5 Mon Sep 17 00:00:00 2001 From: Josh Mabry Date: Mon, 27 Apr 2026 18:30:56 -0700 Subject: [PATCH 1/6] feat: ship pluggable scheduler (local sqlite + Workstacean adapter) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a default scheduler so agents can defer work to themselves — "remind me tomorrow", recurring sweeps, deadline check-ins. Three new tools land in get_all_tools() when a backend is wired up: schedule_task, list_schedules, cancel_schedule. Two backends ship behind a single SchedulerBackend protocol: - LocalScheduler (default): sqlite + asyncio polling. Per-agent jobs.db at /sandbox/scheduler// with a ~/.protoagent/scheduler// fallback. Fires by POSTing message/send to the running agent's own /a2a endpoint, going through bearer + X-API-Key auth like a real caller (audit log + cost-v1 capture work the same). Cron expressions reschedule via croniter; ISO datetimes are one-shot. Missed-fire recovery: within 24h fires immediately, older fires roll forward without firing. - WorkstaceanScheduler: HTTP adapter to a Workstacean install's POST /publish. Activated automatically when WORKSTACEAN_API_BASE and WORKSTACEAN_API_KEY env vars are set. Topic and job IDs are namespaced cron.. so a single Workstacean can serve N protoAgent forks safely. Multi-agent isolation is the headline architectural property — spinning up gina-personal alongside gina-work on the same box (or sharing one Workstacean) won't cross-fire scheduled prompts. Verified with explicit tests in test_scheduler_local.py. Wiring: - scheduler/{__init__,interface,local,workstacean}.py — module - tools/lg_tools.py — _build_scheduler_tools factory; get_all_tools takes a new optional scheduler= kwarg - graph/agent.py — create_agent_graph and create_simple_agent accept scheduler= - server.py — _build_scheduler() picks backend at boot, on_event("startup"/"shutdown") drives the polling task lifecycle, reload path reuses the running scheduler instance - config/langgraph-config.yaml + graph/{config,subagents/config}.py — worker subagent gets the three new tools in its allowlist - requirements.txt — croniter>=2.0 Tests: 48 new (test_scheduler_local.py covers add/list/cancel, multi-agent isolation, reschedule-vs-delete, missed-fire recovery, and an end-to-end fire path with httpx mocked; test_scheduler_workstacean.py covers all the publish payload assertions, namespacing, custom topic prefix, and HTTP error handling). Docs: docs/guides/scheduler.md (Diataxis how-to with the firing model, multi-agent story, env reference, and notes on the Workstacean A2A-bridge gap), plus index/configuration/README/ TEMPLATE updates. Co-Authored-By: Claude Opus 4.7 (1M context) --- README.md | 1 + TEMPLATE.md | 22 ++ config/langgraph-config.yaml | 3 + docs/guides/index.md | 1 + docs/guides/scheduler.md | 171 +++++++++++++ docs/reference/configuration.md | 13 + graph/agent.py | 7 +- graph/config.py | 1 + graph/subagents/config.py | 1 + requirements.txt | 4 + scheduler/__init__.py | 27 ++ scheduler/interface.py | 114 +++++++++ scheduler/local.py | 381 ++++++++++++++++++++++++++++ scheduler/workstacean.py | 183 +++++++++++++ server.py | 115 ++++++++- tests/test_scheduler_local.py | 290 +++++++++++++++++++++ tests/test_scheduler_workstacean.py | 168 ++++++++++++ tools/lg_tools.py | 115 ++++++++- 18 files changed, 1606 insertions(+), 11 deletions(-) create mode 100644 docs/guides/scheduler.md create mode 100644 scheduler/__init__.py create mode 100644 scheduler/interface.py create mode 100644 scheduler/local.py create mode 100644 scheduler/workstacean.py create mode 100644 tests/test_scheduler_local.py create mode 100644 tests/test_scheduler_workstacean.py diff --git a/README.md b/README.md index ef54b84..4a7036f 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ rename / release-pipeline wiring. | Subagents | `graph/subagents/config.py` | DeerFlow-pattern delegation via a `task()` tool; one placeholder `worker` ships | | Starter tools | `tools/lg_tools.py` | Keyless general tools (`current_time`, `calculator` safe AST eval, `web_search` via DuckDuckGo, `fetch_url`) plus memory tools (`memory_ingest`, `memory_recall`, `memory_list`, `memory_stats`, `daily_log`) bound to the bundled store | | Knowledge store | `knowledge/store.py` | sqlite + FTS5 (LIKE fallback). One `chunks` table for operator notes, daily-log entries, and conversation findings. Default-on; turn off with `middleware.knowledge: false` | +| Scheduler | `scheduler/` | `schedule_task` / `list_schedules` / `cancel_schedule` tools backed by either a bundled sqlite scheduler or a Workstacean adapter (env-selected). Multi-agent-safe — every job is namespaced by `AGENT_NAME`. See [Schedule future work](./docs/guides/scheduler.md) | | Eval harness | `evals/` | Side-effect-verified A2A test harness — audit log + reply text + KB state. `python -m evals.runner` against a running agent. See [Eval your fork](./docs/guides/evals.md) | | Tracing | `tracing.py` | Langfuse trace_session with distributed `a2a.trace` propagation and the OTel cross-context-detach filter | | Observability | `metrics.py`, `audit.py` | Prometheus metrics with per-agent prefix, JSONL audit log with trace IDs | diff --git a/TEMPLATE.md b/TEMPLATE.md index 08ff5b3..4408c90 100644 --- a/TEMPLATE.md +++ b/TEMPLATE.md @@ -180,6 +180,28 @@ See [Eval your fork](./docs/guides/evals.md) for what each case asserts, how the three assertion channels work, and how to add cases for your fork's new tools. +## 9b. Scheduler — local sqlite or Workstacean + +The bundled scheduler ships three agent tools — `schedule_task`, +`list_schedules`, `cancel_schedule` — backed by either a local +sqlite poller or a Workstacean adapter, selected at startup via env: + +```bash +# Default: local sqlite, persists at /sandbox/scheduler//jobs.db +python server.py + +# Workstacean: set both and restart +export WORKSTACEAN_API_BASE=http://your-workstacean:3000 +export WORKSTACEAN_API_KEY=... +python server.py +``` + +Multi-fork safety: every job is namespaced by `AGENT_NAME`, so +spinning up `gina-personal` next to `gina-work` (or any number of +ginas under one Workstacean) doesn't cross-fire prompts. See +[Schedule future work](./docs/guides/scheduler.md) for the full +firing model and integration notes. + ## 9a. Understand the skill loop protoAgent's skill loop lets your agent learn from experience automatically. diff --git a/config/langgraph-config.yaml b/config/langgraph-config.yaml index 05bada2..c75ff71 100644 --- a/config/langgraph-config.yaml +++ b/config/langgraph-config.yaml @@ -32,6 +32,9 @@ subagents: - memory_list - memory_stats - daily_log + - schedule_task + - list_schedules + - cancel_schedule max_turns: 20 middleware: diff --git a/docs/guides/index.md b/docs/guides/index.md index 65dc41e..ce26b48 100644 --- a/docs/guides/index.md +++ b/docs/guides/index.md @@ -10,4 +10,5 @@ Task-oriented procedures. Assumes you already have a running agent (see [Tutoria | [Configure subagents](/guides/subagents) | You want specialized delegates beyond the placeholder `worker` | | [Wire Langfuse + Prometheus](/guides/observability) | You need traces and metrics in production | | [Eval your fork](/guides/evals) | You want a baseline pass-rate for the tools / memory / A2A surface in your fork | +| [Schedule future work](/guides/scheduler) | You want the agent to defer tasks to itself ("remind me tomorrow", recurring sweeps) — local sqlite or Workstacean-backed | | [Deploy via GHCR](/guides/deploy) | You're ready to ship and want auto-deploy wired up | diff --git a/docs/guides/scheduler.md b/docs/guides/scheduler.md new file mode 100644 index 0000000..dceb994 --- /dev/null +++ b/docs/guides/scheduler.md @@ -0,0 +1,171 @@ +# Schedule future work + +protoAgent ships a scheduler so the agent can defer tasks to itself — +"remind me about X tomorrow", "every Monday morning summarize last +week's logs", "at 3pm check the deploy". Two backends ship by default; +the agent-facing tool surface is identical regardless of which one is +active. + +## When to read this + +- You want forks (or your own multiple ginas) to support reminders, + recurring sweeps, or any "do this later" intent. +- You're running protoWorkstacean and want scheduled fires to flow + through the existing bus. +- You're spinning up multiple protoAgent instances on one box and + need scheduling state to stay isolated per agent. + +## The three tools + +When the scheduler is active, three tools land in `get_all_tools()`: + +| Tool | What it does | +|---|---| +| `schedule_task(prompt, when, job_id?)` | Persist a future invocation. `when` is cron (`"0 9 * * *"`) or ISO-8601 (`"2026-05-01T15:00:00"`). | +| `list_schedules()` | Show all jobs visible to *this* agent. | +| `cancel_schedule(job_id)` | Remove a job by id. | + +Prompts are self-contained — the agent has no memory of the +scheduling moment when the task fires, so write the prompt as a fresh +turn ("review last week's pipeline incidents and post a summary", +not "do that thing we discussed"). + +## Backend selection + +`server.py::_build_scheduler` picks at startup: + +1. `WORKSTACEAN_API_BASE` + `WORKSTACEAN_API_KEY` set → **`WorkstaceanScheduler`**. +2. Otherwise → **`LocalScheduler`** (sqlite, asyncio polling). +3. `SCHEDULER_DISABLED=1` → no scheduler. The three tools don't ship. + +Both backends honor the same `SchedulerBackend` protocol; the agent +loop never knows which one is wired up. + +```bash +# Solo / local dev — falls through to LocalScheduler automatically. +python server.py + +# Workstacean install — set both env vars and restart. +export WORKSTACEAN_API_BASE=http://your-workstacean-host:3000 +export WORKSTACEAN_API_KEY= +python server.py +``` + +> **protoLabs operators**: the fleet's Workstacean lives on the +> `ava` node; `WORKSTACEAN_API_KEY` is in the org's secrets manager +> under `secret-management → workstacean`. Coordinate with the team +> for the exact URL. + +## Multi-agent isolation + +Every job is namespaced by `AGENT_NAME` so spinning up +`gina-personal` alongside `gina-work` on the same box doesn't +cross-fire prompts. + +| Backend | How it isolates | +|---|---| +| Local | DB path per agent: `/sandbox/scheduler//jobs.db` (falls back to `~/.protoagent/scheduler//jobs.db`). Every row also carries `agent_name`; reads filter on it. | +| Workstacean | Job IDs are prefixed `-...`; topics are namespaced `cron..`. One Workstacean install can serve N forks safely. | + +If you supply your own `job_id` in `schedule_task`: + +- Local: the id is stored as-is. Two agents sharing one DB path with + the same user-supplied id will trip a primary-key collision (the + second add raises a clear error). To avoid it, let the scheduler + auto-generate (the auto-id is `-`). +- Workstacean: the adapter prepends `-` if your id doesn't + already start with it, so cross-agent collisions are impossible. + +## Local backend — how firing works + +The local scheduler runs an asyncio polling task on FastAPI's +`startup` event. Once a second: + +1. Read jobs where `next_fire <= now()` and `enabled = 1`. +2. For each due job: POST to `http://127.0.0.1:/a2a` as + a `message/send` with the job's prompt as the message text. Bearer + + X-API-Key are forwarded automatically. +3. One-shot ISO jobs are deleted after firing. Cron jobs reschedule + forward via `croniter`. + +Going through HTTP rather than calling into the graph directly buys +parity with real callers — the audit log, cost-v1 capture, and +push-notification path all behave identically. + +### Missed-fire recovery + +On startup, jobs whose `next_fire` is in the past are inspected: + +- **Within the last 24h** — fire on the next tick (so a 5-minute + outage doesn't lose an upcoming reminder). +- **Older than 24h** — cron jobs roll forward to the next slot + without firing; one-shot jobs are dropped. This matches + Workstacean's recovery behaviour and avoids flooding the agent + with stale prompts after a long downtime. + +### Persistence path + +```bash +# Default (Docker) +/sandbox/scheduler//jobs.db + +# Local fallback (when /sandbox isn't writable) +~/.protoagent/scheduler//jobs.db + +# Override +export SCHEDULER_DB_DIR=/var/data/agents +# → /var/data/agents//jobs.db +``` + +Mount a volume at the configured path to survive container +restarts (analogous to `audit/` and `knowledge/`). + +## Workstacean backend — how firing works + +When `WORKSTACEAN_API_BASE` and `WORKSTACEAN_API_KEY` are set, the +adapter publishes to `POST {base}/publish` with topic +`command.schedule` and the action wrapper Workstacean expects. See +the [Workstacean scheduler reference](https://protolabsai.github.io/protoWorkstacean/reference/scheduler/) +for the payload shape. + +When the schedule fires, Workstacean publishes the inner payload to +`cron..`. **Workstacean does not natively dispatch +to A2A endpoints today** — your fork needs to wire a bridge that +subscribes to `cron..*` and POSTs to the protoAgent's +`/a2a` endpoint. + +### Topic prefix override + +If your existing Workstacean bus uses a different convention: + +```bash +export WORKSTACEAN_TOPIC_PREFIX="myorg.cron.gina" +# → topics fire on myorg.cron.gina. +``` + +### `list_schedules()` returns empty under Workstacean + +Workstacean's `list` action publishes its response on the +`schedule.list` topic — there's no synchronous reply on `/publish`. +The adapter intentionally doesn't subscribe. If you need live +introspection, query Workstacean directly or run the local backend. + +## Adding a case to your eval suite + +The default `evals/tasks.json` doesn't include scheduler cases (the +fire path is async — a single eval run can't easily test that the +scheduled prompt arrives). For forks that want it, the pattern is: + +1. `schedule_task(prompt, "")` in setup. +2. Wait > 1 second. +3. Assert on the audit log and/or KB state for the *fired* prompt's + side effects. + +Document the case as `category: "scheduler"` and gate at >= 2/3 +attempts to absorb timing jitter. + +## References + +- [Workstacean scheduler reference](https://protolabsai.github.io/protoWorkstacean/reference/scheduler/) +- [Configuration](/reference/configuration#scheduler) — env vars +- [Eval your fork](/guides/evals) — for the testing pattern above diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index bd3f5be..3ca9a9c 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -83,3 +83,16 @@ Only read when `middleware.knowledge` is `true`. | `top_k` | `5` | Results per query fed into state. | The bundled store is sqlite + FTS5 (with an automatic LIKE fallback when FTS5 isn't available). One `chunks` table; the `domain` column distinguishes operator-set notes (`memory_ingest`), daily-log entries (`daily_log`), and conversation findings extracted by `MemoryMiddleware` (`domain='finding'`). + +## Scheduler + +The bundled scheduler is configured entirely via environment, not YAML, so the same image can be deployed under either backend without rebuilding. See [Schedule future work](/guides/scheduler) for the full guide. + +| Env var | Default | What | +|---|---|---| +| `WORKSTACEAN_API_BASE` | unset | When set together with `WORKSTACEAN_API_KEY`, swaps the bundled local scheduler for the `WorkstaceanScheduler` HTTP adapter. | +| `WORKSTACEAN_API_KEY` | unset | Auth token sent as `X-API-Key` to Workstacean's `/publish`. | +| `WORKSTACEAN_TOPIC_PREFIX` | `cron.` | Override the bus topic the adapter fires on, when your Workstacean install uses a different convention. | +| `SCHEDULER_DB_DIR` | `/sandbox/scheduler` | Local backend: parent directory for `/jobs.db`. Falls back to `~/.protoagent/scheduler//jobs.db` when unwritable. | +| `SCHEDULER_INVOKE_URL` | `http://127.0.0.1:` | Local backend: where to POST `message/send` when a job fires. Override only if the agent's A2A endpoint isn't on localhost. | +| `SCHEDULER_DISABLED` | unset | Set to `1` / `true` to drop the scheduler tools entirely. | diff --git a/graph/agent.py b/graph/agent.py index 355c3fc..08ad32a 100644 --- a/graph/agent.py +++ b/graph/agent.py @@ -158,6 +158,7 @@ async def task( def create_agent_graph( config: LangGraphConfig, knowledge_store=None, + scheduler=None, include_subagents: bool = True, ): """Create the protoAgent LangGraph agent. @@ -167,7 +168,7 @@ def create_agent_graph( """ llm = create_llm(config) - all_tools = get_all_tools(knowledge_store) + all_tools = get_all_tools(knowledge_store, scheduler=scheduler) if include_subagents: task_tool = _build_task_tool(config, all_tools) @@ -189,12 +190,12 @@ def create_agent_graph( return agent -def create_simple_agent(config: LangGraphConfig, knowledge_store=None): +def create_simple_agent(config: LangGraphConfig, knowledge_store=None, scheduler=None): """Create a simple agent without subagents (for debugging/testing).""" from langgraph.prebuilt import create_react_agent llm = create_llm(config) - all_tools = get_all_tools(knowledge_store) + all_tools = get_all_tools(knowledge_store, scheduler=scheduler) system_prompt = build_system_prompt(include_subagents=False) diff --git a/graph/config.py b/graph/config.py index a3df02b..c2cf995 100644 --- a/graph/config.py +++ b/graph/config.py @@ -41,6 +41,7 @@ class LangGraphConfig: "current_time", "calculator", "web_search", "fetch_url", "memory_ingest", "memory_recall", "memory_list", "memory_stats", "daily_log", + "schedule_task", "list_schedules", "cancel_schedule", ], max_turns=20, )) diff --git a/graph/subagents/config.py b/graph/subagents/config.py index 560edc7..a488703 100644 --- a/graph/subagents/config.py +++ b/graph/subagents/config.py @@ -67,6 +67,7 @@ class SubagentConfig: "current_time", "calculator", "web_search", "fetch_url", "memory_ingest", "memory_recall", "memory_list", "memory_stats", "daily_log", + "schedule_task", "list_schedules", "cancel_schedule", ], max_turns=20, ) diff --git a/requirements.txt b/requirements.txt index 30ef46d..aa05284 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,7 @@ langchain-openai>=0.3.0 # Starter tools (tools/lg_tools.py) ddgs>=9.0 beautifulsoup4>=4.12 + +# Scheduler (scheduler/local.py — cron expression parsing for the +# bundled local backend; the Workstacean adapter doesn't need this) +croniter>=2.0 diff --git a/scheduler/__init__.py b/scheduler/__init__.py new file mode 100644 index 0000000..960a226 --- /dev/null +++ b/scheduler/__init__.py @@ -0,0 +1,27 @@ +"""Pluggable scheduler for future-task delivery. + +Two backends ship by default: + +- ``LocalScheduler`` — sqlite + asyncio. Bundled, zero external + dependencies, per-agent persistence path. Use this for solo forks + or any deployment that doesn't already run protoWorkstacean. +- ``WorkstaceanScheduler`` — HTTP adapter to a protoWorkstacean + install. Topic-namespaced per agent so multiple ginas can share one + Workstacean and not collide. + +``server.py`` selects the backend at startup based on env vars; the +agent loop sees the same three tools (``schedule_task``, +``list_schedules``, ``cancel_schedule``) regardless of which backend +is wired up. + +Multi-agent safety: every job carries an ``agent_name`` (defaulted +from ``AGENT_NAME`` env / config) so that two protoAgent instances +sharing one storage path or one Workstacean install can't accidentally +fire each other's scheduled prompts. +""" + +from scheduler.interface import Job, SchedulerBackend +from scheduler.local import LocalScheduler +from scheduler.workstacean import WorkstaceanScheduler + +__all__ = ["Job", "SchedulerBackend", "LocalScheduler", "WorkstaceanScheduler"] diff --git a/scheduler/interface.py b/scheduler/interface.py new file mode 100644 index 0000000..6de9b3a --- /dev/null +++ b/scheduler/interface.py @@ -0,0 +1,114 @@ +"""Scheduler protocol — the contract every backend honors. + +Both ``LocalScheduler`` and ``WorkstaceanScheduler`` implement this +shape. The agent-facing tools in ``tools/lg_tools.py`` only see the +protocol; swapping backends is a server.py-level decision. +""" + +from __future__ import annotations + +import re +from dataclasses import asdict, dataclass, field +from datetime import UTC, datetime +from typing import Any, Protocol + + +@dataclass +class Job: + """A scheduled future invocation. + + ``schedule`` is either a 5-field cron expression (e.g. + ``"0 9 * * 1-5"``) or an ISO-8601 datetime for one-shot fires + (e.g. ``"2026-05-01T15:00:00+00:00"``). Backends auto-detect. + + ``agent_name`` namespaces the job — one Workstacean install or + shared sqlite path can serve N protoAgent instances without + cross-firing. + """ + + id: str + prompt: str + schedule: str + agent_name: str + created_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) + next_fire: str | None = None # ISO; None means "compute on save" + last_fire: str | None = None + enabled: bool = True + + def as_dict(self) -> dict[str, Any]: + return asdict(self) + + +class SchedulerBackend(Protocol): + """The minimum surface every backend implements. + + Methods are sync because the agent tools wrap them in their own + async functions; backends that need to do async I/O (httpx in + Workstacean's case) handle it internally. + """ + + name: str # short label for logs / agent-facing strings: "local", "workstacean" + + def add_job(self, prompt: str, schedule: str, *, job_id: str | None = None) -> Job: + """Persist a new job. Returns the stored ``Job`` (with + backend-assigned id and next_fire if the caller didn't set them). + + Raises ``ValueError`` for malformed schedule strings.""" + ... + + def cancel_job(self, job_id: str) -> bool: + """Remove a job. Returns ``True`` if a row was deleted.""" + ... + + def list_jobs(self) -> list[Job]: + """All jobs visible to the calling agent. Implementations are + responsible for filtering by ``agent_name`` so multi-agent + deployments stay isolated.""" + ... + + async def start(self) -> None: + """Start any background polling. No-op for backends that don't + need it (Workstacean dispatches and forgets).""" + ... + + async def stop(self) -> None: + """Cleanly shut down background work.""" + ... + + +# ── shared helpers ────────────────────────────────────────────────────────── + + +_CRON_PATTERN = re.compile(r"^\s*\S+\s+\S+\s+\S+\s+\S+\s+\S+\s*$") + + +def is_cron(schedule: str) -> bool: + """Heuristic: does ``schedule`` look like a 5-field cron expression? + + Used by both backends to decide between cron-iter and + ``datetime.fromisoformat``. Doesn't validate semantics — that + happens when the schedule is parsed. + """ + return bool(_CRON_PATTERN.match(schedule)) and not _looks_like_iso(schedule) + + +def _looks_like_iso(schedule: str) -> bool: + # ISO datetimes contain ``-`` and either ``T`` or a space between + # date and time. Cron has neither in the first field. + return "T" in schedule or _has_iso_date_prefix(schedule) + + +def _has_iso_date_prefix(schedule: str) -> bool: + head = schedule.strip().split(" ", 1)[0] + return bool(re.match(r"^\d{4}-\d{2}-\d{2}", head)) + + +def parse_iso_to_utc(schedule: str) -> datetime: + """Parse an ISO-8601 datetime, treating naive inputs as UTC. + + Raises ``ValueError`` for malformed strings. + """ + dt = datetime.fromisoformat(schedule) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=UTC) + return dt.astimezone(UTC) diff --git a/scheduler/local.py b/scheduler/local.py new file mode 100644 index 0000000..17fe9b0 --- /dev/null +++ b/scheduler/local.py @@ -0,0 +1,381 @@ +"""LocalScheduler — bundled sqlite + asyncio backend. + +The default scheduler when no protoWorkstacean install is configured. +Every protoAgent instance gets a private ``jobs.db`` namespaced by +``AGENT_NAME`` so spinning up gina-personal alongside gina-work +doesn't cross-fire prompts. + +Architecture: + +- One ``jobs`` table — ``id``, ``prompt``, ``schedule``, ``next_fire``, + ``agent_name``, ``last_fire``, ``enabled``, ``created_at``. +- Polling coroutine runs on FastAPI's startup hook (``server.py``) + and ticks once per ``_POLL_INTERVAL_S`` (1s default). Cheap because + sqlite reads with an indexed ``next_fire`` filter cost microseconds. +- Firing = HTTP POST to the running agent's own ``/a2a`` endpoint as + a ``message/send``. Going through HTTP rather than calling into the + graph directly gets us free parity with real callers — same audit + log, same cost-v1 capture, same auth path. +- One-shot ISO schedules are deleted after firing. Cron schedules + reschedule via croniter. +- On startup: any job whose ``next_fire`` is in the past but within a + 24h window fires immediately (BFCL-style "missed fires" recovery, + matching Workstacean's behaviour). Older missed fires are + rescheduled forward without firing — better than waking the agent + to a flood of stale prompts after a long downtime. +""" + +from __future__ import annotations + +import asyncio +import logging +import os +import sqlite3 +import uuid +from datetime import UTC, datetime, timedelta +from pathlib import Path +from typing import Any + +from croniter import croniter + +from scheduler.interface import Job, is_cron, parse_iso_to_utc + +log = logging.getLogger(__name__) + +DEFAULT_DB_DIR = "/sandbox/scheduler" +_POLL_INTERVAL_S = 1.0 +_MISSED_FIRE_WINDOW_S = 24 * 60 * 60 # 24h — matches Workstacean + + +def _resolve_db_path(db_dir: str | Path | None, agent_name: str) -> Path: + """Pick a writable jobs.db path namespaced by agent name.""" + raw = os.environ.get("SCHEDULER_DB_DIR") or db_dir or DEFAULT_DB_DIR + base = Path(str(raw)).expanduser() / agent_name + try: + base.mkdir(parents=True, exist_ok=True) + probe = base / ".write-probe" + probe.touch() + probe.unlink() + return base / "jobs.db" + except OSError: + fallback = Path.home() / ".protoagent" / "scheduler" / agent_name + fallback.mkdir(parents=True, exist_ok=True) + log.info("[scheduler] %s not writable; using %s instead", base, fallback) + return fallback / "jobs.db" + + +def _now_iso() -> str: + return datetime.now(UTC).isoformat() + + +def _compute_next_fire(schedule: str, *, after: datetime | None = None) -> str: + """Resolve a schedule string to the next ISO timestamp it fires. + + ``after`` controls when "next" starts — current time by default; + pass an explicit reference when rescheduling a cron job after a + fire so successive fires don't drift. + """ + after = after or datetime.now(UTC) + if is_cron(schedule): + return croniter(schedule, after).get_next(datetime).astimezone(UTC).isoformat() + return parse_iso_to_utc(schedule).isoformat() + + +_SCHEMA = """ +CREATE TABLE IF NOT EXISTS jobs ( + id TEXT PRIMARY KEY, + prompt TEXT NOT NULL, + schedule TEXT NOT NULL, + agent_name TEXT NOT NULL, + next_fire TEXT NOT NULL, + last_fire TEXT, + enabled INTEGER NOT NULL DEFAULT 1, + created_at TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_jobs_next_fire ON jobs(next_fire); +CREATE INDEX IF NOT EXISTS idx_jobs_agent_name ON jobs(agent_name); +""" + + +class LocalScheduler: + """Sqlite-backed scheduler with an asyncio polling loop. + + Construct once at server startup, ``await scheduler.start()`` to + spawn the polling task, ``await scheduler.stop()`` on shutdown. + The agent-facing tools call ``add_job`` / ``cancel_job`` / + ``list_jobs`` synchronously. + """ + + name = "local" + + def __init__( + self, + agent_name: str, + *, + invoke_url: str, + api_key: str | None = None, + bearer_token: str | None = None, + db_dir: str | Path | None = None, + ): + self.agent_name = agent_name + self._invoke_url = invoke_url.rstrip("/") + self._api_key = api_key or "" + self._bearer = bearer_token or "" + self.path = _resolve_db_path(db_dir, agent_name) + self._task: asyncio.Task | None = None + self._stopping = False + self._init_db() + + # ── DB plumbing ───────────────────────────────────────────────────────── + + def _connect(self) -> sqlite3.Connection: + db = sqlite3.connect(str(self.path)) + db.row_factory = sqlite3.Row + try: + db.execute("PRAGMA journal_mode=WAL") + except sqlite3.OperationalError as exc: + log.debug("[scheduler] WAL skipped: %s", exc) + return db + + def _init_db(self) -> None: + try: + db = self._connect() + db.executescript(_SCHEMA) + db.commit() + db.close() + except sqlite3.DatabaseError as exc: + log.error("[scheduler] schema init failed at %s: %s", self.path, exc) + + # ── public API (matches SchedulerBackend) ─────────────────────────────── + + def add_job(self, prompt: str, schedule: str, *, job_id: str | None = None) -> Job: + if not prompt or not prompt.strip(): + raise ValueError("scheduler: prompt is required") + next_fire = _compute_next_fire(schedule) # raises ValueError for malformed input + + job = Job( + id=job_id or self._generate_id(), + prompt=prompt, + schedule=schedule, + agent_name=self.agent_name, + next_fire=next_fire, + ) + db = self._connect() + try: + db.execute( + "INSERT INTO jobs (id, prompt, schedule, agent_name, next_fire, " + "last_fire, enabled, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (job.id, job.prompt, job.schedule, job.agent_name, + job.next_fire, job.last_fire, int(job.enabled), job.created_at), + ) + db.commit() + except sqlite3.IntegrityError as exc: + raise ValueError(f"job id {job.id!r} already exists") from exc + finally: + db.close() + return job + + def cancel_job(self, job_id: str) -> bool: + db = self._connect() + try: + cur = db.execute( + "DELETE FROM jobs WHERE id = ? AND agent_name = ?", + (job_id, self.agent_name), + ) + db.commit() + return cur.rowcount > 0 + except sqlite3.DatabaseError as exc: + log.warning("[scheduler] cancel_job failed: %s", exc) + return False + finally: + db.close() + + def list_jobs(self) -> list[Job]: + db = self._connect() + try: + rows = db.execute( + "SELECT * FROM jobs WHERE agent_name = ? ORDER BY next_fire ASC", + (self.agent_name,), + ).fetchall() + except sqlite3.DatabaseError as exc: + log.warning("[scheduler] list_jobs failed: %s", exc) + return [] + finally: + db.close() + return [_row_to_job(r) for r in rows] + + async def start(self) -> None: + if self._task is not None: + return + self._stopping = False + self._recover_missed_fires() + self._task = asyncio.create_task(self._poll_loop(), name="scheduler.local.poll") + log.info( + "[scheduler] local backend started: agent=%s db=%s", + self.agent_name, self.path, + ) + + async def stop(self) -> None: + self._stopping = True + if self._task is None: + return + self._task.cancel() + try: + await self._task + except (asyncio.CancelledError, Exception): # noqa: BLE001 + pass + self._task = None + log.info("[scheduler] local backend stopped") + + # ── polling + firing ──────────────────────────────────────────────────── + + async def _poll_loop(self) -> None: + while not self._stopping: + try: + await self._tick() + except Exception: # noqa: BLE001 + log.exception("[scheduler] poll tick failed") + try: + await asyncio.sleep(_POLL_INTERVAL_S) + except asyncio.CancelledError: + return + + async def _tick(self) -> None: + now = datetime.now(UTC) + due = self._claim_due_jobs(now) + for job in due: + try: + await self._fire(job) + finally: + self._reschedule_or_delete(job, fired_at=now) + + def _claim_due_jobs(self, now: datetime) -> list[Job]: + db = self._connect() + try: + rows = db.execute( + "SELECT * FROM jobs WHERE agent_name = ? AND enabled = 1 " + "AND next_fire <= ? ORDER BY next_fire ASC", + (self.agent_name, now.isoformat()), + ).fetchall() + except sqlite3.DatabaseError as exc: + log.warning("[scheduler] _claim_due_jobs failed: %s", exc) + return [] + finally: + db.close() + return [_row_to_job(r) for r in rows] + + def _reschedule_or_delete(self, job: Job, *, fired_at: datetime) -> None: + """Cron jobs roll forward; one-shot jobs are deleted.""" + db = self._connect() + try: + if is_cron(job.schedule): + next_iso = _compute_next_fire(job.schedule, after=fired_at) + db.execute( + "UPDATE jobs SET next_fire = ?, last_fire = ? WHERE id = ?", + (next_iso, fired_at.isoformat(), job.id), + ) + else: + db.execute("DELETE FROM jobs WHERE id = ?", (job.id,)) + db.commit() + except sqlite3.DatabaseError: + log.exception("[scheduler] reschedule failed for job %s", job.id) + finally: + db.close() + + def _recover_missed_fires(self) -> None: + """Roll past-due jobs forward on startup. + + - Missed fires within the last 24h fire immediately on the next + tick (we leave their ``next_fire`` in the past so the polling + loop picks them up naturally). + - Older missed fires are rescheduled forward without firing — + firing a flood of stale prompts after a long downtime is worse + than dropping them. + """ + cutoff_recent = datetime.now(UTC) - timedelta(seconds=_MISSED_FIRE_WINDOW_S) + db = self._connect() + try: + rows = db.execute( + "SELECT * FROM jobs WHERE agent_name = ? AND enabled = 1 " + "AND next_fire <= ?", + (self.agent_name, cutoff_recent.isoformat()), + ).fetchall() + for row in rows: + job = _row_to_job(row) + if is_cron(job.schedule): + next_iso = _compute_next_fire(job.schedule) + db.execute( + "UPDATE jobs SET next_fire = ? WHERE id = ?", + (next_iso, job.id), + ) + log.info( + "[scheduler] dropped stale fire for job %s; next at %s", + job.id, next_iso, + ) + else: + db.execute("DELETE FROM jobs WHERE id = ?", (job.id,)) + log.info("[scheduler] dropped stale one-shot job %s", job.id) + db.commit() + except sqlite3.DatabaseError: + log.exception("[scheduler] missed-fire recovery failed") + finally: + db.close() + + async def _fire(self, job: Job) -> None: + """Deliver a job by POSTing to the agent's own A2A endpoint.""" + import httpx + + headers = {"Content-Type": "application/json"} + if self._bearer: + headers["Authorization"] = f"Bearer {self._bearer}" + if self._api_key: + headers["X-API-Key"] = self._api_key + + message_id = str(uuid.uuid4()) + body = { + "jsonrpc": "2.0", + "id": message_id, + "method": "message/send", + "params": { + "message": { + "role": "user", + "parts": [{"kind": "text", "text": job.prompt}], + "messageId": message_id, + # Carry the originating job id so observers can tell + # this turn was scheduler-driven, not user-driven. + "metadata": {"scheduler_job_id": job.id, "scheduler_kind": "local"}, + } + }, + } + try: + async with httpx.AsyncClient(timeout=30) as client: + r = await client.post(f"{self._invoke_url}/a2a", headers=headers, json=body) + if r.status_code >= 400: + log.error( + "[scheduler] fire failed for job %s: HTTP %d %s", + job.id, r.status_code, r.text[:200], + ) + else: + log.info("[scheduler] fired job %s", job.id) + except Exception: # noqa: BLE001 + log.exception("[scheduler] fire exception for job %s", job.id) + + def _generate_id(self) -> str: + # Agent-name prefix keeps cross-agent IDs distinct in shared + # observability surfaces (audit log, dashboards) even though + # the DB row is already namespaced by agent_name. + return f"{self.agent_name}-{uuid.uuid4().hex[:12]}" + + +def _row_to_job(row: Any) -> Job: + return Job( + id=row["id"], + prompt=row["prompt"], + schedule=row["schedule"], + agent_name=row["agent_name"], + next_fire=row["next_fire"], + last_fire=row["last_fire"], + enabled=bool(row["enabled"]), + created_at=row["created_at"], + ) diff --git a/scheduler/workstacean.py b/scheduler/workstacean.py new file mode 100644 index 0000000..97f690b --- /dev/null +++ b/scheduler/workstacean.py @@ -0,0 +1,183 @@ +"""WorkstaceanScheduler — HTTP adapter to a protoWorkstacean install. + +Activated automatically when ``WORKSTACEAN_API_BASE`` and +``WORKSTACEAN_API_KEY`` are set (see ``server.py``). + +Speaks Workstacean's ``POST /publish`` API as documented at +https://protolabsai.github.io/protoWorkstacean/reference/scheduler/. +Every job is namespaced with the agent's name so multiple protoAgent +forks (e.g. ``gina-personal`` + ``gina-work``) can share one +Workstacean install without cross-firing: + +- Job IDs are prefixed: ``{agent_name}-{user_id_or_uuid}`` +- Topics are namespaced: ``cron.{agent_name}`` + +The adapter is fire-and-forget — Workstacean owns scheduling state. +``list_jobs()`` issues a ``list`` command and waits for the response +on the ``schedule.list`` topic. If the user wants strict local +introspection, they should run the local backend. + +Note: Workstacean today does not natively dispatch to A2A endpoints; +forks need to wire their Workstacean install to route ``cron.*`` +topics to the agent's A2A endpoint. See the linked guide for the +recommended bridge config. +""" + +from __future__ import annotations + +import logging +import os +import uuid +from typing import Any + +import httpx + +from scheduler.interface import Job, parse_iso_to_utc, is_cron + +log = logging.getLogger(__name__) + +DEFAULT_TIMEOUT_S = 10 + + +class WorkstaceanScheduler: + """HTTP adapter to a Workstacean ``/publish`` endpoint.""" + + name = "workstacean" + + def __init__( + self, + agent_name: str, + *, + base_url: str, + api_key: str, + topic_prefix: str | None = None, + timeout_s: float = DEFAULT_TIMEOUT_S, + ): + if not base_url: + raise ValueError("WorkstaceanScheduler: base_url is required") + if not api_key: + raise ValueError("WorkstaceanScheduler: api_key is required") + self.agent_name = agent_name + self._base_url = base_url.rstrip("/") + self._api_key = api_key + # Namespacing: topic_prefix governs which Workstacean topic the + # job fires on. Default = ``cron.``. Forks can override + # via ``WORKSTACEAN_TOPIC_PREFIX`` to integrate with existing + # bus conventions. + self._topic_prefix = topic_prefix or f"cron.{agent_name}" + self._timeout_s = timeout_s + + # ── public API ────────────────────────────────────────────────────────── + + def add_job(self, prompt: str, schedule: str, *, job_id: str | None = None) -> Job: + if not prompt or not prompt.strip(): + raise ValueError("scheduler: prompt is required") + # Validate the schedule eagerly so a malformed expr fails at + # tool-call time, not silently inside Workstacean. + _validate_schedule(schedule) + + normalized_id = self._namespaced_id(job_id) + topic = f"{self._topic_prefix}.{normalized_id}" + # Workstacean expects an outer ``command.schedule`` topic and + # the inner ``payload`` carries both the trigger schedule and + # the actual message that will be fired. The inner ``topic`` + # is what Workstacean publishes to when the schedule fires — + # so it has to be something a downstream A2A bridge subscribes + # to. Default convention: ``cron..``. + body = { + "topic": "command.schedule", + "payload": { + "action": "add", + "id": normalized_id, + "schedule": schedule, + "topic": topic, + "payload": { + "content": prompt, + "sender": "scheduler", + "channel": "a2a", + # Cross-system breadcrumb so the bridge knows which + # protoAgent fork the message belongs to. + "agent_name": self.agent_name, + "scheduler_job_id": normalized_id, + }, + }, + } + self._publish(body) + + return Job( + id=normalized_id, + prompt=prompt, + schedule=schedule, + agent_name=self.agent_name, + next_fire=None, # Workstacean owns the schedule state + ) + + def cancel_job(self, job_id: str) -> bool: + body = { + "topic": "command.schedule", + "payload": {"action": "remove", "id": self._namespaced_id(job_id)}, + } + try: + self._publish(body) + return True + except RuntimeError as exc: + log.warning("[scheduler] workstacean cancel failed: %s", exc) + return False + + def list_jobs(self) -> list[Job]: + """Returns ``[]`` from the adapter. + + Workstacean's ``list`` action publishes its response on the + ``schedule.list`` topic — there is no synchronous reply on + ``/publish``. Subscribing to that topic from inside a + protoAgent process (without a full bus client) is more + machinery than this adapter is the right layer for. Forks + that need live introspection should run the local backend or + query Workstacean directly. + """ + return [] + + async def start(self) -> None: + # Workstacean owns scheduling state — nothing to start here. + log.info( + "[scheduler] workstacean backend ready: agent=%s base=%s topic=%s.*", + self.agent_name, self._base_url, self._topic_prefix, + ) + + async def stop(self) -> None: + return None + + # ── helpers ───────────────────────────────────────────────────────────── + + def _publish(self, body: dict[str, Any]) -> None: + headers = {"Content-Type": "application/json", "X-API-Key": self._api_key} + try: + r = httpx.post( + f"{self._base_url}/publish", + headers=headers, + json=body, + timeout=self._timeout_s, + ) + except httpx.HTTPError as exc: + raise RuntimeError(f"workstacean publish failed: {exc}") from exc + if r.status_code >= 400: + raise RuntimeError( + f"workstacean publish HTTP {r.status_code}: {r.text[:200]}" + ) + + def _namespaced_id(self, job_id: str | None) -> str: + suffix = job_id or uuid.uuid4().hex[:12] + prefix = f"{self.agent_name}-" + return suffix if suffix.startswith(prefix) else prefix + suffix + + +def _validate_schedule(schedule: str) -> None: + """Validate cron expression OR ISO datetime. Raises ValueError.""" + if is_cron(schedule): + from croniter import croniter + try: + croniter(schedule) + except (TypeError, ValueError) as exc: + raise ValueError(f"invalid cron expression {schedule!r}: {exc}") from exc + return + parse_iso_to_utc(schedule) # raises ValueError on malformed ISO diff --git a/server.py b/server.py index f1ecd46..bf05ae0 100644 --- a/server.py +++ b/server.py @@ -58,6 +58,10 @@ _active_port = 7870 # populated by _main() — the port this process is actually bound to. # Read by the autostart installer so the LaunchAgent reboots # on the same port the operator launched with, not the default. +_scheduler = None # SchedulerBackend (LocalScheduler or WorkstaceanScheduler). + # Constructed at init, started on FastAPI startup, stopped + # on shutdown. Lifecycle is hooked in _main() so the + # polling coroutine doesn't leak on server reload. def _init_langgraph_agent(): @@ -97,11 +101,21 @@ def _init_langgraph_agent(): # the worker subagent — the store is still cheap to construct. knowledge_store = _build_knowledge_store(_graph_config) - _graph = create_agent_graph(_graph_config, knowledge_store=knowledge_store) + # Scheduler — local sqlite by default, swaps to a WorkstaceanScheduler + # automatically when WORKSTACEAN_API_BASE + WORKSTACEAN_API_KEY env + # vars are set. Both backends share the same agent-tool surface + # (schedule_task / list_schedules / cancel_schedule). + global _scheduler + _scheduler = _build_scheduler(_graph_config) + + _graph = create_agent_graph( + _graph_config, knowledge_store=knowledge_store, scheduler=_scheduler, + ) log.info( - "LangGraph agent initialized (model: %s, knowledge_db: %s)", + "LangGraph agent initialized (model: %s, knowledge_db: %s, scheduler: %s)", _graph_config.model_name, getattr(knowledge_store, "path", "(disabled)"), + getattr(_scheduler, "name", "disabled"), ) @@ -124,6 +138,69 @@ def _build_knowledge_store(config): return None +def _build_scheduler(config): + """Return the active scheduler backend, or ``None`` when disabled. + + Selection order: + + 1. ``WORKSTACEAN_API_BASE`` + ``WORKSTACEAN_API_KEY`` set → + ``WorkstaceanScheduler``. Forks running on the protoLabs fleet + infrastructure get this for free. + 2. Otherwise → ``LocalScheduler`` with sqlite at + ``/sandbox/scheduler//jobs.db``. + + Returns ``None`` when explicitly disabled via ``SCHEDULER_DISABLED=1`` + so a fork can ship without a scheduler at all. + + The agent's auth token + api-key are passed into the local backend + so its self-invocation HTTP call can pass through bearer / X-API-Key + auth — the scheduler hits the same A2A endpoint as a real caller. + """ + if os.environ.get("SCHEDULER_DISABLED", "").lower() in ("1", "true", "yes"): + log.info("[server] scheduler disabled via SCHEDULER_DISABLED env") + return None + + name = agent_name() + workstacean_base = os.environ.get("WORKSTACEAN_API_BASE", "").strip() + workstacean_key = os.environ.get("WORKSTACEAN_API_KEY", "").strip() + if workstacean_base and workstacean_key: + try: + from scheduler import WorkstaceanScheduler + return WorkstaceanScheduler( + agent_name=name, + base_url=workstacean_base, + api_key=workstacean_key, + topic_prefix=os.environ.get("WORKSTACEAN_TOPIC_PREFIX") or None, + ) + except Exception as exc: + log.warning( + "[server] WorkstaceanScheduler init failed: %s; falling back to local", + exc, + ) + + try: + from scheduler import LocalScheduler + invoke_url = os.environ.get( + "SCHEDULER_INVOKE_URL", + f"http://127.0.0.1:{_active_port}", + ) + bearer = (config.auth_token or os.environ.get("A2A_AUTH_TOKEN", "")).strip() + api_key_env = f"{name.upper()}_API_KEY" + api_key = os.environ.get(api_key_env, "").strip() + return LocalScheduler( + agent_name=name, + invoke_url=invoke_url, + api_key=api_key, + bearer_token=bearer, + ) + except Exception as exc: + log.warning( + "[server] LocalScheduler init failed: %s; running scheduler-less", + exc, + ) + return None + + def _reload_langgraph_agent() -> tuple[bool, str]: """Rebuild the compiled graph from the latest config YAML. @@ -161,7 +238,14 @@ def _reload_langgraph_agent() -> tuple[bool, str]: if is_setup_complete(): try: new_store = _build_knowledge_store(new_config) - new_graph = create_agent_graph(new_config, knowledge_store=new_store) + # Re-use the running scheduler instance — tearing down the + # polling loop on every drawer save would orphan in-flight + # fires. Env-driven scheduler config (WORKSTACEAN_API_BASE, + # SCHEDULER_DISABLED) only takes effect on full restart; + # the YAML doesn't carry scheduler settings yet. + new_graph = create_agent_graph( + new_config, knowledge_store=new_store, scheduler=_scheduler, + ) except Exception as e: log.exception("[reload] graph rebuild failed") return False, f"graph rebuild failed: {e}" @@ -757,6 +841,31 @@ def _main(): fastapi_app = FastAPI(title=f"{agent_name()} — protoAgent") + # --- Scheduler lifecycle ------------------------------------------------ + # The local scheduler needs an asyncio polling task; the Workstacean + # adapter is a no-op start/stop. Both implement the same contract so + # we just call through. on_event is preferred over a lifespan + # context manager here — the rest of the boot is sync (uvicorn.run + # is the only blocking call) and FastAPI fires startup/shutdown + # around it. + @fastapi_app.on_event("startup") + async def _scheduler_startup(): + if _scheduler is None: + return + try: + await _scheduler.start() + except Exception: + log.exception("[scheduler] startup failed") + + @fastapi_app.on_event("shutdown") + async def _scheduler_shutdown(): + if _scheduler is None: + return + try: + await _scheduler.stop() + except Exception: + log.exception("[scheduler] shutdown failed") + # --- Chat API ----------------------------------------------------------- class ChatRequest(PydanticBaseModel): message: str diff --git a/tests/test_scheduler_local.py b/tests/test_scheduler_local.py new file mode 100644 index 0000000..524116e --- /dev/null +++ b/tests/test_scheduler_local.py @@ -0,0 +1,290 @@ +"""Tests for ``scheduler.local.LocalScheduler``. + +The polling-loop firing path is covered by stubbing ``httpx.AsyncClient`` +so a unit test doesn't need a running A2A endpoint. Multi-agent +isolation, missed-fire recovery, and reschedule-vs-delete behaviour +all get explicit cases — they're the parts most likely to regress. +""" + +from __future__ import annotations + +import asyncio +import sqlite3 +from datetime import UTC, datetime, timedelta +from pathlib import Path + +import pytest + +from scheduler.interface import is_cron, parse_iso_to_utc +from scheduler.local import LocalScheduler, _compute_next_fire + + +# ── helpers ───────────────────────────────────────────────────────────────── + + +def _make_scheduler(tmp_path: Path, agent: str = "gina-test") -> LocalScheduler: + return LocalScheduler( + agent_name=agent, + invoke_url="http://127.0.0.1:7870", + api_key="k", + bearer_token="b", + db_dir=tmp_path, + ) + + +# ── interface helpers ────────────────────────────────────────────────────── + + +class TestIsCron: + def test_cron_5_field(self): + assert is_cron("0 9 * * *") is True + + def test_cron_with_ranges(self): + assert is_cron("0 9 * * 1-5") is True + + def test_iso_with_t(self): + assert is_cron("2026-04-28T15:00:00") is False + + def test_iso_with_space(self): + assert is_cron("2026-04-28 15:00:00") is False + + def test_iso_with_offset(self): + assert is_cron("2026-04-28T15:00:00+00:00") is False + + def test_garbage(self): + assert is_cron("not a schedule") is False + assert is_cron("0 9 *") is False # 3 fields, not 5 + + def test_seven_fields_rejected(self): + # 7-field cron (with seconds + year) is not standard 5-field; + # the current detector accepts only exactly 5. + assert is_cron("0 0 12 * * MON 2026") is False + + +class TestParseIso: + def test_naive_treated_as_utc(self): + dt = parse_iso_to_utc("2026-04-28T15:00:00") + assert dt.tzinfo == UTC + assert dt.hour == 15 + + def test_offset_normalized(self): + dt = parse_iso_to_utc("2026-04-28T15:00:00-05:00") + assert dt.tzinfo == UTC + assert dt.hour == 20 # 15 EST → 20 UTC + + def test_malformed_raises(self): + with pytest.raises(ValueError): + parse_iso_to_utc("not an iso string") + + +# ── add / list / cancel ───────────────────────────────────────────────────── + + +class TestAddJob: + def test_cron_job(self, tmp_path): + s = _make_scheduler(tmp_path) + job = s.add_job("hi", "0 9 * * *") + assert job.agent_name == "gina-test" + assert job.prompt == "hi" + assert job.next_fire is not None + assert "T" in job.next_fire # ISO + + def test_iso_one_shot(self, tmp_path): + s = _make_scheduler(tmp_path) + future = "2099-01-01T00:00:00" + job = s.add_job("hi", future) + # Naive ISO should be normalized to UTC + assert job.next_fire.startswith("2099-01-01T00:00:00") + + def test_empty_prompt_rejected(self, tmp_path): + s = _make_scheduler(tmp_path) + with pytest.raises(ValueError): + s.add_job(" ", "0 9 * * *") + + def test_malformed_schedule_rejected(self, tmp_path): + s = _make_scheduler(tmp_path) + with pytest.raises(ValueError): + s.add_job("hi", "not-a-real-schedule") + + def test_user_id_preserved(self, tmp_path): + s = _make_scheduler(tmp_path) + job = s.add_job("hi", "0 9 * * *", job_id="my-custom-id") + assert job.id == "my-custom-id" + + def test_duplicate_id_rejected(self, tmp_path): + s = _make_scheduler(tmp_path) + s.add_job("hi", "0 9 * * *", job_id="dup") + with pytest.raises(ValueError, match="already exists"): + s.add_job("again", "0 9 * * *", job_id="dup") + + def test_auto_id_has_agent_prefix(self, tmp_path): + s = _make_scheduler(tmp_path, agent="ginavision") + job = s.add_job("hi", "0 9 * * *") + assert job.id.startswith("ginavision-") + + +class TestListAndCancel: + def test_list_filters_by_agent(self, tmp_path): + gp = _make_scheduler(tmp_path, agent="gina-personal") + gw = _make_scheduler(tmp_path, agent="gina-work") + gp.add_job("p1", "0 9 * * *") + gp.add_job("p2", "0 10 * * *") + gw.add_job("w1", "0 9 * * *") + assert len(gp.list_jobs()) == 2 + assert len(gw.list_jobs()) == 1 + assert gp.list_jobs()[0].agent_name == "gina-personal" + + def test_cancel_returns_true_on_hit(self, tmp_path): + s = _make_scheduler(tmp_path) + job = s.add_job("hi", "0 9 * * *") + assert s.cancel_job(job.id) is True + assert s.list_jobs() == [] + + def test_cancel_returns_false_on_miss(self, tmp_path): + s = _make_scheduler(tmp_path) + assert s.cancel_job("does-not-exist") is False + + def test_cross_agent_cancel_blocked(self, tmp_path): + gp = _make_scheduler(tmp_path, agent="gina-personal") + gw = _make_scheduler(tmp_path, agent="gina-work") + gw_job = gw.add_job("w1", "0 9 * * *") + # gp tries to cancel gw's job — must fail silently (no row deleted) + assert gp.cancel_job(gw_job.id) is False + assert len(gw.list_jobs()) == 1 + + +# ── reschedule / delete behaviour ─────────────────────────────────────────── + + +class TestRescheduleOrDelete: + def test_one_shot_deleted_after_fire(self, tmp_path): + s = _make_scheduler(tmp_path) + # ISO in the past so _claim_due_jobs picks it up + past = (datetime.now(UTC) - timedelta(seconds=5)).isoformat() + s.add_job("hi", past, job_id="oneshot") + job = s.list_jobs()[0] + s._reschedule_or_delete(job, fired_at=datetime.now(UTC)) + assert s.list_jobs() == [] + + def test_cron_rescheduled_after_fire(self, tmp_path): + s = _make_scheduler(tmp_path) + s.add_job("hi", "0 9 * * *", job_id="cron") + job = s.list_jobs()[0] + original_next = job.next_fire + # Fire at "now" — next_fire should advance to the next 09:00 UTC + s._reschedule_or_delete(job, fired_at=datetime.now(UTC)) + new_next = s.list_jobs()[0].next_fire + assert new_next != original_next or original_next > datetime.now(UTC).isoformat() + # last_fire should be populated + assert s.list_jobs()[0].last_fire is not None + + +class TestMissedFireRecovery: + def test_stale_oneshot_dropped(self, tmp_path): + s = _make_scheduler(tmp_path) + # ISO from 2 days ago — outside the 24h window + stale = (datetime.now(UTC) - timedelta(days=2)).isoformat() + s.add_job("hi", stale, job_id="stale") + s._recover_missed_fires() + assert s.list_jobs() == [] + + def test_stale_cron_rolled_forward(self, tmp_path): + s = _make_scheduler(tmp_path) + s.add_job("hi", "0 9 * * *", job_id="cron-stale") + # Manually rewrite next_fire to 2 days ago (outside window) + db = sqlite3.connect(str(s.path)) + old = (datetime.now(UTC) - timedelta(days=2)).isoformat() + db.execute("UPDATE jobs SET next_fire = ? WHERE id = ?", (old, "cron-stale")) + db.commit() + db.close() + s._recover_missed_fires() + rolled = s.list_jobs()[0] + assert rolled.next_fire > datetime.now(UTC).isoformat() + + def test_recent_missed_fire_kept(self, tmp_path): + s = _make_scheduler(tmp_path) + # 5 minutes ago — inside the 24h window, should still fire + recent = (datetime.now(UTC) - timedelta(minutes=5)).isoformat() + s.add_job("hi", recent, job_id="recent") + s._recover_missed_fires() + # Job still exists with next_fire in the past — polling will fire it + jobs = s.list_jobs() + assert len(jobs) == 1 + assert jobs[0].next_fire < datetime.now(UTC).isoformat() + + +# ── compute_next_fire ─────────────────────────────────────────────────────── + + +class TestComputeNextFire: + def test_cron_returns_iso_utc(self): + result = _compute_next_fire("0 9 * * *") + # Parses cleanly as ISO + dt = datetime.fromisoformat(result) + assert dt.tzinfo is not None + + def test_cron_after_anchor(self): + anchor = datetime(2026, 4, 27, 8, 0, 0, tzinfo=UTC) + result = _compute_next_fire("0 9 * * *", after=anchor) + # 9am UTC on 2026-04-27 + dt = datetime.fromisoformat(result) + assert dt.year == 2026 and dt.month == 4 and dt.day == 27 and dt.hour == 9 + + def test_iso_passthrough(self): + result = _compute_next_fire("2026-12-25T00:00:00") + assert result.startswith("2026-12-25T00:00:00") + + +# ── start / stop loop ─────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_start_stop_idempotent(tmp_path): + s = _make_scheduler(tmp_path) + await s.start() + await s.start() # second call is a no-op, not an error + assert s._task is not None + await s.stop() + await s.stop() # second call is a no-op, not an error + assert s._task is None + + +@pytest.mark.asyncio +async def test_due_job_fires(tmp_path, monkeypatch): + """End-to-end: an ISO job in the past gets picked up and POSTs to /a2a.""" + s = _make_scheduler(tmp_path) + # Schedule for 1 second ago so the first tick claims it + past = (datetime.now(UTC) - timedelta(seconds=1)).isoformat() + s.add_job("FIRED-ME", past, job_id="firetest") + + fired: list[dict] = [] + + class _FakeResponse: + status_code = 200 + text = "ok" + + class _FakeClient: + def __init__(self, *_a, **_kw): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, *_a): + return False + + async def post(self, url, headers=None, json=None): + fired.append({"url": url, "json": json}) + return _FakeResponse() + + import httpx + monkeypatch.setattr(httpx, "AsyncClient", _FakeClient) + + await s.start() + # Give the polling loop one tick (poll interval is 1s) + await asyncio.sleep(1.5) + await s.stop() + + assert any("FIRED-ME" in str(c["json"]) for c in fired) + # One-shot was deleted after firing + assert s.list_jobs() == [] diff --git a/tests/test_scheduler_workstacean.py b/tests/test_scheduler_workstacean.py new file mode 100644 index 0000000..ddd6da4 --- /dev/null +++ b/tests/test_scheduler_workstacean.py @@ -0,0 +1,168 @@ +"""Tests for ``scheduler.workstacean.WorkstaceanScheduler``. + +We don't run a Workstacean instance — instead we monkeypatch +``httpx.post`` and assert that the adapter sends the right +``POST /publish`` body shape (action, namespaced id, namespaced topic, +auth header). +""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from scheduler.workstacean import WorkstaceanScheduler + + +class _FakeResponse: + def __init__(self, status: int = 200, body: str = "ok"): + self.status_code = status + self.text = body + + +class _Recorder: + def __init__(self): + self.calls: list[dict[str, Any]] = [] + self.response = _FakeResponse() + + def __call__(self, url, headers=None, json=None, timeout=None): + self.calls.append({"url": url, "headers": headers, "json": json}) + return self.response + + +@pytest.fixture +def recorder(monkeypatch): + rec = _Recorder() + import httpx + monkeypatch.setattr(httpx, "post", rec) + return rec + + +@pytest.fixture +def adapter(): + return WorkstaceanScheduler( + agent_name="gina-personal", + base_url="http://workstacean:3000", + api_key="test-key", + ) + + +# ── construction guards ──────────────────────────────────────────────────── + + +def test_missing_base_url_rejected(): + with pytest.raises(ValueError, match="base_url"): + WorkstaceanScheduler(agent_name="x", base_url="", api_key="k") + + +def test_missing_api_key_rejected(): + with pytest.raises(ValueError, match="api_key"): + WorkstaceanScheduler(agent_name="x", base_url="http://w:3000", api_key="") + + +# ── add_job ──────────────────────────────────────────────────────────────── + + +class TestAddJob: + def test_publishes_command_schedule(self, adapter, recorder): + adapter.add_job("hi", "0 9 * * *", job_id="daily") + assert len(recorder.calls) == 1 + body = recorder.calls[0]["json"] + assert body["topic"] == "command.schedule" + assert body["payload"]["action"] == "add" + + def test_id_namespaced_with_agent(self, adapter, recorder): + adapter.add_job("hi", "0 9 * * *", job_id="daily") + body = recorder.calls[0]["json"] + assert body["payload"]["id"] == "gina-personal-daily" + + def test_id_idempotent_when_already_prefixed(self, adapter, recorder): + # If the caller passes an already-prefixed id, the adapter + # shouldn't double-prefix it. + adapter.add_job("hi", "0 9 * * *", job_id="gina-personal-already-set") + body = recorder.calls[0]["json"] + assert body["payload"]["id"] == "gina-personal-already-set" + + def test_topic_namespaced_with_agent(self, adapter, recorder): + adapter.add_job("hi", "0 9 * * *", job_id="daily") + body = recorder.calls[0]["json"] + assert body["payload"]["topic"].startswith("cron.gina-personal.") + + def test_inner_payload_carries_prompt(self, adapter, recorder): + adapter.add_job("the actual prompt", "0 9 * * *", job_id="x") + inner = recorder.calls[0]["json"]["payload"]["payload"] + assert inner["content"] == "the actual prompt" + assert inner["channel"] == "a2a" + assert inner["agent_name"] == "gina-personal" + + def test_iso_oneshot_accepted(self, adapter, recorder): + adapter.add_job("hi", "2099-01-01T00:00:00", job_id="x") + assert len(recorder.calls) == 1 + + def test_malformed_schedule_rejected(self, adapter): + with pytest.raises(ValueError): + adapter.add_job("hi", "not-a-schedule", job_id="x") + + def test_empty_prompt_rejected(self, adapter): + with pytest.raises(ValueError, match="prompt"): + adapter.add_job(" ", "0 9 * * *", job_id="x") + + def test_auth_header_sent(self, adapter, recorder): + adapter.add_job("hi", "0 9 * * *", job_id="x") + assert recorder.calls[0]["headers"]["X-API-Key"] == "test-key" + + +# ── cancel_job ───────────────────────────────────────────────────────────── + + +class TestCancelJob: + def test_publishes_remove(self, adapter, recorder): + adapter.cancel_job("daily") + body = recorder.calls[0]["json"] + assert body["payload"]["action"] == "remove" + assert body["payload"]["id"] == "gina-personal-daily" + + def test_returns_true_on_success(self, adapter, recorder): + assert adapter.cancel_job("daily") is True + + def test_returns_false_on_http_error(self, adapter, recorder): + recorder.response = _FakeResponse(status=500, body="boom") + assert adapter.cancel_job("daily") is False + + +# ── topic prefix override ────────────────────────────────────────────────── + + +def test_custom_topic_prefix(monkeypatch): + rec = _Recorder() + import httpx + monkeypatch.setattr(httpx, "post", rec) + adapter = WorkstaceanScheduler( + agent_name="gina-personal", + base_url="http://w:3000", + api_key="k", + topic_prefix="myorg.bus.gina", + ) + adapter.add_job("hi", "0 9 * * *", job_id="x") + body = rec.calls[0]["json"] + assert body["payload"]["topic"].startswith("myorg.bus.gina.") + + +# ── list_jobs is intentionally empty ─────────────────────────────────────── + + +def test_list_jobs_returns_empty(adapter): + """Workstacean's ``list`` action publishes async to a topic; + the adapter doesn't subscribe, so list_jobs returns [].""" + assert adapter.list_jobs() == [] + + +# ── start/stop are no-ops ────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_start_stop_no_op(adapter): + # Should not raise + await adapter.start() + await adapter.stop() diff --git a/tools/lg_tools.py b/tools/lg_tools.py index 161ddcb..dfa6727 100644 --- a/tools/lg_tools.py +++ b/tools/lg_tools.py @@ -383,18 +383,123 @@ async def daily_log(content: str) -> str: return [memory_ingest, memory_recall, memory_list, memory_stats, daily_log] +# ── scheduler tools ────────────────────────────────────────────────────────── +# +# Three tools that bind to either the local sqlite-backed scheduler or +# the Workstacean adapter — the agent loop sees one stable surface and +# never has to know which backend is wired up. +# +# Multi-agent safety: the underlying backend is constructed in +# ``server.py`` with the active ``AGENT_NAME`` baked in. add_job / +# list_jobs / cancel_job all filter by that name so two protoAgent +# instances on the same machine (or sharing one Workstacean install) +# never see each other's jobs. + + +def _build_scheduler_tools(scheduler) -> list: + """Bind scheduler tools to a ``SchedulerBackend``. Returns a list.""" + + @tool + async def schedule_task( + prompt: str, + when: str, + job_id: str | None = None, + ) -> str: + """Schedule a future task. The agent receives ``prompt`` as a + new turn when the schedule fires. + + Use this for anything the operator wants done later: reminders + ("remind me to follow up on the auth migration tomorrow at + 9am"), recurring sweeps ("every Monday morning, summarize last + week's logs"), one-off check-ins ("at 3pm today, ask whether + the deploy is healthy"). + + Args: + prompt: The text the agent should receive when the schedule + fires. Be self-contained — the agent has no memory of + this scheduling moment when the task fires. + when: Either a 5-field cron expression (``"0 9 * * 1-5"`` + = every weekday at 9am) or an ISO-8601 datetime + (``"2026-05-01T15:00:00"`` = once at 3pm UTC on May 1). + Compute exact times using ``current_time`` — the agent + cannot infer "now" from training data. + job_id: Optional human-readable id for the job. Auto- + generated if omitted; you'll need it later to cancel. + + Returns ``"Scheduled job next at ."`` on success, + an error string on malformed ``when`` or backend failure. + """ + try: + job = scheduler.add_job(prompt, when, job_id=job_id) + except ValueError as exc: + return f"Error: {exc}" + except Exception as exc: # noqa: BLE001 + return f"Error: scheduler add_job failed: {exc}" + next_fire = job.next_fire or "(managed by remote scheduler)" + return f"Scheduled job {job.id} next at {next_fire}." + + @tool + async def list_schedules() -> str: + """List the current scheduled jobs for this agent. + + Returns one job per line with id, next-fire timestamp, and a + prompt preview. Returns ``"No scheduled jobs."`` when empty. + + Backends that delegate state to a remote scheduler (e.g. the + Workstacean adapter) may return an empty list even when jobs + exist — query the remote scheduler directly to see those. + """ + jobs = scheduler.list_jobs() + if not jobs: + return "No scheduled jobs." + lines = [] + for j in jobs: + preview = (j.prompt or "")[:80] + next_fire = j.next_fire or "(managed remotely)" + lines.append(f"{j.id} next={next_fire} schedule={j.schedule!r} {preview}") + return "\n".join(lines) + + @tool + async def cancel_schedule(job_id: str) -> str: + """Cancel a scheduled job by id. + + Args: + job_id: The id returned by ``schedule_task`` (or shown by + ``list_schedules``). + + Returns ``"Canceled ."`` or ``"Error: no such job ."``. + """ + if not job_id or not job_id.strip(): + return "Error: job_id is required." + try: + ok = scheduler.cancel_job(job_id) + except Exception as exc: # noqa: BLE001 + return f"Error: scheduler cancel_job failed: {exc}" + return f"Canceled {job_id}." if ok else f"Error: no such job {job_id}." + + return [schedule_task, list_schedules, cancel_schedule] + + # ── registry ───────────────────────────────────────────────────────────────── -def get_all_tools(knowledge_store=None): +def get_all_tools(knowledge_store=None, scheduler=None): """Return every LangChain tool the lead agent + subagents can use. - When ``knowledge_store`` is provided, the memory tools are bound - to it and included. Forks that disable the store can pass - ``knowledge_store=None`` and the lead agent runs with the four - keyless tools only. + Optional dependencies: + + - ``knowledge_store`` enables the memory tools (memory_ingest, + memory_recall, memory_list, memory_stats, daily_log). + - ``scheduler`` enables the scheduler tools (schedule_task, + list_schedules, cancel_schedule). Accepts any backend that + implements ``scheduler.interface.SchedulerBackend``. + + Pass ``None`` to disable either subsystem — the lead agent runs + fine with just the four keyless general tools. """ tools = [current_time, calculator, web_search, fetch_url] if knowledge_store is not None: tools.extend(_build_memory_tools(knowledge_store)) + if scheduler is not None: + tools.extend(_build_scheduler_tools(scheduler)) return tools From 7d1da5e9aa521283f926d9e60fc786ec5f2a3c36 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 28 Apr 2026 01:48:13 +0000 Subject: [PATCH 2/6] fix(review): address round-1 CodeRabbit findings on scheduler PR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - docs/guides/scheduler.md: replace jargon "multiple ginas" with "multiple agents" - scheduler/__init__.py: sort __all__ lexicographically (RUF022) - scheduler/local.py: log.error → log.exception in _init_db to preserve traceback (TRY400) - scheduler/workstacean.py: correct stale module docstring that claimed list_jobs() issues a list command — it returns [] unconditionally - server.py: add -> None return annotations to _scheduler_startup/_scheduler_shutdown (ANN202) - tests: add match= to two bare pytest.raises(ValueError) calls (PT011) - tools/lg_tools.py: wrap blocking scheduler calls in asyncio.to_thread() to avoid blocking the event loop under concurrent load; fix cancel_schedule error message to not conflate transport/DB failures with "no such job" Co-Authored-By: claude-code https://claude.ai/code/session_01JmFYJSYRMRndZ43g3AYW2q --- docs/guides/scheduler.md | 2 +- scheduler/__init__.py | 2 +- scheduler/local.py | 4 ++-- scheduler/workstacean.py | 6 +++--- server.py | 4 ++-- tests/test_scheduler_local.py | 2 +- tests/test_scheduler_workstacean.py | 2 +- tools/lg_tools.py | 9 +++++---- 8 files changed, 16 insertions(+), 15 deletions(-) diff --git a/docs/guides/scheduler.md b/docs/guides/scheduler.md index dceb994..d98a793 100644 --- a/docs/guides/scheduler.md +++ b/docs/guides/scheduler.md @@ -8,7 +8,7 @@ active. ## When to read this -- You want forks (or your own multiple ginas) to support reminders, +- You want forks (or your own multiple agents) to support reminders, recurring sweeps, or any "do this later" intent. - You're running protoWorkstacean and want scheduled fires to flow through the existing bus. diff --git a/scheduler/__init__.py b/scheduler/__init__.py index 960a226..6828056 100644 --- a/scheduler/__init__.py +++ b/scheduler/__init__.py @@ -24,4 +24,4 @@ from scheduler.local import LocalScheduler from scheduler.workstacean import WorkstaceanScheduler -__all__ = ["Job", "SchedulerBackend", "LocalScheduler", "WorkstaceanScheduler"] +__all__ = ["Job", "LocalScheduler", "SchedulerBackend", "WorkstaceanScheduler"] diff --git a/scheduler/local.py b/scheduler/local.py index 17fe9b0..f853084 100644 --- a/scheduler/local.py +++ b/scheduler/local.py @@ -144,8 +144,8 @@ def _init_db(self) -> None: db.executescript(_SCHEMA) db.commit() db.close() - except sqlite3.DatabaseError as exc: - log.error("[scheduler] schema init failed at %s: %s", self.path, exc) + except sqlite3.DatabaseError: + log.exception("[scheduler] schema init failed at %s", self.path) # ── public API (matches SchedulerBackend) ─────────────────────────────── diff --git a/scheduler/workstacean.py b/scheduler/workstacean.py index 97f690b..56df684 100644 --- a/scheduler/workstacean.py +++ b/scheduler/workstacean.py @@ -13,9 +13,9 @@ - Topics are namespaced: ``cron.{agent_name}`` The adapter is fire-and-forget — Workstacean owns scheduling state. -``list_jobs()`` issues a ``list`` command and waits for the response -on the ``schedule.list`` topic. If the user wants strict local -introspection, they should run the local backend. +``list_jobs()`` returns an empty list because Workstacean's list +action publishes asynchronously — strict local introspection requires +the local backend. Note: Workstacean today does not natively dispatch to A2A endpoints; forks need to wire their Workstacean install to route ``cron.*`` diff --git a/server.py b/server.py index bf05ae0..d394744 100644 --- a/server.py +++ b/server.py @@ -849,7 +849,7 @@ def _main(): # is the only blocking call) and FastAPI fires startup/shutdown # around it. @fastapi_app.on_event("startup") - async def _scheduler_startup(): + async def _scheduler_startup() -> None: if _scheduler is None: return try: @@ -858,7 +858,7 @@ async def _scheduler_startup(): log.exception("[scheduler] startup failed") @fastapi_app.on_event("shutdown") - async def _scheduler_shutdown(): + async def _scheduler_shutdown() -> None: if _scheduler is None: return try: diff --git a/tests/test_scheduler_local.py b/tests/test_scheduler_local.py index 524116e..867bf6c 100644 --- a/tests/test_scheduler_local.py +++ b/tests/test_scheduler_local.py @@ -73,7 +73,7 @@ def test_offset_normalized(self): assert dt.hour == 20 # 15 EST → 20 UTC def test_malformed_raises(self): - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="Invalid isoformat|could not convert"): parse_iso_to_utc("not an iso string") diff --git a/tests/test_scheduler_workstacean.py b/tests/test_scheduler_workstacean.py index ddd6da4..74fb485 100644 --- a/tests/test_scheduler_workstacean.py +++ b/tests/test_scheduler_workstacean.py @@ -101,7 +101,7 @@ def test_iso_oneshot_accepted(self, adapter, recorder): assert len(recorder.calls) == 1 def test_malformed_schedule_rejected(self, adapter): - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="Invalid isoformat|could not convert"): adapter.add_job("hi", "not-a-schedule", job_id="x") def test_empty_prompt_rejected(self, adapter): diff --git a/tools/lg_tools.py b/tools/lg_tools.py index dfa6727..42c60ce 100644 --- a/tools/lg_tools.py +++ b/tools/lg_tools.py @@ -40,6 +40,7 @@ from __future__ import annotations import ast +import asyncio import operator as _op from datetime import datetime from zoneinfo import ZoneInfo, ZoneInfoNotFoundError @@ -430,7 +431,7 @@ async def schedule_task( an error string on malformed ``when`` or backend failure. """ try: - job = scheduler.add_job(prompt, when, job_id=job_id) + job = await asyncio.to_thread(scheduler.add_job, prompt, when, job_id=job_id) except ValueError as exc: return f"Error: {exc}" except Exception as exc: # noqa: BLE001 @@ -449,7 +450,7 @@ async def list_schedules() -> str: Workstacean adapter) may return an empty list even when jobs exist — query the remote scheduler directly to see those. """ - jobs = scheduler.list_jobs() + jobs = await asyncio.to_thread(scheduler.list_jobs) if not jobs: return "No scheduled jobs." lines = [] @@ -472,10 +473,10 @@ async def cancel_schedule(job_id: str) -> str: if not job_id or not job_id.strip(): return "Error: job_id is required." try: - ok = scheduler.cancel_job(job_id) + ok = await asyncio.to_thread(scheduler.cancel_job, job_id) except Exception as exc: # noqa: BLE001 return f"Error: scheduler cancel_job failed: {exc}" - return f"Canceled {job_id}." if ok else f"Error: no such job {job_id}." + return f"Canceled {job_id}." if ok else f"Error: cancel failed or no such job {job_id}." return [schedule_task, list_schedules, cancel_schedule] From 1a052785629ae7aca44475b7b8b268c3166c366d Mon Sep 17 00:00:00 2001 From: Josh Mabry Date: Mon, 27 Apr 2026 19:06:38 -0700 Subject: [PATCH 3/6] fix(review-2): address round-2 PR #156 CodeRabbit feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Real bugs: - scheduler/local.py: _fire() now returns bool (True on 2xx, False on HTTP error or network exception). _tick() only reschedules / deletes when _fire() succeeds, so a transient failure leaves the job in place for the next tick to retry. Previously a one-shot job hit by a 5xx silently vanished. - server.py: the API key env var name now uses AGENT_NAME_ENV.upper() to match the auth handler at L893. The previous code read agent_name() (which returns the wizard-set identity.name when set), so a wizard rename pointed the scheduler at _API_KEY while the auth handler still expected _API_KEY → self-invocation 401'd silently after every wizard rename. - server.py: reload path now constructs a scheduler when _scheduler is None (first-run case: server boots pre-setup, wizard finishes, drawer triggers reload — this is when we *first* construct the scheduler). Existing instances are still reused — drawer saves don't tear down the polling loop. Surface: - tools/lg_tools.py: exported SCHEDULER_TOOL_NAMES and MEMORY_TOOL_NAMES as module constants. - graph/config_io.py::list_available_tools: now exposes scheduler + memory tool names to the wizard's checkbox group even when the runtime hasn't yet constructed the underlying backends. Otherwise the wizard would hide tools that the runtime registers as soon as the user finishes setup. Declined: - scheduler/local.py L141-149: CodeRabbit asked to re-raise sqlite3.DatabaseError from _init_db. The store is intentionally fail-soft (matches knowledge/store.py + audit.py): _resolve_db_path already falls back to ~/.protoagent/scheduler/ when /sandbox is unwritable, and re-raising would crash boot in exactly the scenario the fallback is designed to handle. The graceful degradation contract is "scheduler tools return errors when storage is broken, agent keeps serving everything else". Tests: - tests/test_scheduler_local.py: new test_fire_failure_leaves_job_in_place regression guard + test_fire_returns_bool contract test. - tests/test_config_io.py: list_available_tools assertions now check for memory + scheduler tools and no duplicates. 86 scheduler-scope tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- graph/config_io.py | 26 ++++++++++-- scheduler/local.py | 30 +++++++++---- server.py | 40 +++++++++++++++--- tests/test_config_io.py | 9 ++++ tests/test_scheduler_local.py | 79 +++++++++++++++++++++++++++++++++++ tools/lg_tools.py | 12 ++++++ 6 files changed, 180 insertions(+), 16 deletions(-) diff --git a/graph/config_io.py b/graph/config_io.py index 24a44ca..d2e5f94 100644 --- a/graph/config_io.py +++ b/graph/config_io.py @@ -319,10 +319,30 @@ def list_gateway_models( def list_available_tools(knowledge_store: Any = None) -> list[str]: - """Return every tool name the runtime would wire into the graph.""" - from tools.lg_tools import get_all_tools + """Return every tool name the runtime *could* wire into the graph. + + The wizard's tool checkbox group reads this. We deliberately + expose the scheduler tool names even when no scheduler has been + constructed yet (fresh boot, pre-setup) — otherwise the wizard + would hide tools that the runtime will register the moment the + user finishes setup. Same logic for memory tools when the + knowledge store is absent. + """ + from tools.lg_tools import ( + MEMORY_TOOL_NAMES, + SCHEDULER_TOOL_NAMES, + get_all_tools, + ) - return [t.name for t in get_all_tools(knowledge_store)] + names = [t.name for t in get_all_tools(knowledge_store)] + # Deduplicate while preserving order: tools already present + # (because their backend was passed in) shouldn't appear twice. + seen = set(names) + for extra in (*MEMORY_TOOL_NAMES, *SCHEDULER_TOOL_NAMES): + if extra not in seen: + names.append(extra) + seen.add(extra) + return names # --------------------------------------------------------------------------- diff --git a/scheduler/local.py b/scheduler/local.py index f853084..d495056 100644 --- a/scheduler/local.py +++ b/scheduler/local.py @@ -245,10 +245,18 @@ async def _tick(self) -> None: now = datetime.now(UTC) due = self._claim_due_jobs(now) for job in due: - try: - await self._fire(job) - finally: + # Reschedule (or delete) only when delivery actually + # succeeded. A transient HTTP failure leaves the row in + # place so the next tick retries; a one-shot stays alive + # until it lands rather than vanishing on the first + # network blip. + if await self._fire(job): self._reschedule_or_delete(job, fired_at=now) + else: + log.warning( + "[scheduler] fire failed for job %s; leaving in place for retry", + job.id, + ) def _claim_due_jobs(self, now: datetime) -> list[Job]: db = self._connect() @@ -322,8 +330,14 @@ def _recover_missed_fires(self) -> None: finally: db.close() - async def _fire(self, job: Job) -> None: - """Deliver a job by POSTing to the agent's own A2A endpoint.""" + async def _fire(self, job: Job) -> bool: + """Deliver a job by POSTing to the agent's own A2A endpoint. + + Returns ``True`` on a 2xx response, ``False`` on any HTTP + error or network exception. Callers use the return value to + decide whether to advance the schedule (success) or leave + the row in place for the next tick to retry (failure). + """ import httpx headers = {"Content-Type": "application/json"} @@ -356,10 +370,12 @@ async def _fire(self, job: Job) -> None: "[scheduler] fire failed for job %s: HTTP %d %s", job.id, r.status_code, r.text[:200], ) - else: - log.info("[scheduler] fired job %s", job.id) + return False + log.info("[scheduler] fired job %s", job.id) + return True except Exception: # noqa: BLE001 log.exception("[scheduler] fire exception for job %s", job.id) + return False def _generate_id(self) -> str: # Agent-name prefix keeps cross-agent IDs distinct in shared diff --git a/server.py b/server.py index d394744..3235120 100644 --- a/server.py +++ b/server.py @@ -185,7 +185,11 @@ def _build_scheduler(config): f"http://127.0.0.1:{_active_port}", ) bearer = (config.auth_token or os.environ.get("A2A_AUTH_TOKEN", "")).strip() - api_key_env = f"{name.upper()}_API_KEY" + # The A2A handler reads X-API-Key from ``_API_KEY`` + # (server.py L893 — note: the env-derived name, NOT the wizard-set + # ``identity.name``). Match that here so a wizard rename doesn't + # break self-invocation auth. + api_key_env = f"{AGENT_NAME_ENV.upper()}_API_KEY" api_key = os.environ.get(api_key_env, "").strip() return LocalScheduler( agent_name=name, @@ -238,11 +242,35 @@ def _reload_langgraph_agent() -> tuple[bool, str]: if is_setup_complete(): try: new_store = _build_knowledge_store(new_config) - # Re-use the running scheduler instance — tearing down the - # polling loop on every drawer save would orphan in-flight - # fires. Env-driven scheduler config (WORKSTACEAN_API_BASE, - # SCHEDULER_DISABLED) only takes effect on full restart; - # the YAML doesn't carry scheduler settings yet. + # Reuse the running scheduler so a drawer save doesn't tear + # down the polling loop and orphan in-flight fires. Build + # one only when none was constructed yet — the typical + # path is: server boots before setup is complete (no + # scheduler), wizard finishes, drawer triggers reload — + # this is when we *first* construct the scheduler. + # + # Note: the freshly-built scheduler isn't started here. + # FastAPI's startup hook fires once at process start; on + # post-setup reloads we kick the polling loop manually. + global _scheduler + if _scheduler is None: + _scheduler = _build_scheduler(new_config) + if _scheduler is not None: + # _reload_langgraph_agent is sync but called from + # inside the FastAPI event loop, so the running + # loop is available. Fire-and-forget the start — + # awaiting it would require making this whole + # function async (and every caller along with it). + try: + import asyncio + asyncio.get_running_loop().create_task(_scheduler.start()) + except RuntimeError: + log.warning( + "[reload] no running event loop; scheduler will " + "start on next process boot", + ) + except Exception: + log.exception("[reload] scheduler start failed") new_graph = create_agent_graph( new_config, knowledge_store=new_store, scheduler=_scheduler, ) diff --git a/tests/test_config_io.py b/tests/test_config_io.py index caf0bb2..7be5075 100644 --- a/tests/test_config_io.py +++ b/tests/test_config_io.py @@ -329,7 +329,16 @@ def test_list_available_tools_returns_starter_set(): assert "calculator" in names assert "web_search" in names assert "fetch_url" in names + # Memory + scheduler tools appear in the wizard checklist even + # when no store / scheduler has been constructed yet — otherwise + # the user couldn't enable them on a fresh boot. + assert "memory_ingest" in names + assert "schedule_task" in names + assert "cancel_schedule" in names assert all(isinstance(n, str) for n in names) + # No duplicates — list_available_tools dedupes between the + # backend-bound tools and the static name lists. + assert len(names) == len(set(names)) # ── Setup wizard marker ───────────────────────────────────────────────────── diff --git a/tests/test_scheduler_local.py b/tests/test_scheduler_local.py index 867bf6c..a7fea63 100644 --- a/tests/test_scheduler_local.py +++ b/tests/test_scheduler_local.py @@ -288,3 +288,82 @@ async def post(self, url, headers=None, json=None): assert any("FIRED-ME" in str(c["json"]) for c in fired) # One-shot was deleted after firing assert s.list_jobs() == [] + + +@pytest.mark.asyncio +async def test_fire_failure_leaves_job_in_place(tmp_path, monkeypatch): + """A 5xx HTTP response from /a2a must NOT delete the job. + + Regression guard for the round-2 review finding: previously, + _tick() called _reschedule_or_delete in finally, which silently + consumed one-shot jobs on transient failures. Now the job stays + until delivery actually succeeds. + """ + s = _make_scheduler(tmp_path) + past = (datetime.now(UTC) - timedelta(seconds=1)).isoformat() + s.add_job("DURABLE", past, job_id="firetest") + + class _FakeResponse: + status_code = 503 + text = "service unavailable" + + class _FakeClient: + def __init__(self, *_a, **_kw): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, *_a): + return False + + async def post(self, url, headers=None, json=None): + return _FakeResponse() + + import httpx + monkeypatch.setattr(httpx, "AsyncClient", _FakeClient) + + await s.start() + await asyncio.sleep(1.5) # one polling tick + await s.stop() + + # Job survives the failed fire, will be retried on the next tick. + assert len(s.list_jobs()) == 1 + assert s.list_jobs()[0].id == "firetest" + + +@pytest.mark.asyncio +async def test_fire_returns_bool(tmp_path, monkeypatch): + """``_fire`` is the success/failure signal feeding the + reschedule decision in ``_tick``. Lock the contract.""" + s = _make_scheduler(tmp_path) + job = s.add_job("hi", "0 9 * * *", job_id="x") + + class _OkResponse: + status_code = 200 + text = "ok" + + class _ErrResponse: + status_code = 500 + text = "boom" + + class _FakeClient: + def __init__(self, response): + self._response = response + + async def __aenter__(self): + return self + + async def __aexit__(self, *_a): + return False + + async def post(self, *_a, **_kw): + return self._response + + import httpx + + monkeypatch.setattr(httpx, "AsyncClient", lambda **kw: _FakeClient(_OkResponse())) + assert await s._fire(job) is True + + monkeypatch.setattr(httpx, "AsyncClient", lambda **kw: _FakeClient(_ErrResponse())) + assert await s._fire(job) is False diff --git a/tools/lg_tools.py b/tools/lg_tools.py index 42c60ce..d42effb 100644 --- a/tools/lg_tools.py +++ b/tools/lg_tools.py @@ -281,6 +281,18 @@ def _extract_text_from_html(content: bytes) -> str: _MEMORY_RECALL_MAX_K = 20 _MEMORY_LIST_MAX_LIMIT = 200 +# Stable list of scheduler tool names. Exposed as a module-level +# constant so ``graph/config_io.py::list_available_tools`` can show +# the wizard the right surface even when the runtime hasn't yet +# constructed a scheduler instance (e.g. fresh boot before setup is +# complete). Keep in sync with ``_build_scheduler_tools``. +SCHEDULER_TOOL_NAMES: tuple[str, ...] = ( + "schedule_task", "list_schedules", "cancel_schedule", +) +MEMORY_TOOL_NAMES: tuple[str, ...] = ( + "memory_ingest", "memory_recall", "memory_list", "memory_stats", "daily_log", +) + def _build_memory_tools(knowledge_store) -> list: """Bind memory tools to a ``KnowledgeStore``. Returns a list.""" From 8b79fa8d5a2264cae69b0521a425c32190758a5f Mon Sep 17 00:00:00 2001 From: Josh Mabry Date: Mon, 27 Apr 2026 19:32:47 -0700 Subject: [PATCH 4/6] feat(scheduler): YAML opt-out via middleware.scheduler (symmetric with knowledge/memory) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Scheduler had asymmetric opt-out — only env-based (SCHEDULER_DISABLED=1). The knowledge and memory subsystems already exposed YAML toggles (middleware.knowledge, middleware.memory) so forks could flip them through the drawer or wizard. Scheduler now matches: - LangGraphConfig.scheduler_enabled: bool = True (default-on) - from_yaml() reads middleware.scheduler - config_to_dict() emits it for the drawer round-trip - config/langgraph-config.yaml has middleware.scheduler: true - server.py::_build_scheduler honors the YAML toggle first, env second Both subsystems are now genuinely opt-out: middleware: knowledge: true # was already so memory: true # was already so scheduler: true # NEW — was env-only audit: true Drawer/wizard can flip any of them without restart (the existing reload path already rebuilds on config change). The env opt-out (SCHEDULER_DISABLED=1) stays as a runtime escape hatch for fleet operators who can't edit YAML in the moment. Co-Authored-By: Claude Opus 4.7 (1M context) --- config/langgraph-config.yaml | 11 +++++++---- docs/guides/scheduler.md | 15 +++++++++++---- docs/reference/configuration.md | 2 ++ graph/config.py | 7 ++++++- graph/config_io.py | 1 + server.py | 10 ++++++++++ 6 files changed, 37 insertions(+), 9 deletions(-) diff --git a/config/langgraph-config.yaml b/config/langgraph-config.yaml index c75ff71..c7df665 100644 --- a/config/langgraph-config.yaml +++ b/config/langgraph-config.yaml @@ -38,13 +38,16 @@ subagents: max_turns: 20 middleware: - # All three middlewares default ON. The knowledge middleware needs a - # store; the template constructs one automatically (see - # ``server.py::_build_knowledge_store``). Set ``knowledge: false`` if - # your fork is purely stateless. + # All four subsystems default ON. The template constructs the + # knowledge store + scheduler backends automatically (see + # ``server.py::_build_knowledge_store`` and ``_build_scheduler``). + # Flip any of these to ``false`` to opt out — the corresponding + # tools (memory_*, schedule_*) are dropped from the agent loop + # without touching the worker subagent's tool allowlist. knowledge: true audit: true memory: true + scheduler: true knowledge: db_path: /sandbox/knowledge/agent.db diff --git a/docs/guides/scheduler.md b/docs/guides/scheduler.md index d98a793..faaf45f 100644 --- a/docs/guides/scheduler.md +++ b/docs/guides/scheduler.md @@ -34,12 +34,19 @@ not "do that thing we discussed"). `server.py::_build_scheduler` picks at startup: -1. `WORKSTACEAN_API_BASE` + `WORKSTACEAN_API_KEY` set → **`WorkstaceanScheduler`**. -2. Otherwise → **`LocalScheduler`** (sqlite, asyncio polling). -3. `SCHEDULER_DISABLED=1` → no scheduler. The three tools don't ship. +1. `middleware.scheduler: false` in YAML → no scheduler. The three + tools don't ship. (Symmetric with `middleware.knowledge` / + `middleware.memory` — drawer/wizard editable.) +2. `SCHEDULER_DISABLED=1` env → no scheduler. Runtime escape hatch + for fleet operators who can't edit config. +3. `WORKSTACEAN_API_BASE` + `WORKSTACEAN_API_KEY` set → + **`WorkstaceanScheduler`**. +4. Otherwise → **`LocalScheduler`** (sqlite, asyncio polling). Both backends honor the same `SchedulerBackend` protocol; the agent -loop never knows which one is wired up. +loop never knows which one is wired up. The scheduler is **default +on** — explicitly opt out via either config path above when a fork +wants a stateless agent with no scheduling surface. ```bash # Solo / local dev — falls through to LocalScheduler automatically. diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index 3ca9a9c..6f5e1c7 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -33,6 +33,7 @@ middleware: knowledge: true audit: true memory: true + scheduler: true knowledge: db_path: /sandbox/knowledge/agent.db @@ -71,6 +72,7 @@ Adding a new subagent name to the YAML requires matching entries in `graph/subag | `knowledge` | `true` | Inject retrieved knowledge into state before LLM calls. Backed by the bundled `KnowledgeStore` (sqlite + FTS5). Set `false` for a stateless agent. | | `audit` | `true` | Append every tool call to `/sandbox/audit/audit.jsonl`. | | `memory` | `true` | Persist a session summary on terminal turn and asynchronously index conversation findings under `domain='finding'`. | +| `scheduler` | `true` | Wire the bundled scheduler backend (local sqlite, or `WorkstaceanScheduler` when env vars are set). Drops the `schedule_task` / `list_schedules` / `cancel_schedule` tools from the agent loop when `false`. Equivalent to setting `SCHEDULER_DISABLED=1`; the YAML toggle is the canonical opt-out path. | ## `knowledge` diff --git a/graph/config.py b/graph/config.py index c2cf995..aff6707 100644 --- a/graph/config.py +++ b/graph/config.py @@ -46,10 +46,14 @@ class LangGraphConfig: max_turns=20, )) - # Middleware toggles + # Middleware / subsystem toggles. All default-on so a fresh fork has + # a working memory loop + scheduler on day one. Forks that want a + # purely stateless agent (no KB, no scheduled tasks) can flip these + # via the drawer or by editing the YAML directly. knowledge_middleware: bool = True audit_middleware: bool = True memory_middleware: bool = True + scheduler_enabled: bool = True # Knowledge store — sqlite + FTS5, see ``knowledge/store.py``. # The default path lives under ``/sandbox/`` to play well with the @@ -109,6 +113,7 @@ def from_yaml(cls, path: str | Path) -> "LangGraphConfig": knowledge_middleware=middleware.get("knowledge", cls.knowledge_middleware), audit_middleware=middleware.get("audit", cls.audit_middleware), memory_middleware=middleware.get("memory", cls.memory_middleware), + scheduler_enabled=middleware.get("scheduler", cls.scheduler_enabled), knowledge_db_path=knowledge.get("db_path", cls.knowledge_db_path), embed_model=knowledge.get("embed_model", cls.embed_model), knowledge_top_k=knowledge.get("top_k", cls.knowledge_top_k), diff --git a/graph/config_io.py b/graph/config_io.py index d2e5f94..2bd4857 100644 --- a/graph/config_io.py +++ b/graph/config_io.py @@ -133,6 +133,7 @@ def config_to_dict(config: LangGraphConfig) -> dict[str, Any]: "knowledge": config.knowledge_middleware, "audit": config.audit_middleware, "memory": config.memory_middleware, + "scheduler": config.scheduler_enabled, }, "knowledge": { "db_path": config.knowledge_db_path, diff --git a/server.py b/server.py index 3235120..97804d5 100644 --- a/server.py +++ b/server.py @@ -156,6 +156,16 @@ def _build_scheduler(config): so its self-invocation HTTP call can pass through bearer / X-API-Key auth — the scheduler hits the same A2A endpoint as a real caller. """ + # Two opt-out paths, in priority order: + # 1. ``middleware.scheduler: false`` in YAML (drawer / wizard). + # This is the canonical opt-out — symmetric with + # ``middleware.knowledge`` / ``middleware.memory``. + # 2. ``SCHEDULER_DISABLED=1`` env var. Runtime escape hatch for + # fleet operators who need to kill the scheduler without + # editing config (e.g. emergency rollback). + if not getattr(config, "scheduler_enabled", True): + log.info("[server] scheduler disabled via middleware.scheduler config") + return None if os.environ.get("SCHEDULER_DISABLED", "").lower() in ("1", "true", "yes"): log.info("[server] scheduler disabled via SCHEDULER_DISABLED env") return None From 7ce3f1065c4d521bd602a539917df7e4510d673e Mon Sep 17 00:00:00 2001 From: Josh Mabry Date: Mon, 27 Apr 2026 19:43:45 -0700 Subject: [PATCH 5/6] fix(review-3+4): address rounds 3 + 4 PR #156 CodeRabbit feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Real bugs: - scheduler/local.py: stop() now suppresses only asyncio.CancelledError (the expected outcome of cancelling our own task) and logs any other exception via log.exception. Previously every exception was silently swallowed, so a polling-loop crash during shutdown would vanish without a trace. - server.py: reload path now honors the new middleware.scheduler toggle. Three states: - flipped OFF (was on) → stop + drop the running scheduler; new graph builds with scheduler=None. - flipped ON (was off / first run) → construct + start. - unchanged → reuse the running instance. Helpers _start_scheduler_async / _stop_scheduler_async fire start()/stop() onto the active loop without forcing the entire reload chain to become async. Type / nits: - server.py: added `-> "SchedulerBackend | None"` return type to _build_scheduler, with a TYPE_CHECKING import to avoid runtime cycles. - tests/test_scheduler_local.py: raw-string regex for `|` alternation (test_malformed_raises); added match= to the two bare ValueError tests (test_empty_prompt_rejected, test_malformed_schedule_rejected) so they only pass for the intended error message. - tests/test_config_io.py: assert list_schedules in names alongside schedule_task / cancel_schedule. - docs/reference/configuration.md: clarified the scheduler opt-out description — middleware.scheduler is canonical, SCHEDULER_DISABLED is a runtime escape hatch. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/reference/configuration.md | 2 +- scheduler/local.py | 9 +++- server.py | 86 ++++++++++++++++++++++----------- tests/test_config_io.py | 1 + tests/test_scheduler_local.py | 6 +-- 5 files changed, 72 insertions(+), 32 deletions(-) diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index 6f5e1c7..872bb37 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -72,7 +72,7 @@ Adding a new subagent name to the YAML requires matching entries in `graph/subag | `knowledge` | `true` | Inject retrieved knowledge into state before LLM calls. Backed by the bundled `KnowledgeStore` (sqlite + FTS5). Set `false` for a stateless agent. | | `audit` | `true` | Append every tool call to `/sandbox/audit/audit.jsonl`. | | `memory` | `true` | Persist a session summary on terminal turn and asynchronously index conversation findings under `domain='finding'`. | -| `scheduler` | `true` | Wire the bundled scheduler backend (local sqlite, or `WorkstaceanScheduler` when env vars are set). Drops the `schedule_task` / `list_schedules` / `cancel_schedule` tools from the agent loop when `false`. Equivalent to setting `SCHEDULER_DISABLED=1`; the YAML toggle is the canonical opt-out path. | +| `scheduler` | `true` | Wire the bundled scheduler backend (local sqlite, or `WorkstaceanScheduler` when env vars are set). Drops the `schedule_task` / `list_schedules` / `cancel_schedule` tools from the agent loop when `false`. Has the same effect as `SCHEDULER_DISABLED=1` — but `middleware.scheduler: false` is the canonical opt-out (drawer/wizard editable, survives restarts), while the env var is a runtime escape hatch for fleet operators who can't edit YAML in the moment. | ## `knowledge` diff --git a/scheduler/local.py b/scheduler/local.py index d495056..635d2f4 100644 --- a/scheduler/local.py +++ b/scheduler/local.py @@ -223,8 +223,15 @@ async def stop(self) -> None: self._task.cancel() try: await self._task - except (asyncio.CancelledError, Exception): # noqa: BLE001 + except asyncio.CancelledError: + # Expected — we just cancelled it. pass + except Exception: # noqa: BLE001 + # Anything else means the polling loop crashed during + # shutdown. Log with traceback so we can debug; don't + # re-raise (caller is in shutdown path, raising would + # mask the original shutdown trigger). + log.exception("[scheduler] polling task raised during stop") self._task = None log.info("[scheduler] local backend stopped") diff --git a/server.py b/server.py index 97804d5..195f308 100644 --- a/server.py +++ b/server.py @@ -30,10 +30,13 @@ import os import time from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any from graph.output_format import extract_output +if TYPE_CHECKING: + from scheduler.interface import SchedulerBackend + # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- @@ -138,7 +141,43 @@ def _build_knowledge_store(config): return None -def _build_scheduler(config): +def _start_scheduler_async(backend: "SchedulerBackend") -> None: + """Fire-and-forget scheduler.start() onto the running loop. + + Reload paths are sync but invoked from FastAPI request handlers, + so the running loop is available. Awaiting would force the entire + reload chain to become async — not worth it for one no-await + coroutine. + """ + import asyncio + try: + asyncio.get_running_loop().create_task(backend.start()) + except RuntimeError: + log.warning( + "[reload] no running event loop; scheduler will start " + "on next process boot", + ) + except Exception: + log.exception("[reload] scheduler start failed") + + +def _stop_scheduler_async(backend: "SchedulerBackend") -> None: + """Fire-and-forget scheduler.stop() onto the running loop. + + Used when the YAML toggle flips off mid-reload. The polling task + cancels cleanly; the next graph rebuild registers no scheduler + tools. + """ + import asyncio + try: + asyncio.get_running_loop().create_task(backend.stop()) + except RuntimeError: + log.warning("[reload] no running event loop; scheduler not stopped") + except Exception: + log.exception("[reload] scheduler stop failed") + + +def _build_scheduler(config) -> "SchedulerBackend | None": """Return the active scheduler backend, or ``None`` when disabled. Selection order: @@ -252,35 +291,28 @@ def _reload_langgraph_agent() -> tuple[bool, str]: if is_setup_complete(): try: new_store = _build_knowledge_store(new_config) - # Reuse the running scheduler so a drawer save doesn't tear - # down the polling loop and orphan in-flight fires. Build - # one only when none was constructed yet — the typical - # path is: server boots before setup is complete (no - # scheduler), wizard finishes, drawer triggers reload — - # this is when we *first* construct the scheduler. + # Three states for the scheduler on reload: + # + # 1. Toggle flipped OFF (was on) → stop + drop the running + # scheduler so the agent stops registering scheduler + # tools. The new graph is built with scheduler=None. + # 2. Toggle is ON and we have a running scheduler → reuse + # it. Drawer saves don't tear down the polling loop. + # 3. Toggle is ON but _scheduler is None (first-run after + # setup completes) → construct + start. # - # Note: the freshly-built scheduler isn't started here. - # FastAPI's startup hook fires once at process start; on - # post-setup reloads we kick the polling loop manually. + # Env-driven config (WORKSTACEAN_API_BASE) only takes + # effect on full process restart; the YAML toggle is the + # canonical reload-time switch. global _scheduler - if _scheduler is None: + scheduler_wanted = getattr(new_config, "scheduler_enabled", True) + if not scheduler_wanted and _scheduler is not None: + _stop_scheduler_async(_scheduler) + _scheduler = None + elif scheduler_wanted and _scheduler is None: _scheduler = _build_scheduler(new_config) if _scheduler is not None: - # _reload_langgraph_agent is sync but called from - # inside the FastAPI event loop, so the running - # loop is available. Fire-and-forget the start — - # awaiting it would require making this whole - # function async (and every caller along with it). - try: - import asyncio - asyncio.get_running_loop().create_task(_scheduler.start()) - except RuntimeError: - log.warning( - "[reload] no running event loop; scheduler will " - "start on next process boot", - ) - except Exception: - log.exception("[reload] scheduler start failed") + _start_scheduler_async(_scheduler) new_graph = create_agent_graph( new_config, knowledge_store=new_store, scheduler=_scheduler, ) diff --git a/tests/test_config_io.py b/tests/test_config_io.py index 7be5075..946abfb 100644 --- a/tests/test_config_io.py +++ b/tests/test_config_io.py @@ -334,6 +334,7 @@ def test_list_available_tools_returns_starter_set(): # the user couldn't enable them on a fresh boot. assert "memory_ingest" in names assert "schedule_task" in names + assert "list_schedules" in names assert "cancel_schedule" in names assert all(isinstance(n, str) for n in names) # No duplicates — list_available_tools dedupes between the diff --git a/tests/test_scheduler_local.py b/tests/test_scheduler_local.py index a7fea63..06f4ef9 100644 --- a/tests/test_scheduler_local.py +++ b/tests/test_scheduler_local.py @@ -73,7 +73,7 @@ def test_offset_normalized(self): assert dt.hour == 20 # 15 EST → 20 UTC def test_malformed_raises(self): - with pytest.raises(ValueError, match="Invalid isoformat|could not convert"): + with pytest.raises(ValueError, match=r"Invalid isoformat|could not convert"): parse_iso_to_utc("not an iso string") @@ -98,12 +98,12 @@ def test_iso_one_shot(self, tmp_path): def test_empty_prompt_rejected(self, tmp_path): s = _make_scheduler(tmp_path) - with pytest.raises(ValueError): + with pytest.raises(ValueError, match=r"prompt is required"): s.add_job(" ", "0 9 * * *") def test_malformed_schedule_rejected(self, tmp_path): s = _make_scheduler(tmp_path) - with pytest.raises(ValueError): + with pytest.raises(ValueError, match=r"Invalid isoformat|could not convert"): s.add_job("hi", "not-a-real-schedule") def test_user_id_preserved(self, tmp_path): From 309890864179494c2ad9e1d0ea6590c2b83374fd Mon Sep 17 00:00:00 2001 From: Josh Mabry Date: Mon, 27 Apr 2026 20:27:29 -0700 Subject: [PATCH 6/6] fix(review-5): address round-5 PR #156 CodeRabbit feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Real bugs: - scheduler/local.py::_fire(): metadata moved from params.message.metadata to params.metadata. The A2A handler reads custom metadata from params.metadata (a2a_handler.py L1244 — `msg_metadata = params.get("metadata")`), so the previous nesting silently dropped the scheduler_job_id / scheduler_kind breadcrumb. Observers now get it as intended. - server.py reload path: scheduler swap is now planned before the graph rebuild and only committed after rebuild succeeds. A failed graph rebuild used to leave the scheduler torn down or a fresh one already started, dis-aligning runtime state. The new ordering: build candidate, rebuild graph (rollback-safe on failure), commit graph + scheduler atomically. - scheduler/local.py: _resolve_db_path now sanitizes agent_name via a new _safe_segment() helper. Strips path separators, ``..``, and absolute-path prefixes; falls back to "default" when nothing usable remains. Defence in depth — the value comes from operator- controlled env / YAML, but a typo or copy-paste shouldn't be able to put a sqlite file outside the configured scheduler dir. Tests: - tests/test_scheduler_local.py::test_cron_rescheduled_after_fire: pinned to a fixed fired_at timestamp so the assertion is exact (next_fire == "2026-04-29T09:00:00+00:00") instead of a "different from original" near-tautology that depends on datetime.now(). Docs: - docs/reference/configuration.md: clarified that the scheduler's enable/disable lives in YAML (middleware.scheduler), while backend selection and runtime knobs are env-driven. Repositioned SCHEDULER_DISABLED as the runtime escape hatch. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/reference/configuration.md | 4 +-- scheduler/local.py | 42 ++++++++++++++++++---- server.py | 63 +++++++++++++++++++++------------ tests/test_scheduler_local.py | 15 ++++---- 4 files changed, 85 insertions(+), 39 deletions(-) diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index 872bb37..2913700 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -88,7 +88,7 @@ The bundled store is sqlite + FTS5 (with an automatic LIKE fallback when FTS5 is ## Scheduler -The bundled scheduler is configured entirely via environment, not YAML, so the same image can be deployed under either backend without rebuilding. See [Schedule future work](/guides/scheduler) for the full guide. +Scheduler **enable/disable** is YAML-controlled (`middleware.scheduler` above) so the drawer can flip it without a restart. Backend **selection and runtime knobs** (which backend, where to write the sqlite, where to publish, etc.) are env-driven so the same container image can run under either backend without a rebuild. See [Schedule future work](/guides/scheduler) for the full guide. | Env var | Default | What | |---|---|---| @@ -97,4 +97,4 @@ The bundled scheduler is configured entirely via environment, not YAML, so the s | `WORKSTACEAN_TOPIC_PREFIX` | `cron.` | Override the bus topic the adapter fires on, when your Workstacean install uses a different convention. | | `SCHEDULER_DB_DIR` | `/sandbox/scheduler` | Local backend: parent directory for `/jobs.db`. Falls back to `~/.protoagent/scheduler//jobs.db` when unwritable. | | `SCHEDULER_INVOKE_URL` | `http://127.0.0.1:` | Local backend: where to POST `message/send` when a job fires. Override only if the agent's A2A endpoint isn't on localhost. | -| `SCHEDULER_DISABLED` | unset | Set to `1` / `true` to drop the scheduler tools entirely. | +| `SCHEDULER_DISABLED` | unset | Runtime escape hatch — set to `1` / `true` to drop the scheduler tools entirely without editing YAML. `middleware.scheduler: false` is the canonical opt-out. | diff --git a/scheduler/local.py b/scheduler/local.py index 635d2f4..37187cc 100644 --- a/scheduler/local.py +++ b/scheduler/local.py @@ -48,9 +48,17 @@ def _resolve_db_path(db_dir: str | Path | None, agent_name: str) -> Path: - """Pick a writable jobs.db path namespaced by agent name.""" + """Pick a writable jobs.db path namespaced by agent name. + + ``agent_name`` is sanitized to a single path segment before being + appended — operators set it via env or YAML, but defence in depth + against a value like ``../etc/passwd`` or ``/tmp/elsewhere`` is + cheap and prevents an exotic typo from putting a sqlite file + outside the configured scheduler dir. + """ + safe_name = _safe_segment(agent_name) raw = os.environ.get("SCHEDULER_DB_DIR") or db_dir or DEFAULT_DB_DIR - base = Path(str(raw)).expanduser() / agent_name + base = Path(str(raw)).expanduser() / safe_name try: base.mkdir(parents=True, exist_ok=True) probe = base / ".write-probe" @@ -58,12 +66,27 @@ def _resolve_db_path(db_dir: str | Path | None, agent_name: str) -> Path: probe.unlink() return base / "jobs.db" except OSError: - fallback = Path.home() / ".protoagent" / "scheduler" / agent_name + fallback = Path.home() / ".protoagent" / "scheduler" / safe_name fallback.mkdir(parents=True, exist_ok=True) log.info("[scheduler] %s not writable; using %s instead", base, fallback) return fallback / "jobs.db" +def _safe_segment(name: str) -> str: + """Reduce ``name`` to a single safe path segment. + + Replaces path separators, ``..``, and absolute-path prefixes with + underscores; falls back to ``"default"`` when nothing usable + remains. Preserves the common slug shape (``gina-personal``, + ``ginavision``) without surprises. + """ + if not name: + return "default" + cleaned = name.replace("/", "_").replace("\\", "_").replace("..", "_") + cleaned = cleaned.lstrip(".").strip() + return cleaned or "default" + + def _now_iso() -> str: return datetime.now(UTC).isoformat() @@ -363,10 +386,15 @@ async def _fire(self, job: Job) -> bool: "role": "user", "parts": [{"kind": "text", "text": job.prompt}], "messageId": message_id, - # Carry the originating job id so observers can tell - # this turn was scheduler-driven, not user-driven. - "metadata": {"scheduler_job_id": job.id, "scheduler_kind": "local"}, - } + }, + # Custom metadata goes at params.metadata — that's + # where a2a_handler._a2a_rpc reads it (see + # ``msg_metadata = params.get("metadata")``). Putting + # it inside params.message.metadata silently drops it. + "metadata": { + "scheduler_job_id": job.id, + "scheduler_kind": "local", + }, }, } try: diff --git a/server.py b/server.py index 195f308..8b10e4f 100644 --- a/server.py +++ b/server.py @@ -288,36 +288,44 @@ def _reload_langgraph_agent() -> tuple[bool, str]: # would leave the process serving the prior compiled _graph under # fresh _graph_config + rotated bearer auth on failure — the # metrics / card / auth all de-sync from what's actually running. + # Plan the scheduler swap *before* attempting the graph rebuild so + # the polling loop isn't torn down (or a fresh one started) until + # we know the rebuild will succeed. Three states: + # + # 1. Toggle flipped OFF, scheduler currently running → next graph + # uses None; we stop the running scheduler only after commit. + # 2. Toggle ON, none running (first-run after setup completes) → + # construct now (cheap), start only after commit. + # 3. Toggle ON, already running → reuse. Drawer saves don't tear + # down the polling loop. + # + # Env-driven config (WORKSTACEAN_API_BASE) only takes effect on + # full process restart; the YAML toggle is the canonical + # reload-time switch. + global _scheduler + scheduler_wanted = getattr(new_config, "scheduler_enabled", True) + next_scheduler: "SchedulerBackend | None" + pending_start: "SchedulerBackend | None" = None + pending_stop: "SchedulerBackend | None" = None + if not scheduler_wanted: + next_scheduler = None + pending_stop = _scheduler # may be None — stopper is no-op then + elif _scheduler is None: + next_scheduler = _build_scheduler(new_config) + pending_start = next_scheduler + else: + next_scheduler = _scheduler + if is_setup_complete(): try: new_store = _build_knowledge_store(new_config) - # Three states for the scheduler on reload: - # - # 1. Toggle flipped OFF (was on) → stop + drop the running - # scheduler so the agent stops registering scheduler - # tools. The new graph is built with scheduler=None. - # 2. Toggle is ON and we have a running scheduler → reuse - # it. Drawer saves don't tear down the polling loop. - # 3. Toggle is ON but _scheduler is None (first-run after - # setup completes) → construct + start. - # - # Env-driven config (WORKSTACEAN_API_BASE) only takes - # effect on full process restart; the YAML toggle is the - # canonical reload-time switch. - global _scheduler - scheduler_wanted = getattr(new_config, "scheduler_enabled", True) - if not scheduler_wanted and _scheduler is not None: - _stop_scheduler_async(_scheduler) - _scheduler = None - elif scheduler_wanted and _scheduler is None: - _scheduler = _build_scheduler(new_config) - if _scheduler is not None: - _start_scheduler_async(_scheduler) new_graph = create_agent_graph( - new_config, knowledge_store=new_store, scheduler=_scheduler, + new_config, knowledge_store=new_store, scheduler=next_scheduler, ) except Exception as e: log.exception("[reload] graph rebuild failed") + # Scheduler state hasn't been committed yet — caller's + # running scheduler keeps polling, no orphaned tasks. return False, f"graph rebuild failed: {e}" else: new_graph = None @@ -334,6 +342,15 @@ def _reload_langgraph_agent() -> tuple[bool, str]: # before _main wires routes) — harmless. pass _graph = new_graph + # Commit the scheduler swap. start/stop are async — fire-and-forget + # onto the active loop so reload stays sync. We've already verified + # the graph rebuild succeeded; if start/stop fails we log but + # don't roll back (the agent is already serving the new graph). + _scheduler = next_scheduler + if pending_stop is not None: + _stop_scheduler_async(pending_stop) + if pending_start is not None: + _start_scheduler_async(pending_start) if new_graph is None: log.info("[reload] setup not complete — config reloaded, graph not compiled") diff --git a/tests/test_scheduler_local.py b/tests/test_scheduler_local.py index 06f4ef9..0e65ca1 100644 --- a/tests/test_scheduler_local.py +++ b/tests/test_scheduler_local.py @@ -170,13 +170,14 @@ def test_cron_rescheduled_after_fire(self, tmp_path): s = _make_scheduler(tmp_path) s.add_job("hi", "0 9 * * *", job_id="cron") job = s.list_jobs()[0] - original_next = job.next_fire - # Fire at "now" — next_fire should advance to the next 09:00 UTC - s._reschedule_or_delete(job, fired_at=datetime.now(UTC)) - new_next = s.list_jobs()[0].next_fire - assert new_next != original_next or original_next > datetime.now(UTC).isoformat() - # last_fire should be populated - assert s.list_jobs()[0].last_fire is not None + # Fire at a fixed timestamp — 2026-04-28T10:00:00Z is one hour + # past the 09:00 cron slot, so the next fire must be exactly + # 2026-04-29T09:00:00Z. + fired_at = datetime(2026, 4, 28, 10, 0, 0, tzinfo=UTC) + s._reschedule_or_delete(job, fired_at=fired_at) + rescheduled = s.list_jobs()[0] + assert rescheduled.next_fire == "2026-04-29T09:00:00+00:00" + assert rescheduled.last_fire == fired_at.isoformat() class TestMissedFireRecovery: