diff --git a/README.md b/README.md index ef54b84..4a7036f 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ rename / release-pipeline wiring. | Subagents | `graph/subagents/config.py` | DeerFlow-pattern delegation via a `task()` tool; one placeholder `worker` ships | | Starter tools | `tools/lg_tools.py` | Keyless general tools (`current_time`, `calculator` safe AST eval, `web_search` via DuckDuckGo, `fetch_url`) plus memory tools (`memory_ingest`, `memory_recall`, `memory_list`, `memory_stats`, `daily_log`) bound to the bundled store | | Knowledge store | `knowledge/store.py` | sqlite + FTS5 (LIKE fallback). One `chunks` table for operator notes, daily-log entries, and conversation findings. Default-on; turn off with `middleware.knowledge: false` | +| Scheduler | `scheduler/` | `schedule_task` / `list_schedules` / `cancel_schedule` tools backed by either a bundled sqlite scheduler or a Workstacean adapter (env-selected). Multi-agent-safe — every job is namespaced by `AGENT_NAME`. See [Schedule future work](./docs/guides/scheduler.md) | | Eval harness | `evals/` | Side-effect-verified A2A test harness — audit log + reply text + KB state. `python -m evals.runner` against a running agent. See [Eval your fork](./docs/guides/evals.md) | | Tracing | `tracing.py` | Langfuse trace_session with distributed `a2a.trace` propagation and the OTel cross-context-detach filter | | Observability | `metrics.py`, `audit.py` | Prometheus metrics with per-agent prefix, JSONL audit log with trace IDs | diff --git a/TEMPLATE.md b/TEMPLATE.md index 08ff5b3..4408c90 100644 --- a/TEMPLATE.md +++ b/TEMPLATE.md @@ -180,6 +180,28 @@ See [Eval your fork](./docs/guides/evals.md) for what each case asserts, how the three assertion channels work, and how to add cases for your fork's new tools. +## 9b. Scheduler — local sqlite or Workstacean + +The bundled scheduler ships three agent tools — `schedule_task`, +`list_schedules`, `cancel_schedule` — backed by either a local +sqlite poller or a Workstacean adapter, selected at startup via env: + +```bash +# Default: local sqlite, persists at /sandbox/scheduler//jobs.db +python server.py + +# Workstacean: set both and restart +export WORKSTACEAN_API_BASE=http://your-workstacean:3000 +export WORKSTACEAN_API_KEY=... +python server.py +``` + +Multi-fork safety: every job is namespaced by `AGENT_NAME`, so +spinning up `gina-personal` next to `gina-work` (or any number of +ginas under one Workstacean) doesn't cross-fire prompts. See +[Schedule future work](./docs/guides/scheduler.md) for the full +firing model and integration notes. + ## 9a. Understand the skill loop protoAgent's skill loop lets your agent learn from experience automatically. diff --git a/config/langgraph-config.yaml b/config/langgraph-config.yaml index 05bada2..c7df665 100644 --- a/config/langgraph-config.yaml +++ b/config/langgraph-config.yaml @@ -32,16 +32,22 @@ subagents: - memory_list - memory_stats - daily_log + - schedule_task + - list_schedules + - cancel_schedule max_turns: 20 middleware: - # All three middlewares default ON. The knowledge middleware needs a - # store; the template constructs one automatically (see - # ``server.py::_build_knowledge_store``). Set ``knowledge: false`` if - # your fork is purely stateless. + # All four subsystems default ON. The template constructs the + # knowledge store + scheduler backends automatically (see + # ``server.py::_build_knowledge_store`` and ``_build_scheduler``). + # Flip any of these to ``false`` to opt out — the corresponding + # tools (memory_*, schedule_*) are dropped from the agent loop + # without touching the worker subagent's tool allowlist. knowledge: true audit: true memory: true + scheduler: true knowledge: db_path: /sandbox/knowledge/agent.db diff --git a/docs/guides/index.md b/docs/guides/index.md index 65dc41e..ce26b48 100644 --- a/docs/guides/index.md +++ b/docs/guides/index.md @@ -10,4 +10,5 @@ Task-oriented procedures. Assumes you already have a running agent (see [Tutoria | [Configure subagents](/guides/subagents) | You want specialized delegates beyond the placeholder `worker` | | [Wire Langfuse + Prometheus](/guides/observability) | You need traces and metrics in production | | [Eval your fork](/guides/evals) | You want a baseline pass-rate for the tools / memory / A2A surface in your fork | +| [Schedule future work](/guides/scheduler) | You want the agent to defer tasks to itself ("remind me tomorrow", recurring sweeps) — local sqlite or Workstacean-backed | | [Deploy via GHCR](/guides/deploy) | You're ready to ship and want auto-deploy wired up | diff --git a/docs/guides/scheduler.md b/docs/guides/scheduler.md new file mode 100644 index 0000000..faaf45f --- /dev/null +++ b/docs/guides/scheduler.md @@ -0,0 +1,178 @@ +# Schedule future work + +protoAgent ships a scheduler so the agent can defer tasks to itself — +"remind me about X tomorrow", "every Monday morning summarize last +week's logs", "at 3pm check the deploy". Two backends ship by default; +the agent-facing tool surface is identical regardless of which one is +active. + +## When to read this + +- You want forks (or your own multiple agents) to support reminders, + recurring sweeps, or any "do this later" intent. +- You're running protoWorkstacean and want scheduled fires to flow + through the existing bus. +- You're spinning up multiple protoAgent instances on one box and + need scheduling state to stay isolated per agent. + +## The three tools + +When the scheduler is active, three tools land in `get_all_tools()`: + +| Tool | What it does | +|---|---| +| `schedule_task(prompt, when, job_id?)` | Persist a future invocation. `when` is cron (`"0 9 * * *"`) or ISO-8601 (`"2026-05-01T15:00:00"`). | +| `list_schedules()` | Show all jobs visible to *this* agent. | +| `cancel_schedule(job_id)` | Remove a job by id. | + +Prompts are self-contained — the agent has no memory of the +scheduling moment when the task fires, so write the prompt as a fresh +turn ("review last week's pipeline incidents and post a summary", +not "do that thing we discussed"). + +## Backend selection + +`server.py::_build_scheduler` picks at startup: + +1. `middleware.scheduler: false` in YAML → no scheduler. The three + tools don't ship. (Symmetric with `middleware.knowledge` / + `middleware.memory` — drawer/wizard editable.) +2. `SCHEDULER_DISABLED=1` env → no scheduler. Runtime escape hatch + for fleet operators who can't edit config. +3. `WORKSTACEAN_API_BASE` + `WORKSTACEAN_API_KEY` set → + **`WorkstaceanScheduler`**. +4. Otherwise → **`LocalScheduler`** (sqlite, asyncio polling). + +Both backends honor the same `SchedulerBackend` protocol; the agent +loop never knows which one is wired up. The scheduler is **default +on** — explicitly opt out via either config path above when a fork +wants a stateless agent with no scheduling surface. + +```bash +# Solo / local dev — falls through to LocalScheduler automatically. +python server.py + +# Workstacean install — set both env vars and restart. +export WORKSTACEAN_API_BASE=http://your-workstacean-host:3000 +export WORKSTACEAN_API_KEY= +python server.py +``` + +> **protoLabs operators**: the fleet's Workstacean lives on the +> `ava` node; `WORKSTACEAN_API_KEY` is in the org's secrets manager +> under `secret-management → workstacean`. Coordinate with the team +> for the exact URL. + +## Multi-agent isolation + +Every job is namespaced by `AGENT_NAME` so spinning up +`gina-personal` alongside `gina-work` on the same box doesn't +cross-fire prompts. + +| Backend | How it isolates | +|---|---| +| Local | DB path per agent: `/sandbox/scheduler//jobs.db` (falls back to `~/.protoagent/scheduler//jobs.db`). Every row also carries `agent_name`; reads filter on it. | +| Workstacean | Job IDs are prefixed `-...`; topics are namespaced `cron..`. One Workstacean install can serve N forks safely. | + +If you supply your own `job_id` in `schedule_task`: + +- Local: the id is stored as-is. Two agents sharing one DB path with + the same user-supplied id will trip a primary-key collision (the + second add raises a clear error). To avoid it, let the scheduler + auto-generate (the auto-id is `-`). +- Workstacean: the adapter prepends `-` if your id doesn't + already start with it, so cross-agent collisions are impossible. + +## Local backend — how firing works + +The local scheduler runs an asyncio polling task on FastAPI's +`startup` event. Once a second: + +1. Read jobs where `next_fire <= now()` and `enabled = 1`. +2. For each due job: POST to `http://127.0.0.1:/a2a` as + a `message/send` with the job's prompt as the message text. Bearer + + X-API-Key are forwarded automatically. +3. One-shot ISO jobs are deleted after firing. Cron jobs reschedule + forward via `croniter`. + +Going through HTTP rather than calling into the graph directly buys +parity with real callers — the audit log, cost-v1 capture, and +push-notification path all behave identically. + +### Missed-fire recovery + +On startup, jobs whose `next_fire` is in the past are inspected: + +- **Within the last 24h** — fire on the next tick (so a 5-minute + outage doesn't lose an upcoming reminder). +- **Older than 24h** — cron jobs roll forward to the next slot + without firing; one-shot jobs are dropped. This matches + Workstacean's recovery behaviour and avoids flooding the agent + with stale prompts after a long downtime. + +### Persistence path + +```bash +# Default (Docker) +/sandbox/scheduler//jobs.db + +# Local fallback (when /sandbox isn't writable) +~/.protoagent/scheduler//jobs.db + +# Override +export SCHEDULER_DB_DIR=/var/data/agents +# → /var/data/agents//jobs.db +``` + +Mount a volume at the configured path to survive container +restarts (analogous to `audit/` and `knowledge/`). + +## Workstacean backend — how firing works + +When `WORKSTACEAN_API_BASE` and `WORKSTACEAN_API_KEY` are set, the +adapter publishes to `POST {base}/publish` with topic +`command.schedule` and the action wrapper Workstacean expects. See +the [Workstacean scheduler reference](https://protolabsai.github.io/protoWorkstacean/reference/scheduler/) +for the payload shape. + +When the schedule fires, Workstacean publishes the inner payload to +`cron..`. **Workstacean does not natively dispatch +to A2A endpoints today** — your fork needs to wire a bridge that +subscribes to `cron..*` and POSTs to the protoAgent's +`/a2a` endpoint. + +### Topic prefix override + +If your existing Workstacean bus uses a different convention: + +```bash +export WORKSTACEAN_TOPIC_PREFIX="myorg.cron.gina" +# → topics fire on myorg.cron.gina. +``` + +### `list_schedules()` returns empty under Workstacean + +Workstacean's `list` action publishes its response on the +`schedule.list` topic — there's no synchronous reply on `/publish`. +The adapter intentionally doesn't subscribe. If you need live +introspection, query Workstacean directly or run the local backend. + +## Adding a case to your eval suite + +The default `evals/tasks.json` doesn't include scheduler cases (the +fire path is async — a single eval run can't easily test that the +scheduled prompt arrives). For forks that want it, the pattern is: + +1. `schedule_task(prompt, "")` in setup. +2. Wait > 1 second. +3. Assert on the audit log and/or KB state for the *fired* prompt's + side effects. + +Document the case as `category: "scheduler"` and gate at >= 2/3 +attempts to absorb timing jitter. + +## References + +- [Workstacean scheduler reference](https://protolabsai.github.io/protoWorkstacean/reference/scheduler/) +- [Configuration](/reference/configuration#scheduler) — env vars +- [Eval your fork](/guides/evals) — for the testing pattern above diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index bd3f5be..2913700 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -33,6 +33,7 @@ middleware: knowledge: true audit: true memory: true + scheduler: true knowledge: db_path: /sandbox/knowledge/agent.db @@ -71,6 +72,7 @@ Adding a new subagent name to the YAML requires matching entries in `graph/subag | `knowledge` | `true` | Inject retrieved knowledge into state before LLM calls. Backed by the bundled `KnowledgeStore` (sqlite + FTS5). Set `false` for a stateless agent. | | `audit` | `true` | Append every tool call to `/sandbox/audit/audit.jsonl`. | | `memory` | `true` | Persist a session summary on terminal turn and asynchronously index conversation findings under `domain='finding'`. | +| `scheduler` | `true` | Wire the bundled scheduler backend (local sqlite, or `WorkstaceanScheduler` when env vars are set). Drops the `schedule_task` / `list_schedules` / `cancel_schedule` tools from the agent loop when `false`. Has the same effect as `SCHEDULER_DISABLED=1` — but `middleware.scheduler: false` is the canonical opt-out (drawer/wizard editable, survives restarts), while the env var is a runtime escape hatch for fleet operators who can't edit YAML in the moment. | ## `knowledge` @@ -83,3 +85,16 @@ Only read when `middleware.knowledge` is `true`. | `top_k` | `5` | Results per query fed into state. | The bundled store is sqlite + FTS5 (with an automatic LIKE fallback when FTS5 isn't available). One `chunks` table; the `domain` column distinguishes operator-set notes (`memory_ingest`), daily-log entries (`daily_log`), and conversation findings extracted by `MemoryMiddleware` (`domain='finding'`). + +## Scheduler + +Scheduler **enable/disable** is YAML-controlled (`middleware.scheduler` above) so the drawer can flip it without a restart. Backend **selection and runtime knobs** (which backend, where to write the sqlite, where to publish, etc.) are env-driven so the same container image can run under either backend without a rebuild. See [Schedule future work](/guides/scheduler) for the full guide. + +| Env var | Default | What | +|---|---|---| +| `WORKSTACEAN_API_BASE` | unset | When set together with `WORKSTACEAN_API_KEY`, swaps the bundled local scheduler for the `WorkstaceanScheduler` HTTP adapter. | +| `WORKSTACEAN_API_KEY` | unset | Auth token sent as `X-API-Key` to Workstacean's `/publish`. | +| `WORKSTACEAN_TOPIC_PREFIX` | `cron.` | Override the bus topic the adapter fires on, when your Workstacean install uses a different convention. | +| `SCHEDULER_DB_DIR` | `/sandbox/scheduler` | Local backend: parent directory for `/jobs.db`. Falls back to `~/.protoagent/scheduler//jobs.db` when unwritable. | +| `SCHEDULER_INVOKE_URL` | `http://127.0.0.1:` | Local backend: where to POST `message/send` when a job fires. Override only if the agent's A2A endpoint isn't on localhost. | +| `SCHEDULER_DISABLED` | unset | Runtime escape hatch — set to `1` / `true` to drop the scheduler tools entirely without editing YAML. `middleware.scheduler: false` is the canonical opt-out. | diff --git a/graph/agent.py b/graph/agent.py index 355c3fc..08ad32a 100644 --- a/graph/agent.py +++ b/graph/agent.py @@ -158,6 +158,7 @@ async def task( def create_agent_graph( config: LangGraphConfig, knowledge_store=None, + scheduler=None, include_subagents: bool = True, ): """Create the protoAgent LangGraph agent. @@ -167,7 +168,7 @@ def create_agent_graph( """ llm = create_llm(config) - all_tools = get_all_tools(knowledge_store) + all_tools = get_all_tools(knowledge_store, scheduler=scheduler) if include_subagents: task_tool = _build_task_tool(config, all_tools) @@ -189,12 +190,12 @@ def create_agent_graph( return agent -def create_simple_agent(config: LangGraphConfig, knowledge_store=None): +def create_simple_agent(config: LangGraphConfig, knowledge_store=None, scheduler=None): """Create a simple agent without subagents (for debugging/testing).""" from langgraph.prebuilt import create_react_agent llm = create_llm(config) - all_tools = get_all_tools(knowledge_store) + all_tools = get_all_tools(knowledge_store, scheduler=scheduler) system_prompt = build_system_prompt(include_subagents=False) diff --git a/graph/config.py b/graph/config.py index a3df02b..aff6707 100644 --- a/graph/config.py +++ b/graph/config.py @@ -41,14 +41,19 @@ class LangGraphConfig: "current_time", "calculator", "web_search", "fetch_url", "memory_ingest", "memory_recall", "memory_list", "memory_stats", "daily_log", + "schedule_task", "list_schedules", "cancel_schedule", ], max_turns=20, )) - # Middleware toggles + # Middleware / subsystem toggles. All default-on so a fresh fork has + # a working memory loop + scheduler on day one. Forks that want a + # purely stateless agent (no KB, no scheduled tasks) can flip these + # via the drawer or by editing the YAML directly. knowledge_middleware: bool = True audit_middleware: bool = True memory_middleware: bool = True + scheduler_enabled: bool = True # Knowledge store — sqlite + FTS5, see ``knowledge/store.py``. # The default path lives under ``/sandbox/`` to play well with the @@ -108,6 +113,7 @@ def from_yaml(cls, path: str | Path) -> "LangGraphConfig": knowledge_middleware=middleware.get("knowledge", cls.knowledge_middleware), audit_middleware=middleware.get("audit", cls.audit_middleware), memory_middleware=middleware.get("memory", cls.memory_middleware), + scheduler_enabled=middleware.get("scheduler", cls.scheduler_enabled), knowledge_db_path=knowledge.get("db_path", cls.knowledge_db_path), embed_model=knowledge.get("embed_model", cls.embed_model), knowledge_top_k=knowledge.get("top_k", cls.knowledge_top_k), diff --git a/graph/config_io.py b/graph/config_io.py index 24a44ca..2bd4857 100644 --- a/graph/config_io.py +++ b/graph/config_io.py @@ -133,6 +133,7 @@ def config_to_dict(config: LangGraphConfig) -> dict[str, Any]: "knowledge": config.knowledge_middleware, "audit": config.audit_middleware, "memory": config.memory_middleware, + "scheduler": config.scheduler_enabled, }, "knowledge": { "db_path": config.knowledge_db_path, @@ -319,10 +320,30 @@ def list_gateway_models( def list_available_tools(knowledge_store: Any = None) -> list[str]: - """Return every tool name the runtime would wire into the graph.""" - from tools.lg_tools import get_all_tools + """Return every tool name the runtime *could* wire into the graph. + + The wizard's tool checkbox group reads this. We deliberately + expose the scheduler tool names even when no scheduler has been + constructed yet (fresh boot, pre-setup) — otherwise the wizard + would hide tools that the runtime will register the moment the + user finishes setup. Same logic for memory tools when the + knowledge store is absent. + """ + from tools.lg_tools import ( + MEMORY_TOOL_NAMES, + SCHEDULER_TOOL_NAMES, + get_all_tools, + ) - return [t.name for t in get_all_tools(knowledge_store)] + names = [t.name for t in get_all_tools(knowledge_store)] + # Deduplicate while preserving order: tools already present + # (because their backend was passed in) shouldn't appear twice. + seen = set(names) + for extra in (*MEMORY_TOOL_NAMES, *SCHEDULER_TOOL_NAMES): + if extra not in seen: + names.append(extra) + seen.add(extra) + return names # --------------------------------------------------------------------------- diff --git a/graph/subagents/config.py b/graph/subagents/config.py index 560edc7..a488703 100644 --- a/graph/subagents/config.py +++ b/graph/subagents/config.py @@ -67,6 +67,7 @@ class SubagentConfig: "current_time", "calculator", "web_search", "fetch_url", "memory_ingest", "memory_recall", "memory_list", "memory_stats", "daily_log", + "schedule_task", "list_schedules", "cancel_schedule", ], max_turns=20, ) diff --git a/requirements.txt b/requirements.txt index 30ef46d..aa05284 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,7 @@ langchain-openai>=0.3.0 # Starter tools (tools/lg_tools.py) ddgs>=9.0 beautifulsoup4>=4.12 + +# Scheduler (scheduler/local.py — cron expression parsing for the +# bundled local backend; the Workstacean adapter doesn't need this) +croniter>=2.0 diff --git a/scheduler/__init__.py b/scheduler/__init__.py new file mode 100644 index 0000000..6828056 --- /dev/null +++ b/scheduler/__init__.py @@ -0,0 +1,27 @@ +"""Pluggable scheduler for future-task delivery. + +Two backends ship by default: + +- ``LocalScheduler`` — sqlite + asyncio. Bundled, zero external + dependencies, per-agent persistence path. Use this for solo forks + or any deployment that doesn't already run protoWorkstacean. +- ``WorkstaceanScheduler`` — HTTP adapter to a protoWorkstacean + install. Topic-namespaced per agent so multiple ginas can share one + Workstacean and not collide. + +``server.py`` selects the backend at startup based on env vars; the +agent loop sees the same three tools (``schedule_task``, +``list_schedules``, ``cancel_schedule``) regardless of which backend +is wired up. + +Multi-agent safety: every job carries an ``agent_name`` (defaulted +from ``AGENT_NAME`` env / config) so that two protoAgent instances +sharing one storage path or one Workstacean install can't accidentally +fire each other's scheduled prompts. +""" + +from scheduler.interface import Job, SchedulerBackend +from scheduler.local import LocalScheduler +from scheduler.workstacean import WorkstaceanScheduler + +__all__ = ["Job", "LocalScheduler", "SchedulerBackend", "WorkstaceanScheduler"] diff --git a/scheduler/interface.py b/scheduler/interface.py new file mode 100644 index 0000000..6de9b3a --- /dev/null +++ b/scheduler/interface.py @@ -0,0 +1,114 @@ +"""Scheduler protocol — the contract every backend honors. + +Both ``LocalScheduler`` and ``WorkstaceanScheduler`` implement this +shape. The agent-facing tools in ``tools/lg_tools.py`` only see the +protocol; swapping backends is a server.py-level decision. +""" + +from __future__ import annotations + +import re +from dataclasses import asdict, dataclass, field +from datetime import UTC, datetime +from typing import Any, Protocol + + +@dataclass +class Job: + """A scheduled future invocation. + + ``schedule`` is either a 5-field cron expression (e.g. + ``"0 9 * * 1-5"``) or an ISO-8601 datetime for one-shot fires + (e.g. ``"2026-05-01T15:00:00+00:00"``). Backends auto-detect. + + ``agent_name`` namespaces the job — one Workstacean install or + shared sqlite path can serve N protoAgent instances without + cross-firing. + """ + + id: str + prompt: str + schedule: str + agent_name: str + created_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) + next_fire: str | None = None # ISO; None means "compute on save" + last_fire: str | None = None + enabled: bool = True + + def as_dict(self) -> dict[str, Any]: + return asdict(self) + + +class SchedulerBackend(Protocol): + """The minimum surface every backend implements. + + Methods are sync because the agent tools wrap them in their own + async functions; backends that need to do async I/O (httpx in + Workstacean's case) handle it internally. + """ + + name: str # short label for logs / agent-facing strings: "local", "workstacean" + + def add_job(self, prompt: str, schedule: str, *, job_id: str | None = None) -> Job: + """Persist a new job. Returns the stored ``Job`` (with + backend-assigned id and next_fire if the caller didn't set them). + + Raises ``ValueError`` for malformed schedule strings.""" + ... + + def cancel_job(self, job_id: str) -> bool: + """Remove a job. Returns ``True`` if a row was deleted.""" + ... + + def list_jobs(self) -> list[Job]: + """All jobs visible to the calling agent. Implementations are + responsible for filtering by ``agent_name`` so multi-agent + deployments stay isolated.""" + ... + + async def start(self) -> None: + """Start any background polling. No-op for backends that don't + need it (Workstacean dispatches and forgets).""" + ... + + async def stop(self) -> None: + """Cleanly shut down background work.""" + ... + + +# ── shared helpers ────────────────────────────────────────────────────────── + + +_CRON_PATTERN = re.compile(r"^\s*\S+\s+\S+\s+\S+\s+\S+\s+\S+\s*$") + + +def is_cron(schedule: str) -> bool: + """Heuristic: does ``schedule`` look like a 5-field cron expression? + + Used by both backends to decide between cron-iter and + ``datetime.fromisoformat``. Doesn't validate semantics — that + happens when the schedule is parsed. + """ + return bool(_CRON_PATTERN.match(schedule)) and not _looks_like_iso(schedule) + + +def _looks_like_iso(schedule: str) -> bool: + # ISO datetimes contain ``-`` and either ``T`` or a space between + # date and time. Cron has neither in the first field. + return "T" in schedule or _has_iso_date_prefix(schedule) + + +def _has_iso_date_prefix(schedule: str) -> bool: + head = schedule.strip().split(" ", 1)[0] + return bool(re.match(r"^\d{4}-\d{2}-\d{2}", head)) + + +def parse_iso_to_utc(schedule: str) -> datetime: + """Parse an ISO-8601 datetime, treating naive inputs as UTC. + + Raises ``ValueError`` for malformed strings. + """ + dt = datetime.fromisoformat(schedule) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=UTC) + return dt.astimezone(UTC) diff --git a/scheduler/local.py b/scheduler/local.py new file mode 100644 index 0000000..37187cc --- /dev/null +++ b/scheduler/local.py @@ -0,0 +1,432 @@ +"""LocalScheduler — bundled sqlite + asyncio backend. + +The default scheduler when no protoWorkstacean install is configured. +Every protoAgent instance gets a private ``jobs.db`` namespaced by +``AGENT_NAME`` so spinning up gina-personal alongside gina-work +doesn't cross-fire prompts. + +Architecture: + +- One ``jobs`` table — ``id``, ``prompt``, ``schedule``, ``next_fire``, + ``agent_name``, ``last_fire``, ``enabled``, ``created_at``. +- Polling coroutine runs on FastAPI's startup hook (``server.py``) + and ticks once per ``_POLL_INTERVAL_S`` (1s default). Cheap because + sqlite reads with an indexed ``next_fire`` filter cost microseconds. +- Firing = HTTP POST to the running agent's own ``/a2a`` endpoint as + a ``message/send``. Going through HTTP rather than calling into the + graph directly gets us free parity with real callers — same audit + log, same cost-v1 capture, same auth path. +- One-shot ISO schedules are deleted after firing. Cron schedules + reschedule via croniter. +- On startup: any job whose ``next_fire`` is in the past but within a + 24h window fires immediately (BFCL-style "missed fires" recovery, + matching Workstacean's behaviour). Older missed fires are + rescheduled forward without firing — better than waking the agent + to a flood of stale prompts after a long downtime. +""" + +from __future__ import annotations + +import asyncio +import logging +import os +import sqlite3 +import uuid +from datetime import UTC, datetime, timedelta +from pathlib import Path +from typing import Any + +from croniter import croniter + +from scheduler.interface import Job, is_cron, parse_iso_to_utc + +log = logging.getLogger(__name__) + +DEFAULT_DB_DIR = "/sandbox/scheduler" +_POLL_INTERVAL_S = 1.0 +_MISSED_FIRE_WINDOW_S = 24 * 60 * 60 # 24h — matches Workstacean + + +def _resolve_db_path(db_dir: str | Path | None, agent_name: str) -> Path: + """Pick a writable jobs.db path namespaced by agent name. + + ``agent_name`` is sanitized to a single path segment before being + appended — operators set it via env or YAML, but defence in depth + against a value like ``../etc/passwd`` or ``/tmp/elsewhere`` is + cheap and prevents an exotic typo from putting a sqlite file + outside the configured scheduler dir. + """ + safe_name = _safe_segment(agent_name) + raw = os.environ.get("SCHEDULER_DB_DIR") or db_dir or DEFAULT_DB_DIR + base = Path(str(raw)).expanduser() / safe_name + try: + base.mkdir(parents=True, exist_ok=True) + probe = base / ".write-probe" + probe.touch() + probe.unlink() + return base / "jobs.db" + except OSError: + fallback = Path.home() / ".protoagent" / "scheduler" / safe_name + fallback.mkdir(parents=True, exist_ok=True) + log.info("[scheduler] %s not writable; using %s instead", base, fallback) + return fallback / "jobs.db" + + +def _safe_segment(name: str) -> str: + """Reduce ``name`` to a single safe path segment. + + Replaces path separators, ``..``, and absolute-path prefixes with + underscores; falls back to ``"default"`` when nothing usable + remains. Preserves the common slug shape (``gina-personal``, + ``ginavision``) without surprises. + """ + if not name: + return "default" + cleaned = name.replace("/", "_").replace("\\", "_").replace("..", "_") + cleaned = cleaned.lstrip(".").strip() + return cleaned or "default" + + +def _now_iso() -> str: + return datetime.now(UTC).isoformat() + + +def _compute_next_fire(schedule: str, *, after: datetime | None = None) -> str: + """Resolve a schedule string to the next ISO timestamp it fires. + + ``after`` controls when "next" starts — current time by default; + pass an explicit reference when rescheduling a cron job after a + fire so successive fires don't drift. + """ + after = after or datetime.now(UTC) + if is_cron(schedule): + return croniter(schedule, after).get_next(datetime).astimezone(UTC).isoformat() + return parse_iso_to_utc(schedule).isoformat() + + +_SCHEMA = """ +CREATE TABLE IF NOT EXISTS jobs ( + id TEXT PRIMARY KEY, + prompt TEXT NOT NULL, + schedule TEXT NOT NULL, + agent_name TEXT NOT NULL, + next_fire TEXT NOT NULL, + last_fire TEXT, + enabled INTEGER NOT NULL DEFAULT 1, + created_at TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_jobs_next_fire ON jobs(next_fire); +CREATE INDEX IF NOT EXISTS idx_jobs_agent_name ON jobs(agent_name); +""" + + +class LocalScheduler: + """Sqlite-backed scheduler with an asyncio polling loop. + + Construct once at server startup, ``await scheduler.start()`` to + spawn the polling task, ``await scheduler.stop()`` on shutdown. + The agent-facing tools call ``add_job`` / ``cancel_job`` / + ``list_jobs`` synchronously. + """ + + name = "local" + + def __init__( + self, + agent_name: str, + *, + invoke_url: str, + api_key: str | None = None, + bearer_token: str | None = None, + db_dir: str | Path | None = None, + ): + self.agent_name = agent_name + self._invoke_url = invoke_url.rstrip("/") + self._api_key = api_key or "" + self._bearer = bearer_token or "" + self.path = _resolve_db_path(db_dir, agent_name) + self._task: asyncio.Task | None = None + self._stopping = False + self._init_db() + + # ── DB plumbing ───────────────────────────────────────────────────────── + + def _connect(self) -> sqlite3.Connection: + db = sqlite3.connect(str(self.path)) + db.row_factory = sqlite3.Row + try: + db.execute("PRAGMA journal_mode=WAL") + except sqlite3.OperationalError as exc: + log.debug("[scheduler] WAL skipped: %s", exc) + return db + + def _init_db(self) -> None: + try: + db = self._connect() + db.executescript(_SCHEMA) + db.commit() + db.close() + except sqlite3.DatabaseError: + log.exception("[scheduler] schema init failed at %s", self.path) + + # ── public API (matches SchedulerBackend) ─────────────────────────────── + + def add_job(self, prompt: str, schedule: str, *, job_id: str | None = None) -> Job: + if not prompt or not prompt.strip(): + raise ValueError("scheduler: prompt is required") + next_fire = _compute_next_fire(schedule) # raises ValueError for malformed input + + job = Job( + id=job_id or self._generate_id(), + prompt=prompt, + schedule=schedule, + agent_name=self.agent_name, + next_fire=next_fire, + ) + db = self._connect() + try: + db.execute( + "INSERT INTO jobs (id, prompt, schedule, agent_name, next_fire, " + "last_fire, enabled, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (job.id, job.prompt, job.schedule, job.agent_name, + job.next_fire, job.last_fire, int(job.enabled), job.created_at), + ) + db.commit() + except sqlite3.IntegrityError as exc: + raise ValueError(f"job id {job.id!r} already exists") from exc + finally: + db.close() + return job + + def cancel_job(self, job_id: str) -> bool: + db = self._connect() + try: + cur = db.execute( + "DELETE FROM jobs WHERE id = ? AND agent_name = ?", + (job_id, self.agent_name), + ) + db.commit() + return cur.rowcount > 0 + except sqlite3.DatabaseError as exc: + log.warning("[scheduler] cancel_job failed: %s", exc) + return False + finally: + db.close() + + def list_jobs(self) -> list[Job]: + db = self._connect() + try: + rows = db.execute( + "SELECT * FROM jobs WHERE agent_name = ? ORDER BY next_fire ASC", + (self.agent_name,), + ).fetchall() + except sqlite3.DatabaseError as exc: + log.warning("[scheduler] list_jobs failed: %s", exc) + return [] + finally: + db.close() + return [_row_to_job(r) for r in rows] + + async def start(self) -> None: + if self._task is not None: + return + self._stopping = False + self._recover_missed_fires() + self._task = asyncio.create_task(self._poll_loop(), name="scheduler.local.poll") + log.info( + "[scheduler] local backend started: agent=%s db=%s", + self.agent_name, self.path, + ) + + async def stop(self) -> None: + self._stopping = True + if self._task is None: + return + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + # Expected — we just cancelled it. + pass + except Exception: # noqa: BLE001 + # Anything else means the polling loop crashed during + # shutdown. Log with traceback so we can debug; don't + # re-raise (caller is in shutdown path, raising would + # mask the original shutdown trigger). + log.exception("[scheduler] polling task raised during stop") + self._task = None + log.info("[scheduler] local backend stopped") + + # ── polling + firing ──────────────────────────────────────────────────── + + async def _poll_loop(self) -> None: + while not self._stopping: + try: + await self._tick() + except Exception: # noqa: BLE001 + log.exception("[scheduler] poll tick failed") + try: + await asyncio.sleep(_POLL_INTERVAL_S) + except asyncio.CancelledError: + return + + async def _tick(self) -> None: + now = datetime.now(UTC) + due = self._claim_due_jobs(now) + for job in due: + # Reschedule (or delete) only when delivery actually + # succeeded. A transient HTTP failure leaves the row in + # place so the next tick retries; a one-shot stays alive + # until it lands rather than vanishing on the first + # network blip. + if await self._fire(job): + self._reschedule_or_delete(job, fired_at=now) + else: + log.warning( + "[scheduler] fire failed for job %s; leaving in place for retry", + job.id, + ) + + def _claim_due_jobs(self, now: datetime) -> list[Job]: + db = self._connect() + try: + rows = db.execute( + "SELECT * FROM jobs WHERE agent_name = ? AND enabled = 1 " + "AND next_fire <= ? ORDER BY next_fire ASC", + (self.agent_name, now.isoformat()), + ).fetchall() + except sqlite3.DatabaseError as exc: + log.warning("[scheduler] _claim_due_jobs failed: %s", exc) + return [] + finally: + db.close() + return [_row_to_job(r) for r in rows] + + def _reschedule_or_delete(self, job: Job, *, fired_at: datetime) -> None: + """Cron jobs roll forward; one-shot jobs are deleted.""" + db = self._connect() + try: + if is_cron(job.schedule): + next_iso = _compute_next_fire(job.schedule, after=fired_at) + db.execute( + "UPDATE jobs SET next_fire = ?, last_fire = ? WHERE id = ?", + (next_iso, fired_at.isoformat(), job.id), + ) + else: + db.execute("DELETE FROM jobs WHERE id = ?", (job.id,)) + db.commit() + except sqlite3.DatabaseError: + log.exception("[scheduler] reschedule failed for job %s", job.id) + finally: + db.close() + + def _recover_missed_fires(self) -> None: + """Roll past-due jobs forward on startup. + + - Missed fires within the last 24h fire immediately on the next + tick (we leave their ``next_fire`` in the past so the polling + loop picks them up naturally). + - Older missed fires are rescheduled forward without firing — + firing a flood of stale prompts after a long downtime is worse + than dropping them. + """ + cutoff_recent = datetime.now(UTC) - timedelta(seconds=_MISSED_FIRE_WINDOW_S) + db = self._connect() + try: + rows = db.execute( + "SELECT * FROM jobs WHERE agent_name = ? AND enabled = 1 " + "AND next_fire <= ?", + (self.agent_name, cutoff_recent.isoformat()), + ).fetchall() + for row in rows: + job = _row_to_job(row) + if is_cron(job.schedule): + next_iso = _compute_next_fire(job.schedule) + db.execute( + "UPDATE jobs SET next_fire = ? WHERE id = ?", + (next_iso, job.id), + ) + log.info( + "[scheduler] dropped stale fire for job %s; next at %s", + job.id, next_iso, + ) + else: + db.execute("DELETE FROM jobs WHERE id = ?", (job.id,)) + log.info("[scheduler] dropped stale one-shot job %s", job.id) + db.commit() + except sqlite3.DatabaseError: + log.exception("[scheduler] missed-fire recovery failed") + finally: + db.close() + + async def _fire(self, job: Job) -> bool: + """Deliver a job by POSTing to the agent's own A2A endpoint. + + Returns ``True`` on a 2xx response, ``False`` on any HTTP + error or network exception. Callers use the return value to + decide whether to advance the schedule (success) or leave + the row in place for the next tick to retry (failure). + """ + import httpx + + headers = {"Content-Type": "application/json"} + if self._bearer: + headers["Authorization"] = f"Bearer {self._bearer}" + if self._api_key: + headers["X-API-Key"] = self._api_key + + message_id = str(uuid.uuid4()) + body = { + "jsonrpc": "2.0", + "id": message_id, + "method": "message/send", + "params": { + "message": { + "role": "user", + "parts": [{"kind": "text", "text": job.prompt}], + "messageId": message_id, + }, + # Custom metadata goes at params.metadata — that's + # where a2a_handler._a2a_rpc reads it (see + # ``msg_metadata = params.get("metadata")``). Putting + # it inside params.message.metadata silently drops it. + "metadata": { + "scheduler_job_id": job.id, + "scheduler_kind": "local", + }, + }, + } + try: + async with httpx.AsyncClient(timeout=30) as client: + r = await client.post(f"{self._invoke_url}/a2a", headers=headers, json=body) + if r.status_code >= 400: + log.error( + "[scheduler] fire failed for job %s: HTTP %d %s", + job.id, r.status_code, r.text[:200], + ) + return False + log.info("[scheduler] fired job %s", job.id) + return True + except Exception: # noqa: BLE001 + log.exception("[scheduler] fire exception for job %s", job.id) + return False + + def _generate_id(self) -> str: + # Agent-name prefix keeps cross-agent IDs distinct in shared + # observability surfaces (audit log, dashboards) even though + # the DB row is already namespaced by agent_name. + return f"{self.agent_name}-{uuid.uuid4().hex[:12]}" + + +def _row_to_job(row: Any) -> Job: + return Job( + id=row["id"], + prompt=row["prompt"], + schedule=row["schedule"], + agent_name=row["agent_name"], + next_fire=row["next_fire"], + last_fire=row["last_fire"], + enabled=bool(row["enabled"]), + created_at=row["created_at"], + ) diff --git a/scheduler/workstacean.py b/scheduler/workstacean.py new file mode 100644 index 0000000..56df684 --- /dev/null +++ b/scheduler/workstacean.py @@ -0,0 +1,183 @@ +"""WorkstaceanScheduler — HTTP adapter to a protoWorkstacean install. + +Activated automatically when ``WORKSTACEAN_API_BASE`` and +``WORKSTACEAN_API_KEY`` are set (see ``server.py``). + +Speaks Workstacean's ``POST /publish`` API as documented at +https://protolabsai.github.io/protoWorkstacean/reference/scheduler/. +Every job is namespaced with the agent's name so multiple protoAgent +forks (e.g. ``gina-personal`` + ``gina-work``) can share one +Workstacean install without cross-firing: + +- Job IDs are prefixed: ``{agent_name}-{user_id_or_uuid}`` +- Topics are namespaced: ``cron.{agent_name}`` + +The adapter is fire-and-forget — Workstacean owns scheduling state. +``list_jobs()`` returns an empty list because Workstacean's list +action publishes asynchronously — strict local introspection requires +the local backend. + +Note: Workstacean today does not natively dispatch to A2A endpoints; +forks need to wire their Workstacean install to route ``cron.*`` +topics to the agent's A2A endpoint. See the linked guide for the +recommended bridge config. +""" + +from __future__ import annotations + +import logging +import os +import uuid +from typing import Any + +import httpx + +from scheduler.interface import Job, parse_iso_to_utc, is_cron + +log = logging.getLogger(__name__) + +DEFAULT_TIMEOUT_S = 10 + + +class WorkstaceanScheduler: + """HTTP adapter to a Workstacean ``/publish`` endpoint.""" + + name = "workstacean" + + def __init__( + self, + agent_name: str, + *, + base_url: str, + api_key: str, + topic_prefix: str | None = None, + timeout_s: float = DEFAULT_TIMEOUT_S, + ): + if not base_url: + raise ValueError("WorkstaceanScheduler: base_url is required") + if not api_key: + raise ValueError("WorkstaceanScheduler: api_key is required") + self.agent_name = agent_name + self._base_url = base_url.rstrip("/") + self._api_key = api_key + # Namespacing: topic_prefix governs which Workstacean topic the + # job fires on. Default = ``cron.``. Forks can override + # via ``WORKSTACEAN_TOPIC_PREFIX`` to integrate with existing + # bus conventions. + self._topic_prefix = topic_prefix or f"cron.{agent_name}" + self._timeout_s = timeout_s + + # ── public API ────────────────────────────────────────────────────────── + + def add_job(self, prompt: str, schedule: str, *, job_id: str | None = None) -> Job: + if not prompt or not prompt.strip(): + raise ValueError("scheduler: prompt is required") + # Validate the schedule eagerly so a malformed expr fails at + # tool-call time, not silently inside Workstacean. + _validate_schedule(schedule) + + normalized_id = self._namespaced_id(job_id) + topic = f"{self._topic_prefix}.{normalized_id}" + # Workstacean expects an outer ``command.schedule`` topic and + # the inner ``payload`` carries both the trigger schedule and + # the actual message that will be fired. The inner ``topic`` + # is what Workstacean publishes to when the schedule fires — + # so it has to be something a downstream A2A bridge subscribes + # to. Default convention: ``cron..``. + body = { + "topic": "command.schedule", + "payload": { + "action": "add", + "id": normalized_id, + "schedule": schedule, + "topic": topic, + "payload": { + "content": prompt, + "sender": "scheduler", + "channel": "a2a", + # Cross-system breadcrumb so the bridge knows which + # protoAgent fork the message belongs to. + "agent_name": self.agent_name, + "scheduler_job_id": normalized_id, + }, + }, + } + self._publish(body) + + return Job( + id=normalized_id, + prompt=prompt, + schedule=schedule, + agent_name=self.agent_name, + next_fire=None, # Workstacean owns the schedule state + ) + + def cancel_job(self, job_id: str) -> bool: + body = { + "topic": "command.schedule", + "payload": {"action": "remove", "id": self._namespaced_id(job_id)}, + } + try: + self._publish(body) + return True + except RuntimeError as exc: + log.warning("[scheduler] workstacean cancel failed: %s", exc) + return False + + def list_jobs(self) -> list[Job]: + """Returns ``[]`` from the adapter. + + Workstacean's ``list`` action publishes its response on the + ``schedule.list`` topic — there is no synchronous reply on + ``/publish``. Subscribing to that topic from inside a + protoAgent process (without a full bus client) is more + machinery than this adapter is the right layer for. Forks + that need live introspection should run the local backend or + query Workstacean directly. + """ + return [] + + async def start(self) -> None: + # Workstacean owns scheduling state — nothing to start here. + log.info( + "[scheduler] workstacean backend ready: agent=%s base=%s topic=%s.*", + self.agent_name, self._base_url, self._topic_prefix, + ) + + async def stop(self) -> None: + return None + + # ── helpers ───────────────────────────────────────────────────────────── + + def _publish(self, body: dict[str, Any]) -> None: + headers = {"Content-Type": "application/json", "X-API-Key": self._api_key} + try: + r = httpx.post( + f"{self._base_url}/publish", + headers=headers, + json=body, + timeout=self._timeout_s, + ) + except httpx.HTTPError as exc: + raise RuntimeError(f"workstacean publish failed: {exc}") from exc + if r.status_code >= 400: + raise RuntimeError( + f"workstacean publish HTTP {r.status_code}: {r.text[:200]}" + ) + + def _namespaced_id(self, job_id: str | None) -> str: + suffix = job_id or uuid.uuid4().hex[:12] + prefix = f"{self.agent_name}-" + return suffix if suffix.startswith(prefix) else prefix + suffix + + +def _validate_schedule(schedule: str) -> None: + """Validate cron expression OR ISO datetime. Raises ValueError.""" + if is_cron(schedule): + from croniter import croniter + try: + croniter(schedule) + except (TypeError, ValueError) as exc: + raise ValueError(f"invalid cron expression {schedule!r}: {exc}") from exc + return + parse_iso_to_utc(schedule) # raises ValueError on malformed ISO diff --git a/server.py b/server.py index f1ecd46..8b10e4f 100644 --- a/server.py +++ b/server.py @@ -30,10 +30,13 @@ import os import time from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any from graph.output_format import extract_output +if TYPE_CHECKING: + from scheduler.interface import SchedulerBackend + # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- @@ -58,6 +61,10 @@ _active_port = 7870 # populated by _main() — the port this process is actually bound to. # Read by the autostart installer so the LaunchAgent reboots # on the same port the operator launched with, not the default. +_scheduler = None # SchedulerBackend (LocalScheduler or WorkstaceanScheduler). + # Constructed at init, started on FastAPI startup, stopped + # on shutdown. Lifecycle is hooked in _main() so the + # polling coroutine doesn't leak on server reload. def _init_langgraph_agent(): @@ -97,11 +104,21 @@ def _init_langgraph_agent(): # the worker subagent — the store is still cheap to construct. knowledge_store = _build_knowledge_store(_graph_config) - _graph = create_agent_graph(_graph_config, knowledge_store=knowledge_store) + # Scheduler — local sqlite by default, swaps to a WorkstaceanScheduler + # automatically when WORKSTACEAN_API_BASE + WORKSTACEAN_API_KEY env + # vars are set. Both backends share the same agent-tool surface + # (schedule_task / list_schedules / cancel_schedule). + global _scheduler + _scheduler = _build_scheduler(_graph_config) + + _graph = create_agent_graph( + _graph_config, knowledge_store=knowledge_store, scheduler=_scheduler, + ) log.info( - "LangGraph agent initialized (model: %s, knowledge_db: %s)", + "LangGraph agent initialized (model: %s, knowledge_db: %s, scheduler: %s)", _graph_config.model_name, getattr(knowledge_store, "path", "(disabled)"), + getattr(_scheduler, "name", "disabled"), ) @@ -124,6 +141,119 @@ def _build_knowledge_store(config): return None +def _start_scheduler_async(backend: "SchedulerBackend") -> None: + """Fire-and-forget scheduler.start() onto the running loop. + + Reload paths are sync but invoked from FastAPI request handlers, + so the running loop is available. Awaiting would force the entire + reload chain to become async — not worth it for one no-await + coroutine. + """ + import asyncio + try: + asyncio.get_running_loop().create_task(backend.start()) + except RuntimeError: + log.warning( + "[reload] no running event loop; scheduler will start " + "on next process boot", + ) + except Exception: + log.exception("[reload] scheduler start failed") + + +def _stop_scheduler_async(backend: "SchedulerBackend") -> None: + """Fire-and-forget scheduler.stop() onto the running loop. + + Used when the YAML toggle flips off mid-reload. The polling task + cancels cleanly; the next graph rebuild registers no scheduler + tools. + """ + import asyncio + try: + asyncio.get_running_loop().create_task(backend.stop()) + except RuntimeError: + log.warning("[reload] no running event loop; scheduler not stopped") + except Exception: + log.exception("[reload] scheduler stop failed") + + +def _build_scheduler(config) -> "SchedulerBackend | None": + """Return the active scheduler backend, or ``None`` when disabled. + + Selection order: + + 1. ``WORKSTACEAN_API_BASE`` + ``WORKSTACEAN_API_KEY`` set → + ``WorkstaceanScheduler``. Forks running on the protoLabs fleet + infrastructure get this for free. + 2. Otherwise → ``LocalScheduler`` with sqlite at + ``/sandbox/scheduler//jobs.db``. + + Returns ``None`` when explicitly disabled via ``SCHEDULER_DISABLED=1`` + so a fork can ship without a scheduler at all. + + The agent's auth token + api-key are passed into the local backend + so its self-invocation HTTP call can pass through bearer / X-API-Key + auth — the scheduler hits the same A2A endpoint as a real caller. + """ + # Two opt-out paths, in priority order: + # 1. ``middleware.scheduler: false`` in YAML (drawer / wizard). + # This is the canonical opt-out — symmetric with + # ``middleware.knowledge`` / ``middleware.memory``. + # 2. ``SCHEDULER_DISABLED=1`` env var. Runtime escape hatch for + # fleet operators who need to kill the scheduler without + # editing config (e.g. emergency rollback). + if not getattr(config, "scheduler_enabled", True): + log.info("[server] scheduler disabled via middleware.scheduler config") + return None + if os.environ.get("SCHEDULER_DISABLED", "").lower() in ("1", "true", "yes"): + log.info("[server] scheduler disabled via SCHEDULER_DISABLED env") + return None + + name = agent_name() + workstacean_base = os.environ.get("WORKSTACEAN_API_BASE", "").strip() + workstacean_key = os.environ.get("WORKSTACEAN_API_KEY", "").strip() + if workstacean_base and workstacean_key: + try: + from scheduler import WorkstaceanScheduler + return WorkstaceanScheduler( + agent_name=name, + base_url=workstacean_base, + api_key=workstacean_key, + topic_prefix=os.environ.get("WORKSTACEAN_TOPIC_PREFIX") or None, + ) + except Exception as exc: + log.warning( + "[server] WorkstaceanScheduler init failed: %s; falling back to local", + exc, + ) + + try: + from scheduler import LocalScheduler + invoke_url = os.environ.get( + "SCHEDULER_INVOKE_URL", + f"http://127.0.0.1:{_active_port}", + ) + bearer = (config.auth_token or os.environ.get("A2A_AUTH_TOKEN", "")).strip() + # The A2A handler reads X-API-Key from ``_API_KEY`` + # (server.py L893 — note: the env-derived name, NOT the wizard-set + # ``identity.name``). Match that here so a wizard rename doesn't + # break self-invocation auth. + api_key_env = f"{AGENT_NAME_ENV.upper()}_API_KEY" + api_key = os.environ.get(api_key_env, "").strip() + return LocalScheduler( + agent_name=name, + invoke_url=invoke_url, + api_key=api_key, + bearer_token=bearer, + ) + except Exception as exc: + log.warning( + "[server] LocalScheduler init failed: %s; running scheduler-less", + exc, + ) + return None + + def _reload_langgraph_agent() -> tuple[bool, str]: """Rebuild the compiled graph from the latest config YAML. @@ -158,12 +288,44 @@ def _reload_langgraph_agent() -> tuple[bool, str]: # would leave the process serving the prior compiled _graph under # fresh _graph_config + rotated bearer auth on failure — the # metrics / card / auth all de-sync from what's actually running. + # Plan the scheduler swap *before* attempting the graph rebuild so + # the polling loop isn't torn down (or a fresh one started) until + # we know the rebuild will succeed. Three states: + # + # 1. Toggle flipped OFF, scheduler currently running → next graph + # uses None; we stop the running scheduler only after commit. + # 2. Toggle ON, none running (first-run after setup completes) → + # construct now (cheap), start only after commit. + # 3. Toggle ON, already running → reuse. Drawer saves don't tear + # down the polling loop. + # + # Env-driven config (WORKSTACEAN_API_BASE) only takes effect on + # full process restart; the YAML toggle is the canonical + # reload-time switch. + global _scheduler + scheduler_wanted = getattr(new_config, "scheduler_enabled", True) + next_scheduler: "SchedulerBackend | None" + pending_start: "SchedulerBackend | None" = None + pending_stop: "SchedulerBackend | None" = None + if not scheduler_wanted: + next_scheduler = None + pending_stop = _scheduler # may be None — stopper is no-op then + elif _scheduler is None: + next_scheduler = _build_scheduler(new_config) + pending_start = next_scheduler + else: + next_scheduler = _scheduler + if is_setup_complete(): try: new_store = _build_knowledge_store(new_config) - new_graph = create_agent_graph(new_config, knowledge_store=new_store) + new_graph = create_agent_graph( + new_config, knowledge_store=new_store, scheduler=next_scheduler, + ) except Exception as e: log.exception("[reload] graph rebuild failed") + # Scheduler state hasn't been committed yet — caller's + # running scheduler keeps polling, no orphaned tasks. return False, f"graph rebuild failed: {e}" else: new_graph = None @@ -180,6 +342,15 @@ def _reload_langgraph_agent() -> tuple[bool, str]: # before _main wires routes) — harmless. pass _graph = new_graph + # Commit the scheduler swap. start/stop are async — fire-and-forget + # onto the active loop so reload stays sync. We've already verified + # the graph rebuild succeeded; if start/stop fails we log but + # don't roll back (the agent is already serving the new graph). + _scheduler = next_scheduler + if pending_stop is not None: + _stop_scheduler_async(pending_stop) + if pending_start is not None: + _start_scheduler_async(pending_start) if new_graph is None: log.info("[reload] setup not complete — config reloaded, graph not compiled") @@ -757,6 +928,31 @@ def _main(): fastapi_app = FastAPI(title=f"{agent_name()} — protoAgent") + # --- Scheduler lifecycle ------------------------------------------------ + # The local scheduler needs an asyncio polling task; the Workstacean + # adapter is a no-op start/stop. Both implement the same contract so + # we just call through. on_event is preferred over a lifespan + # context manager here — the rest of the boot is sync (uvicorn.run + # is the only blocking call) and FastAPI fires startup/shutdown + # around it. + @fastapi_app.on_event("startup") + async def _scheduler_startup() -> None: + if _scheduler is None: + return + try: + await _scheduler.start() + except Exception: + log.exception("[scheduler] startup failed") + + @fastapi_app.on_event("shutdown") + async def _scheduler_shutdown() -> None: + if _scheduler is None: + return + try: + await _scheduler.stop() + except Exception: + log.exception("[scheduler] shutdown failed") + # --- Chat API ----------------------------------------------------------- class ChatRequest(PydanticBaseModel): message: str diff --git a/tests/test_config_io.py b/tests/test_config_io.py index caf0bb2..946abfb 100644 --- a/tests/test_config_io.py +++ b/tests/test_config_io.py @@ -329,7 +329,17 @@ def test_list_available_tools_returns_starter_set(): assert "calculator" in names assert "web_search" in names assert "fetch_url" in names + # Memory + scheduler tools appear in the wizard checklist even + # when no store / scheduler has been constructed yet — otherwise + # the user couldn't enable them on a fresh boot. + assert "memory_ingest" in names + assert "schedule_task" in names + assert "list_schedules" in names + assert "cancel_schedule" in names assert all(isinstance(n, str) for n in names) + # No duplicates — list_available_tools dedupes between the + # backend-bound tools and the static name lists. + assert len(names) == len(set(names)) # ── Setup wizard marker ───────────────────────────────────────────────────── diff --git a/tests/test_scheduler_local.py b/tests/test_scheduler_local.py new file mode 100644 index 0000000..0e65ca1 --- /dev/null +++ b/tests/test_scheduler_local.py @@ -0,0 +1,370 @@ +"""Tests for ``scheduler.local.LocalScheduler``. + +The polling-loop firing path is covered by stubbing ``httpx.AsyncClient`` +so a unit test doesn't need a running A2A endpoint. Multi-agent +isolation, missed-fire recovery, and reschedule-vs-delete behaviour +all get explicit cases — they're the parts most likely to regress. +""" + +from __future__ import annotations + +import asyncio +import sqlite3 +from datetime import UTC, datetime, timedelta +from pathlib import Path + +import pytest + +from scheduler.interface import is_cron, parse_iso_to_utc +from scheduler.local import LocalScheduler, _compute_next_fire + + +# ── helpers ───────────────────────────────────────────────────────────────── + + +def _make_scheduler(tmp_path: Path, agent: str = "gina-test") -> LocalScheduler: + return LocalScheduler( + agent_name=agent, + invoke_url="http://127.0.0.1:7870", + api_key="k", + bearer_token="b", + db_dir=tmp_path, + ) + + +# ── interface helpers ────────────────────────────────────────────────────── + + +class TestIsCron: + def test_cron_5_field(self): + assert is_cron("0 9 * * *") is True + + def test_cron_with_ranges(self): + assert is_cron("0 9 * * 1-5") is True + + def test_iso_with_t(self): + assert is_cron("2026-04-28T15:00:00") is False + + def test_iso_with_space(self): + assert is_cron("2026-04-28 15:00:00") is False + + def test_iso_with_offset(self): + assert is_cron("2026-04-28T15:00:00+00:00") is False + + def test_garbage(self): + assert is_cron("not a schedule") is False + assert is_cron("0 9 *") is False # 3 fields, not 5 + + def test_seven_fields_rejected(self): + # 7-field cron (with seconds + year) is not standard 5-field; + # the current detector accepts only exactly 5. + assert is_cron("0 0 12 * * MON 2026") is False + + +class TestParseIso: + def test_naive_treated_as_utc(self): + dt = parse_iso_to_utc("2026-04-28T15:00:00") + assert dt.tzinfo == UTC + assert dt.hour == 15 + + def test_offset_normalized(self): + dt = parse_iso_to_utc("2026-04-28T15:00:00-05:00") + assert dt.tzinfo == UTC + assert dt.hour == 20 # 15 EST → 20 UTC + + def test_malformed_raises(self): + with pytest.raises(ValueError, match=r"Invalid isoformat|could not convert"): + parse_iso_to_utc("not an iso string") + + +# ── add / list / cancel ───────────────────────────────────────────────────── + + +class TestAddJob: + def test_cron_job(self, tmp_path): + s = _make_scheduler(tmp_path) + job = s.add_job("hi", "0 9 * * *") + assert job.agent_name == "gina-test" + assert job.prompt == "hi" + assert job.next_fire is not None + assert "T" in job.next_fire # ISO + + def test_iso_one_shot(self, tmp_path): + s = _make_scheduler(tmp_path) + future = "2099-01-01T00:00:00" + job = s.add_job("hi", future) + # Naive ISO should be normalized to UTC + assert job.next_fire.startswith("2099-01-01T00:00:00") + + def test_empty_prompt_rejected(self, tmp_path): + s = _make_scheduler(tmp_path) + with pytest.raises(ValueError, match=r"prompt is required"): + s.add_job(" ", "0 9 * * *") + + def test_malformed_schedule_rejected(self, tmp_path): + s = _make_scheduler(tmp_path) + with pytest.raises(ValueError, match=r"Invalid isoformat|could not convert"): + s.add_job("hi", "not-a-real-schedule") + + def test_user_id_preserved(self, tmp_path): + s = _make_scheduler(tmp_path) + job = s.add_job("hi", "0 9 * * *", job_id="my-custom-id") + assert job.id == "my-custom-id" + + def test_duplicate_id_rejected(self, tmp_path): + s = _make_scheduler(tmp_path) + s.add_job("hi", "0 9 * * *", job_id="dup") + with pytest.raises(ValueError, match="already exists"): + s.add_job("again", "0 9 * * *", job_id="dup") + + def test_auto_id_has_agent_prefix(self, tmp_path): + s = _make_scheduler(tmp_path, agent="ginavision") + job = s.add_job("hi", "0 9 * * *") + assert job.id.startswith("ginavision-") + + +class TestListAndCancel: + def test_list_filters_by_agent(self, tmp_path): + gp = _make_scheduler(tmp_path, agent="gina-personal") + gw = _make_scheduler(tmp_path, agent="gina-work") + gp.add_job("p1", "0 9 * * *") + gp.add_job("p2", "0 10 * * *") + gw.add_job("w1", "0 9 * * *") + assert len(gp.list_jobs()) == 2 + assert len(gw.list_jobs()) == 1 + assert gp.list_jobs()[0].agent_name == "gina-personal" + + def test_cancel_returns_true_on_hit(self, tmp_path): + s = _make_scheduler(tmp_path) + job = s.add_job("hi", "0 9 * * *") + assert s.cancel_job(job.id) is True + assert s.list_jobs() == [] + + def test_cancel_returns_false_on_miss(self, tmp_path): + s = _make_scheduler(tmp_path) + assert s.cancel_job("does-not-exist") is False + + def test_cross_agent_cancel_blocked(self, tmp_path): + gp = _make_scheduler(tmp_path, agent="gina-personal") + gw = _make_scheduler(tmp_path, agent="gina-work") + gw_job = gw.add_job("w1", "0 9 * * *") + # gp tries to cancel gw's job — must fail silently (no row deleted) + assert gp.cancel_job(gw_job.id) is False + assert len(gw.list_jobs()) == 1 + + +# ── reschedule / delete behaviour ─────────────────────────────────────────── + + +class TestRescheduleOrDelete: + def test_one_shot_deleted_after_fire(self, tmp_path): + s = _make_scheduler(tmp_path) + # ISO in the past so _claim_due_jobs picks it up + past = (datetime.now(UTC) - timedelta(seconds=5)).isoformat() + s.add_job("hi", past, job_id="oneshot") + job = s.list_jobs()[0] + s._reschedule_or_delete(job, fired_at=datetime.now(UTC)) + assert s.list_jobs() == [] + + def test_cron_rescheduled_after_fire(self, tmp_path): + s = _make_scheduler(tmp_path) + s.add_job("hi", "0 9 * * *", job_id="cron") + job = s.list_jobs()[0] + # Fire at a fixed timestamp — 2026-04-28T10:00:00Z is one hour + # past the 09:00 cron slot, so the next fire must be exactly + # 2026-04-29T09:00:00Z. + fired_at = datetime(2026, 4, 28, 10, 0, 0, tzinfo=UTC) + s._reschedule_or_delete(job, fired_at=fired_at) + rescheduled = s.list_jobs()[0] + assert rescheduled.next_fire == "2026-04-29T09:00:00+00:00" + assert rescheduled.last_fire == fired_at.isoformat() + + +class TestMissedFireRecovery: + def test_stale_oneshot_dropped(self, tmp_path): + s = _make_scheduler(tmp_path) + # ISO from 2 days ago — outside the 24h window + stale = (datetime.now(UTC) - timedelta(days=2)).isoformat() + s.add_job("hi", stale, job_id="stale") + s._recover_missed_fires() + assert s.list_jobs() == [] + + def test_stale_cron_rolled_forward(self, tmp_path): + s = _make_scheduler(tmp_path) + s.add_job("hi", "0 9 * * *", job_id="cron-stale") + # Manually rewrite next_fire to 2 days ago (outside window) + db = sqlite3.connect(str(s.path)) + old = (datetime.now(UTC) - timedelta(days=2)).isoformat() + db.execute("UPDATE jobs SET next_fire = ? WHERE id = ?", (old, "cron-stale")) + db.commit() + db.close() + s._recover_missed_fires() + rolled = s.list_jobs()[0] + assert rolled.next_fire > datetime.now(UTC).isoformat() + + def test_recent_missed_fire_kept(self, tmp_path): + s = _make_scheduler(tmp_path) + # 5 minutes ago — inside the 24h window, should still fire + recent = (datetime.now(UTC) - timedelta(minutes=5)).isoformat() + s.add_job("hi", recent, job_id="recent") + s._recover_missed_fires() + # Job still exists with next_fire in the past — polling will fire it + jobs = s.list_jobs() + assert len(jobs) == 1 + assert jobs[0].next_fire < datetime.now(UTC).isoformat() + + +# ── compute_next_fire ─────────────────────────────────────────────────────── + + +class TestComputeNextFire: + def test_cron_returns_iso_utc(self): + result = _compute_next_fire("0 9 * * *") + # Parses cleanly as ISO + dt = datetime.fromisoformat(result) + assert dt.tzinfo is not None + + def test_cron_after_anchor(self): + anchor = datetime(2026, 4, 27, 8, 0, 0, tzinfo=UTC) + result = _compute_next_fire("0 9 * * *", after=anchor) + # 9am UTC on 2026-04-27 + dt = datetime.fromisoformat(result) + assert dt.year == 2026 and dt.month == 4 and dt.day == 27 and dt.hour == 9 + + def test_iso_passthrough(self): + result = _compute_next_fire("2026-12-25T00:00:00") + assert result.startswith("2026-12-25T00:00:00") + + +# ── start / stop loop ─────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_start_stop_idempotent(tmp_path): + s = _make_scheduler(tmp_path) + await s.start() + await s.start() # second call is a no-op, not an error + assert s._task is not None + await s.stop() + await s.stop() # second call is a no-op, not an error + assert s._task is None + + +@pytest.mark.asyncio +async def test_due_job_fires(tmp_path, monkeypatch): + """End-to-end: an ISO job in the past gets picked up and POSTs to /a2a.""" + s = _make_scheduler(tmp_path) + # Schedule for 1 second ago so the first tick claims it + past = (datetime.now(UTC) - timedelta(seconds=1)).isoformat() + s.add_job("FIRED-ME", past, job_id="firetest") + + fired: list[dict] = [] + + class _FakeResponse: + status_code = 200 + text = "ok" + + class _FakeClient: + def __init__(self, *_a, **_kw): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, *_a): + return False + + async def post(self, url, headers=None, json=None): + fired.append({"url": url, "json": json}) + return _FakeResponse() + + import httpx + monkeypatch.setattr(httpx, "AsyncClient", _FakeClient) + + await s.start() + # Give the polling loop one tick (poll interval is 1s) + await asyncio.sleep(1.5) + await s.stop() + + assert any("FIRED-ME" in str(c["json"]) for c in fired) + # One-shot was deleted after firing + assert s.list_jobs() == [] + + +@pytest.mark.asyncio +async def test_fire_failure_leaves_job_in_place(tmp_path, monkeypatch): + """A 5xx HTTP response from /a2a must NOT delete the job. + + Regression guard for the round-2 review finding: previously, + _tick() called _reschedule_or_delete in finally, which silently + consumed one-shot jobs on transient failures. Now the job stays + until delivery actually succeeds. + """ + s = _make_scheduler(tmp_path) + past = (datetime.now(UTC) - timedelta(seconds=1)).isoformat() + s.add_job("DURABLE", past, job_id="firetest") + + class _FakeResponse: + status_code = 503 + text = "service unavailable" + + class _FakeClient: + def __init__(self, *_a, **_kw): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, *_a): + return False + + async def post(self, url, headers=None, json=None): + return _FakeResponse() + + import httpx + monkeypatch.setattr(httpx, "AsyncClient", _FakeClient) + + await s.start() + await asyncio.sleep(1.5) # one polling tick + await s.stop() + + # Job survives the failed fire, will be retried on the next tick. + assert len(s.list_jobs()) == 1 + assert s.list_jobs()[0].id == "firetest" + + +@pytest.mark.asyncio +async def test_fire_returns_bool(tmp_path, monkeypatch): + """``_fire`` is the success/failure signal feeding the + reschedule decision in ``_tick``. Lock the contract.""" + s = _make_scheduler(tmp_path) + job = s.add_job("hi", "0 9 * * *", job_id="x") + + class _OkResponse: + status_code = 200 + text = "ok" + + class _ErrResponse: + status_code = 500 + text = "boom" + + class _FakeClient: + def __init__(self, response): + self._response = response + + async def __aenter__(self): + return self + + async def __aexit__(self, *_a): + return False + + async def post(self, *_a, **_kw): + return self._response + + import httpx + + monkeypatch.setattr(httpx, "AsyncClient", lambda **kw: _FakeClient(_OkResponse())) + assert await s._fire(job) is True + + monkeypatch.setattr(httpx, "AsyncClient", lambda **kw: _FakeClient(_ErrResponse())) + assert await s._fire(job) is False diff --git a/tests/test_scheduler_workstacean.py b/tests/test_scheduler_workstacean.py new file mode 100644 index 0000000..74fb485 --- /dev/null +++ b/tests/test_scheduler_workstacean.py @@ -0,0 +1,168 @@ +"""Tests for ``scheduler.workstacean.WorkstaceanScheduler``. + +We don't run a Workstacean instance — instead we monkeypatch +``httpx.post`` and assert that the adapter sends the right +``POST /publish`` body shape (action, namespaced id, namespaced topic, +auth header). +""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from scheduler.workstacean import WorkstaceanScheduler + + +class _FakeResponse: + def __init__(self, status: int = 200, body: str = "ok"): + self.status_code = status + self.text = body + + +class _Recorder: + def __init__(self): + self.calls: list[dict[str, Any]] = [] + self.response = _FakeResponse() + + def __call__(self, url, headers=None, json=None, timeout=None): + self.calls.append({"url": url, "headers": headers, "json": json}) + return self.response + + +@pytest.fixture +def recorder(monkeypatch): + rec = _Recorder() + import httpx + monkeypatch.setattr(httpx, "post", rec) + return rec + + +@pytest.fixture +def adapter(): + return WorkstaceanScheduler( + agent_name="gina-personal", + base_url="http://workstacean:3000", + api_key="test-key", + ) + + +# ── construction guards ──────────────────────────────────────────────────── + + +def test_missing_base_url_rejected(): + with pytest.raises(ValueError, match="base_url"): + WorkstaceanScheduler(agent_name="x", base_url="", api_key="k") + + +def test_missing_api_key_rejected(): + with pytest.raises(ValueError, match="api_key"): + WorkstaceanScheduler(agent_name="x", base_url="http://w:3000", api_key="") + + +# ── add_job ──────────────────────────────────────────────────────────────── + + +class TestAddJob: + def test_publishes_command_schedule(self, adapter, recorder): + adapter.add_job("hi", "0 9 * * *", job_id="daily") + assert len(recorder.calls) == 1 + body = recorder.calls[0]["json"] + assert body["topic"] == "command.schedule" + assert body["payload"]["action"] == "add" + + def test_id_namespaced_with_agent(self, adapter, recorder): + adapter.add_job("hi", "0 9 * * *", job_id="daily") + body = recorder.calls[0]["json"] + assert body["payload"]["id"] == "gina-personal-daily" + + def test_id_idempotent_when_already_prefixed(self, adapter, recorder): + # If the caller passes an already-prefixed id, the adapter + # shouldn't double-prefix it. + adapter.add_job("hi", "0 9 * * *", job_id="gina-personal-already-set") + body = recorder.calls[0]["json"] + assert body["payload"]["id"] == "gina-personal-already-set" + + def test_topic_namespaced_with_agent(self, adapter, recorder): + adapter.add_job("hi", "0 9 * * *", job_id="daily") + body = recorder.calls[0]["json"] + assert body["payload"]["topic"].startswith("cron.gina-personal.") + + def test_inner_payload_carries_prompt(self, adapter, recorder): + adapter.add_job("the actual prompt", "0 9 * * *", job_id="x") + inner = recorder.calls[0]["json"]["payload"]["payload"] + assert inner["content"] == "the actual prompt" + assert inner["channel"] == "a2a" + assert inner["agent_name"] == "gina-personal" + + def test_iso_oneshot_accepted(self, adapter, recorder): + adapter.add_job("hi", "2099-01-01T00:00:00", job_id="x") + assert len(recorder.calls) == 1 + + def test_malformed_schedule_rejected(self, adapter): + with pytest.raises(ValueError, match="Invalid isoformat|could not convert"): + adapter.add_job("hi", "not-a-schedule", job_id="x") + + def test_empty_prompt_rejected(self, adapter): + with pytest.raises(ValueError, match="prompt"): + adapter.add_job(" ", "0 9 * * *", job_id="x") + + def test_auth_header_sent(self, adapter, recorder): + adapter.add_job("hi", "0 9 * * *", job_id="x") + assert recorder.calls[0]["headers"]["X-API-Key"] == "test-key" + + +# ── cancel_job ───────────────────────────────────────────────────────────── + + +class TestCancelJob: + def test_publishes_remove(self, adapter, recorder): + adapter.cancel_job("daily") + body = recorder.calls[0]["json"] + assert body["payload"]["action"] == "remove" + assert body["payload"]["id"] == "gina-personal-daily" + + def test_returns_true_on_success(self, adapter, recorder): + assert adapter.cancel_job("daily") is True + + def test_returns_false_on_http_error(self, adapter, recorder): + recorder.response = _FakeResponse(status=500, body="boom") + assert adapter.cancel_job("daily") is False + + +# ── topic prefix override ────────────────────────────────────────────────── + + +def test_custom_topic_prefix(monkeypatch): + rec = _Recorder() + import httpx + monkeypatch.setattr(httpx, "post", rec) + adapter = WorkstaceanScheduler( + agent_name="gina-personal", + base_url="http://w:3000", + api_key="k", + topic_prefix="myorg.bus.gina", + ) + adapter.add_job("hi", "0 9 * * *", job_id="x") + body = rec.calls[0]["json"] + assert body["payload"]["topic"].startswith("myorg.bus.gina.") + + +# ── list_jobs is intentionally empty ─────────────────────────────────────── + + +def test_list_jobs_returns_empty(adapter): + """Workstacean's ``list`` action publishes async to a topic; + the adapter doesn't subscribe, so list_jobs returns [].""" + assert adapter.list_jobs() == [] + + +# ── start/stop are no-ops ────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_start_stop_no_op(adapter): + # Should not raise + await adapter.start() + await adapter.stop() diff --git a/tools/lg_tools.py b/tools/lg_tools.py index 161ddcb..d42effb 100644 --- a/tools/lg_tools.py +++ b/tools/lg_tools.py @@ -40,6 +40,7 @@ from __future__ import annotations import ast +import asyncio import operator as _op from datetime import datetime from zoneinfo import ZoneInfo, ZoneInfoNotFoundError @@ -280,6 +281,18 @@ def _extract_text_from_html(content: bytes) -> str: _MEMORY_RECALL_MAX_K = 20 _MEMORY_LIST_MAX_LIMIT = 200 +# Stable list of scheduler tool names. Exposed as a module-level +# constant so ``graph/config_io.py::list_available_tools`` can show +# the wizard the right surface even when the runtime hasn't yet +# constructed a scheduler instance (e.g. fresh boot before setup is +# complete). Keep in sync with ``_build_scheduler_tools``. +SCHEDULER_TOOL_NAMES: tuple[str, ...] = ( + "schedule_task", "list_schedules", "cancel_schedule", +) +MEMORY_TOOL_NAMES: tuple[str, ...] = ( + "memory_ingest", "memory_recall", "memory_list", "memory_stats", "daily_log", +) + def _build_memory_tools(knowledge_store) -> list: """Bind memory tools to a ``KnowledgeStore``. Returns a list.""" @@ -383,18 +396,123 @@ async def daily_log(content: str) -> str: return [memory_ingest, memory_recall, memory_list, memory_stats, daily_log] +# ── scheduler tools ────────────────────────────────────────────────────────── +# +# Three tools that bind to either the local sqlite-backed scheduler or +# the Workstacean adapter — the agent loop sees one stable surface and +# never has to know which backend is wired up. +# +# Multi-agent safety: the underlying backend is constructed in +# ``server.py`` with the active ``AGENT_NAME`` baked in. add_job / +# list_jobs / cancel_job all filter by that name so two protoAgent +# instances on the same machine (or sharing one Workstacean install) +# never see each other's jobs. + + +def _build_scheduler_tools(scheduler) -> list: + """Bind scheduler tools to a ``SchedulerBackend``. Returns a list.""" + + @tool + async def schedule_task( + prompt: str, + when: str, + job_id: str | None = None, + ) -> str: + """Schedule a future task. The agent receives ``prompt`` as a + new turn when the schedule fires. + + Use this for anything the operator wants done later: reminders + ("remind me to follow up on the auth migration tomorrow at + 9am"), recurring sweeps ("every Monday morning, summarize last + week's logs"), one-off check-ins ("at 3pm today, ask whether + the deploy is healthy"). + + Args: + prompt: The text the agent should receive when the schedule + fires. Be self-contained — the agent has no memory of + this scheduling moment when the task fires. + when: Either a 5-field cron expression (``"0 9 * * 1-5"`` + = every weekday at 9am) or an ISO-8601 datetime + (``"2026-05-01T15:00:00"`` = once at 3pm UTC on May 1). + Compute exact times using ``current_time`` — the agent + cannot infer "now" from training data. + job_id: Optional human-readable id for the job. Auto- + generated if omitted; you'll need it later to cancel. + + Returns ``"Scheduled job next at ."`` on success, + an error string on malformed ``when`` or backend failure. + """ + try: + job = await asyncio.to_thread(scheduler.add_job, prompt, when, job_id=job_id) + except ValueError as exc: + return f"Error: {exc}" + except Exception as exc: # noqa: BLE001 + return f"Error: scheduler add_job failed: {exc}" + next_fire = job.next_fire or "(managed by remote scheduler)" + return f"Scheduled job {job.id} next at {next_fire}." + + @tool + async def list_schedules() -> str: + """List the current scheduled jobs for this agent. + + Returns one job per line with id, next-fire timestamp, and a + prompt preview. Returns ``"No scheduled jobs."`` when empty. + + Backends that delegate state to a remote scheduler (e.g. the + Workstacean adapter) may return an empty list even when jobs + exist — query the remote scheduler directly to see those. + """ + jobs = await asyncio.to_thread(scheduler.list_jobs) + if not jobs: + return "No scheduled jobs." + lines = [] + for j in jobs: + preview = (j.prompt or "")[:80] + next_fire = j.next_fire or "(managed remotely)" + lines.append(f"{j.id} next={next_fire} schedule={j.schedule!r} {preview}") + return "\n".join(lines) + + @tool + async def cancel_schedule(job_id: str) -> str: + """Cancel a scheduled job by id. + + Args: + job_id: The id returned by ``schedule_task`` (or shown by + ``list_schedules``). + + Returns ``"Canceled ."`` or ``"Error: no such job ."``. + """ + if not job_id or not job_id.strip(): + return "Error: job_id is required." + try: + ok = await asyncio.to_thread(scheduler.cancel_job, job_id) + except Exception as exc: # noqa: BLE001 + return f"Error: scheduler cancel_job failed: {exc}" + return f"Canceled {job_id}." if ok else f"Error: cancel failed or no such job {job_id}." + + return [schedule_task, list_schedules, cancel_schedule] + + # ── registry ───────────────────────────────────────────────────────────────── -def get_all_tools(knowledge_store=None): +def get_all_tools(knowledge_store=None, scheduler=None): """Return every LangChain tool the lead agent + subagents can use. - When ``knowledge_store`` is provided, the memory tools are bound - to it and included. Forks that disable the store can pass - ``knowledge_store=None`` and the lead agent runs with the four - keyless tools only. + Optional dependencies: + + - ``knowledge_store`` enables the memory tools (memory_ingest, + memory_recall, memory_list, memory_stats, daily_log). + - ``scheduler`` enables the scheduler tools (schedule_task, + list_schedules, cancel_schedule). Accepts any backend that + implements ``scheduler.interface.SchedulerBackend``. + + Pass ``None`` to disable either subsystem — the lead agent runs + fine with just the four keyless general tools. """ tools = [current_time, calculator, web_search, fetch_url] if knowledge_store is not None: tools.extend(_build_memory_tools(knowledge_store)) + if scheduler is not None: + tools.extend(_build_scheduler_tools(scheduler)) return tools