From e2a54efda4c9e4b18164f5ba2e0d5d6a1958b1ac Mon Sep 17 00:00:00 2001 From: Will Drach Date: Fri, 22 May 2026 22:54:08 -0600 Subject: [PATCH 1/5] Generalize Slack messaging into a MessagingPlatform interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move every Slack-specific bit of the API into a new platforms/ package so additional messaging integrations (Discord, etc.) can be added by implementing a single adapter instead of branching on platform strings across agent.py, runtime_control.py, tool_manager.py, workflow_engine.py, and the slack_thread_turn workflow. New api/platforms/ package: - MessagingPlatform base class with no-op defaults for sanitization, identity recovery, system-prompt injection, live-streaming session lifecycle, outbound channel posts, and active-thread tool-call capture. - SlackPlatform owns all existing Slack behaviour: text scrubbing (sanitize_for_slack), slackbot HTTP client, GitHub-from-Slack-profile identity extraction, Slack formatting rules, live session + harness events + assistant status, and slack.send_message active-thread capture. - DevPlatform: no-op fallback used when delivery is missing/unknown. - Registry + resolvers: resolve_platform(name), resolve_for_delivery, resolve_for_thread_key, resolve_for_metadata. Production-code rewires: - agent.py: _build_session_context and _resolve_requester_identity dispatch via resolve_platform(); removed inline Slack-formatting and GitHub-handle helpers (now on SlackPlatform). - runtime_control.py: execution worker resolves platform once from delivery and calls platform.session_done / harness_event / assistant_status; dropped dead _send_slackbot_canonical_event and _canonical_text_blocks helpers. - tool_manager.py: _capture_live_slack_send → generic _capture_active_thread_tool_call dispatching to the platform resolved from thread_key prefix. - workflow_engine.py: Delivery.for_channel(platform, ...) added (with Delivery.slack as a thin wrapper); ctx.post_to_channel(platform, ...) added (with ctx.post_to_slack wrapping it); auto-handler loader now honors both SLACK_CHANNEL (legacy) and DELIVERY = {platform, channel} (forward-compat); schedule loader honors a delivery field. - workflows/slack_thread_turn.py: canonical WORKFLOW_NAME is now messaging_thread_turn with WORKFLOW_ALIASES = ("slack_thread_turn",) so the slackbot keeps working unchanged; _load_workflow_file registers aliases, _workflow_request_hash treats both names identically. - routers/agent.py: one _resolve_delivery_fallback helper for spawn + execute; logs a deprecation note when callers rely on the implicit default platform. Back-compat shims: - api/slack_sanitize.py and api/slackbot_client.py re-export the public surface from api.platforms.slack so external imports keep working. Tests: - New tests/test_platforms.py covers registration, resolvers, sanitizer, identity-line + formatting-rule generation, and Slack delivery predicates. - Existing tests that patched api.workflow_engine.slackbot_client.X / api.runtime_control.slackbot_client.X now patch the platform method directly via patch.object(SLACK_PLATFORM, ...). - test_slackbot_client.py retargeted to patch api.platforms.slack.{asyncio,httpx} since the HTTP client lives there. Co-Authored-By: Claude Opus 4.7 (1M context) --- services/api/api/agent.py | 199 +---- services/api/api/platforms/__init__.py | 250 +++++++ services/api/api/platforms/dev.py | 13 + services/api/api/platforms/slack.py | 679 ++++++++++++++++++ services/api/api/routers/agent.py | 59 +- services/api/api/runtime_control.py | 94 +-- services/api/api/slack_sanitize.py | 77 +- services/api/api/slackbot_client.py | 268 ++----- services/api/api/tool_manager.py | 88 +-- services/api/api/workflow_engine.py | 177 +++-- .../api/api/workflows/slack_thread_turn.py | 15 +- .../api/tests/test_agent_control_plane.py | 17 +- services/api/tests/test_platforms.py | 119 +++ services/api/tests/test_slackbot_client.py | 10 +- .../tests/test_slackbot_client_sanitize.py | 7 +- .../tests/test_workflow_idempotency_unit.py | 4 +- services/api/tests/test_workflows.py | 24 +- 17 files changed, 1414 insertions(+), 686 deletions(-) create mode 100644 services/api/api/platforms/__init__.py create mode 100644 services/api/api/platforms/dev.py create mode 100644 services/api/api/platforms/slack.py create mode 100644 services/api/tests/test_platforms.py diff --git a/services/api/api/agent.py b/services/api/api/agent.py index 356737c7..7dc40fa3 100644 --- a/services/api/api/agent.py +++ b/services/api/api/agent.py @@ -18,7 +18,6 @@ import contextlib import json import os -import re import time import uuid from collections.abc import AsyncIterator @@ -41,15 +40,12 @@ log = structlog.get_logger() -_GITHUB_HANDLE_RE = re.compile(r"^[A-Za-z0-9](?:[A-Za-z0-9-]{0,37}[A-Za-z0-9])?$") -_GITHUB_URL_RE = re.compile( - r"(?:https?://)?github\.com/([A-Za-z0-9](?:[A-Za-z0-9-]{0,37}[A-Za-z0-9])?)", - re.IGNORECASE, -) -_GITHUB_LABEL_RE = re.compile(r"\bgithub\b", re.IGNORECASE) -_GITHUB_PREFIX_RE = re.compile( - r"\bgithub\b\s*(?:username|user|handle|profile)?\s*[:/@-]?\s*@?([A-Za-z0-9][A-Za-z0-9-]{0,38})", - re.IGNORECASE, +# Slack profile → GitHub handle extraction lives on the SlackPlatform; we +# keep the legacy module-level name as a re-export so the existing test +# (`from api.agent import _extract_github_handle_from_slack_profile`) and +# any other call sites keep working until they migrate. +from api.platforms.slack import ( # noqa: E402, F401 + extract_github_handle_from_slack_profile as _extract_github_handle_from_slack_profile, ) _VALID_STDOUT_EVENT_TYPES = frozenset( @@ -562,130 +558,21 @@ async def _get_latest_thread_user_id(thread_key: str) -> str | None: return str(user_id).strip() or None -def _valid_github_handle(value: str) -> str | None: - candidate = value.strip().strip("@").strip() - candidate = candidate.rstrip("/").split("/", 1)[0] - return candidate if _GITHUB_HANDLE_RE.match(candidate) else None - - -def _extract_github_handle_from_slack_profile( - profile: dict[str, Any], -) -> tuple[str | None, str | None, str]: - """Return (handle, source, unavailable_reason) from Slack profile fields.""" - custom_fields = profile.get("custom_fields") - if not isinstance(custom_fields, dict) or not custom_fields: - return None, None, "no GitHub custom field found on Slack profile" - - saw_github_field = False - for label, raw_value in custom_fields.items(): - label_text = str(label or "").strip() - value = str(raw_value or "").strip() - if not value: - continue - - label_mentions_github = bool(_GITHUB_LABEL_RE.search(label_text)) - value_mentions_github = bool(_GITHUB_LABEL_RE.search(value)) - if not label_mentions_github and not value_mentions_github: - continue - saw_github_field = True - - source = ( - f'Slack profile custom field "{label_text}"' - if label_text - else "Slack profile custom field" - ) - url_match = _GITHUB_URL_RE.search(value) - if url_match: - handle = _valid_github_handle(url_match.group(1)) - if handle: - return f"@{handle}", source, "" - - prefixed_match = _GITHUB_PREFIX_RE.search(value) - if prefixed_match: - handle = _valid_github_handle(prefixed_match.group(1)) - if handle: - return f"@{handle}", source, "" - - if label_mentions_github: - handle = _valid_github_handle(value) - if handle: - return f"@{handle}", source, "" - - if saw_github_field: - return ( - None, - None, - "GitHub profile field did not contain a valid GitHub handle", - ) - return None, None, "no GitHub custom field found on Slack profile" - - async def _resolve_requester_identity( *, platform: str | None, user_id: str | None, ) -> dict[str, str | bool] | None: - if not user_id or (platform or "").lower() != "slack": - return None - - identity: dict[str, str | bool] = { - "slack_user_id": user_id, - "slack_mention": f"<@{user_id}>", - } - try: - from api.app import get_tool_manager - - profile = await get_tool_manager().call_tool_raw( - "slack", "get_user_profile", {"user_id": user_id} - ) - except Exception as exc: - log.warning( - "requester_identity_lookup_failed", - platform=platform, - user_id=user_id, - error=str(exc), - ) - identity.update( - { - "github_handle_verified": False, - "github_handle_unavailable_reason": "Slack profile could not be fetched", - } - ) - return identity + """Delegate to the registered messaging platform. - if not isinstance(profile, dict) or profile.get("error"): - error = str(profile.get("error") or "Slack profile could not be fetched") - log.warning( - "requester_identity_lookup_failed", - platform=platform, - user_id=user_id, - error=error, - ) - identity.update( - { - "github_handle_verified": False, - "github_handle_unavailable_reason": "Slack profile could not be fetched", - } - ) - return identity + Slack pulls GitHub handles from profile custom fields; other platforms + return ``None`` until their adapter implements identity recovery. + """ + from api.platforms import resolve_platform - handle, source, reason = _extract_github_handle_from_slack_profile(profile) - if handle: - identity.update( - { - "github_handle": handle, - "github_handle_source": source or "Slack profile custom field", - "github_handle_verified": True, - } - ) - else: - identity.update( - { - "github_handle_verified": False, - "github_handle_unavailable_reason": reason, - } - ) - return identity + if not user_id: + return None + return await resolve_platform(platform).load_requester_identity(user_id) async def _insert_system_message( @@ -933,11 +820,13 @@ def _build_session_context( ) -> str: """Build session context to append to the system prompt. - Contains metadata (time, thread, platform) and platform-specific formatting - rules so the agent produces output suitable for the target platform. + Contains metadata (time, thread, platform) plus identity + formatting + rules contributed by the registered messaging platform. """ from datetime import datetime, timezone + from api.platforms import resolve_platform + now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") lines = [ "# Session Context", @@ -948,55 +837,9 @@ def _build_session_context( if platform: lines.append(f"- **Platform**: {platform}") - if requester_identity: - lines.extend( - [ - "", - "## Requester Identity", - "", - f"- Slack user ID: {requester_identity['slack_user_id']}", - f"- Slack mention: {requester_identity['slack_mention']}", - ] - ) - if requester_identity.get("github_handle_verified"): - lines.extend( - [ - "- GitHub handle from Slack profile: " - f"{requester_identity['github_handle']}", - "- GitHub handle source: " - f"{requester_identity['github_handle_source']}", - "- GitHub handle verified: yes", - ] - ) - else: - lines.extend( - [ - "- GitHub handle from Slack profile: unavailable", - "- GitHub handle unavailable reason: " - f"{requester_identity['github_handle_unavailable_reason']}", - "- GitHub handle verified: no", - ] - ) - - if platform and platform.lower() == "slack": - lines.extend( - [ - "", - "## Slack Formatting Rules", - "", - "- Use standard markdown links `[Display Text](URL)` for hyperlinks", - "- Do NOT use Slack-native `` link syntax", - "- Preserve Slack user mentions (`<@UXXXXXXX>`) exactly as-is — only use these for actual Slack users", - "- For Twitter/X handles, link to the profile WITHOUT an @ prefix in the display text: `[handle](https://x.com/handle)` (NOT `[@handle](...)`)", - "- Prefer concise, well-structured markdown; long replies may be split across multiple Slack messages", - "- Markdown tables are allowed and may render as native Slack tables when the structure is clean", - "- NEVER put links/URLs inside code blocks (``` ```) — they won't be clickable. Use markdown tables or plain text with `[text](url)` links instead", - ] - ) - if user_id: - lines.append( - f"- After completing a long task, tag the requester with their real Slack mention: <@{user_id}>" - ) + platform_impl = resolve_platform(platform) + lines.extend(platform_impl.system_prompt_identity_lines(requester_identity)) + lines.extend(platform_impl.system_prompt_rules(user_id=user_id)) lines.extend(["", "---", ""]) return "\n".join(lines) diff --git a/services/api/api/platforms/__init__.py b/services/api/api/platforms/__init__.py new file mode 100644 index 00000000..ff8a16ea --- /dev/null +++ b/services/api/api/platforms/__init__.py @@ -0,0 +1,250 @@ +"""Messaging-platform abstraction layer. + +A ``MessagingPlatform`` owns everything that varies between the platforms +Centaur can deliver agent results to: text scrubbing, identity recovery, +system-prompt injection, live-streaming session lifecycle, thread-key +parsing, and active-thread tool-call capture. + +One singleton per platform is registered in ``PLATFORMS``. Callers resolve +a platform by name (``resolve_platform("slack")``) or by inspecting the +delivery dict on an execution (``resolve_for_delivery(delivery)``). The +default fallback when the platform is missing or unknown is ``"dev"``, +which is a no-op implementation suitable for unit tests and the localhost +bypass path. +""" + +from __future__ import annotations + +import json +import re +from typing import Any + +import structlog + +log = structlog.get_logger() + + +class MessagingPlatform: + """Base implementation. Subclasses override what the platform supports. + + Every method has a working default so unrelated platforms (and tests + that pass arbitrary platform names) keep working without raising. + """ + + name: str = "" + + # Text truncation limits used by the live-streaming projection. + message_chunk_chars: int = 12_000 + step_chunk_chars: int = 12_000 + + # Regex that matches a leading ``<@USER_ID>`` mention prefix and captures + # the remaining text. Used by recovery-command normalisation in the + # messaging_thread_turn workflow. Default never matches. + mention_prefix_re: re.Pattern[str] = re.compile(r"(?!x)x") + + # ── Delivery predicates ──────────────────────────────────────────── + + def is_delivery_for_me(self, delivery: dict[str, Any] | None) -> bool: + return ( + isinstance(delivery, dict) + and str(delivery.get("platform") or "") == self.name + ) + + # ── Text scrubbing ───────────────────────────────────────────────── + + def sanitize_text( + self, text: str | None, *, preserve_edges: bool = False + ) -> str: + return text or "" + + def sanitize_event(self, value: Any) -> Any: + return value + + def clip_text(self, value: Any, max_chars: int | None = None) -> str: + text = ( + value + if isinstance(value, str) + else json.dumps(value, ensure_ascii=False, default=str) + ) + text = text.strip() + limit = max_chars or self.step_chunk_chars + return text if len(text) <= limit else f"{text[: limit - 1]}…" + + # ── Thread-key parsing ───────────────────────────────────────────── + + def thread_key_destination(self, thread_key: str) -> str | None: + """Return the channel/destination ID encoded in the thread_key, or None.""" + return None + + # ── Identity recovery ────────────────────────────────────────────── + + async def load_requester_identity( + self, user_id: str | None + ) -> dict[str, str | bool] | None: + return None + + def system_prompt_identity_lines( + self, identity: dict[str, str | bool] | None + ) -> list[str]: + return [] + + def system_prompt_rules(self, *, user_id: str | None = None) -> list[str]: + return [] + + # ── Live-streaming session lifecycle ─────────────────────────────── + + async def open_live_session( + self, + *, + delivery: dict[str, Any], + metadata: dict[str, Any], + thread_key: str, + title: str = "Centaur execution", + header: str | None = None, + ) -> str | None: + return None + + async def session_text( + self, session_id: str | None, markdown: str + ) -> None: + return None + + async def session_step( + self, + session_id: str | None, + *, + step_id: str, + title: str, + status: str = "in_progress", + details: str | None = None, + output: str | None = None, + ) -> None: + return None + + async def session_done( + self, session_id: str | None, thread_id: str | None = None + ) -> None: + return None + + async def harness_event( + self, session_id: str | None, event: dict[str, Any] + ) -> dict[str, Any] | None: + return None + + async def assistant_status( + self, delivery: dict[str, Any], status: str + ) -> None: + return None + + # ── Channel post (workflow ctx.post_to_channel) ──────────────────── + + async def send_channel_message( + self, + channel: str, + text: str, + *, + thread_id: str | None = None, + ) -> dict[str, Any]: + """Post a message to a channel via this platform's outbound tool. + + Default raises so workflows fail loudly when they target a platform + whose adapter doesn't implement outbound channel posts. Subclasses + override to call the right tool (Slack → ``slack.send_message``, + Discord → ``discord.send_message``, …). + """ + raise NotImplementedError( + f"platform {self.name!r} does not implement send_channel_message" + ) + + # ── Live tool-call capture ───────────────────────────────────────── + + async def capture_active_thread_tool_call( + self, + *, + request: Any, + sandbox_claims: dict[str, Any] | None, + tool_name: str, + method_name: str, + args: dict[str, Any], + ) -> dict[str, Any] | None: + """If an agent tool call should be re-routed through the live session + instead of hitting the platform's API, return a captured-result dict. + + Slack catches ``slack.send_message`` to the active thread; other + platforms return ``None`` by default. + """ + return None + + +# ── Registry ─────────────────────────────────────────────────────────── + +PLATFORMS: dict[str, MessagingPlatform] = {} + + +def register_platform(platform: MessagingPlatform) -> None: + if not platform.name: + raise ValueError("platform.name must be set before registration") + PLATFORMS[platform.name] = platform + + +def resolve_platform(name: str | None) -> MessagingPlatform: + """Return the registered platform, falling back to ``dev`` (no-op). + + Unknown platform names log a debug message and return the dev platform + so callers don't need to special-case ``None``/unknown values. + """ + if name and name in PLATFORMS: + return PLATFORMS[name] + if name: + log.debug("resolve_platform_unknown", requested=name) + return PLATFORMS.get("dev") or _DEV_FALLBACK + + +def resolve_for_delivery( + delivery: dict[str, Any] | None, +) -> MessagingPlatform: + name: str | None = None + if isinstance(delivery, dict): + value = delivery.get("platform") + if isinstance(value, str) and value: + name = value + return resolve_platform(name) + + +def resolve_for_thread_key(thread_key: str | None) -> MessagingPlatform | None: + """Resolve a platform from the leading namespace of a thread_key + (e.g. ``"slack:C123:..."`` → SlackPlatform). Returns None when the + prefix doesn't match any registered platform. + """ + if not thread_key or ":" not in thread_key: + return None + prefix = thread_key.split(":", 1)[0] + return PLATFORMS.get(prefix) + + +def resolve_for_metadata( + metadata: dict[str, Any] | None, +) -> MessagingPlatform | None: + """Resolve from metadata.platform if present; ``None`` otherwise. + + Unlike ``resolve_for_delivery``, this does not fall back to dev — it + distinguishes "no platform recorded" from "dev platform". + """ + if not isinstance(metadata, dict): + return None + value = metadata.get("platform") + if isinstance(value, str) and value and value in PLATFORMS: + return PLATFORMS[value] + return None + + +# Defensive fallback used if resolve_platform runs before registration. +_DEV_FALLBACK = MessagingPlatform() +_DEV_FALLBACK.name = "dev" + + +# Eager imports so registration happens on package import. Kept at the +# bottom to avoid circular-import issues with platforms that import from +# api.* at module load. +from api.platforms import dev as _dev_module # noqa: E402, F401 +from api.platforms import slack as _slack_module # noqa: E402, F401 diff --git a/services/api/api/platforms/dev.py b/services/api/api/platforms/dev.py new file mode 100644 index 00000000..41df024f --- /dev/null +++ b/services/api/api/platforms/dev.py @@ -0,0 +1,13 @@ +"""Dev platform: no-op everything. Used for localhost-bypass executions +and unit tests where no real messaging integration is wired up.""" + +from __future__ import annotations + +from api.platforms import MessagingPlatform, register_platform + + +class DevPlatform(MessagingPlatform): + name = "dev" + + +register_platform(DevPlatform()) diff --git a/services/api/api/platforms/slack.py b/services/api/api/platforms/slack.py new file mode 100644 index 00000000..ebc4a6d1 --- /dev/null +++ b/services/api/api/platforms/slack.py @@ -0,0 +1,679 @@ +"""Slack messaging platform: text scrubbing, identity recovery, +system-prompt injection, live-streaming session, and active-thread +``slack.send_message`` capture. + +Everything Slack-specific lives here. The thin shims at +``api/slack_sanitize.py`` and ``api/slackbot_client.py`` re-export the +module-level helpers so existing callsites keep working during the +Phase 0 → Phase 1 transition. +""" + +from __future__ import annotations + +import asyncio +import json +import os +import re +from collections.abc import Callable +from typing import Any + +import httpx +import structlog + +from api.platforms import MessagingPlatform, register_platform + +log = structlog.get_logger() + +# ── Text scrubbing ──────────────────────────────────────────────────── + +_THREAD_TRAILER_RE = re.compile( + r"(?:^|\s)(?:Agent|Codex|Amp|Claude\s+Code|Pi)\s+thread\s+`?[0-9a-f-]{8,}`?(?:,\s*with\s+interactive\s+elements)?(?=\s*$|[.!?]\s*$)", + re.IGNORECASE | re.MULTILINE, +) +_EXECUTION_TRAILER_RE = re.compile( + r"\b(?:Execution|execution_id)\s*[:=]\s*`?exe_[0-9a-f]{16}`?", + re.IGNORECASE, +) +_CURL_EXIT_RE = re.compile(r"curl:?\s*\((\d+)\):?\s*[^\n]{0,200}", re.IGNORECASE) + +_TEXT_KEYS = { + "content", + "delta", + "details", + "error", + "message", + "output", + "result", + "summary", + "text", + "title", +} + + +def _replace_matching_json_objects( + text: str, + predicate: Callable[[dict[str, Any]], bool], + replacement: str, +) -> str: + decoder = json.JSONDecoder() + out: list[str] = [] + index = 0 + while index < len(text): + if text[index] != "{": + out.append(text[index]) + index += 1 + continue + try: + value, end = decoder.raw_decode(text[index:]) + except ValueError: + out.append(text[index]) + index += 1 + continue + if isinstance(value, dict) and predicate(value): + out.append(replacement) + index += end + continue + out.append(text[index]) + index += 1 + return "".join(out) + + +def _is_k8s_status(value: dict[str, Any]) -> bool: + return value.get("kind") == "Status" and any( + key in value for key in ("status", "reason", "code", "message") + ) + + +def _is_tool_error_envelope(value: dict[str, Any]) -> bool: + return "error_type" in value and any( + key in value for key in ("detail", "error", "status_code") + ) + + +def sanitize_for_slack(text: str | None, *, preserve_edges: bool = False) -> str: + """Strip known plumbing leaks from ``text``. Idempotent; empty input → ``""``.""" + if not text: + return "" + sanitized = _replace_matching_json_objects( + text, _is_k8s_status, "[k8s status omitted]" + ) + sanitized = _replace_matching_json_objects( + sanitized, _is_tool_error_envelope, "[tool error omitted]" + ) + sanitized = _THREAD_TRAILER_RE.sub("", sanitized) + sanitized = _EXECUTION_TRAILER_RE.sub("[execution id omitted]", sanitized) + sanitized = _CURL_EXIT_RE.sub(r"transport_error(\1)", sanitized) + sanitized = re.sub(r"[ \t]+\n", "\n", sanitized) + sanitized = re.sub(r"\n{3,}", "\n\n", sanitized) + return sanitized if preserve_edges else sanitized.strip() + + +def sanitize_slack_event(value: Any) -> Any: + if isinstance(value, str): + return sanitize_for_slack(value, preserve_edges=True) + if isinstance(value, list): + return [sanitize_slack_event(item) for item in value] + if isinstance(value, dict): + sanitized: dict[str, Any] = {} + for key, item in value.items(): + if isinstance(item, (dict, list)) or key in _TEXT_KEYS: + sanitized[key] = sanitize_slack_event(item) + else: + sanitized[key] = item + return sanitized + return value + + +# ── Slackbot HTTP client ────────────────────────────────────────────── + +_RETRYABLE_STATUS = frozenset({408, 429, 500, 502, 503, 504}) +_RETRY_ATTEMPTS = 3 +_RETRY_BASE_DELAY_S = 0.25 + + +def _base_url() -> str: + return os.getenv("SLACKBOT_URL", "").strip().rstrip("/") + + +def _api_key() -> str: + return os.getenv("SLACKBOT_API_KEY", "").strip() + + +def enabled() -> bool: + return bool(_base_url() and _api_key()) + + +async def slackbot_post( + path: str, + body: dict[str, Any], + *, + timeout: httpx.Timeout | None = None, +) -> dict[str, Any] | None: + base_url = _base_url() + api_key = _api_key() + if not base_url or not api_key: + return None + request_timeout = timeout or httpx.Timeout(8.0, connect=2.0) + last_status: int | None = None + last_response: str | None = None + last_error: str | None = None + for attempt in range(_RETRY_ATTEMPTS): + try: + async with httpx.AsyncClient(timeout=request_timeout) as client: + response = await client.post( + f"{base_url}{path}", + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + }, + json=body, + ) + text = response.text + if response.is_success: + if not text: + return {} + data = response.json() + return data if isinstance(data, dict) else {} + last_status = response.status_code + last_response = text[:500] + if response.status_code not in _RETRYABLE_STATUS: + log.warning( + "slackbot_call_failed", + path=path, + status=response.status_code, + response=last_response, + ) + return None + except Exception as exc: + last_error = str(exc) + if attempt + 1 < _RETRY_ATTEMPTS: + await asyncio.sleep(_RETRY_BASE_DELAY_S * (2**attempt)) + log.warning( + "slackbot_call_failed", + path=path, + status=last_status, + response=last_response, + error=last_error, + attempts=_RETRY_ATTEMPTS, + ) + return None + + +# ── Delivery field extraction (Slack-shaped) ────────────────────────── + + +def is_slack_delivery(delivery: dict[str, Any] | None) -> bool: + return isinstance(delivery, dict) and str(delivery.get("platform") or "") == "slack" + + +def channel_id(delivery: dict[str, Any]) -> str: + return str(delivery.get("channel") or delivery.get("channel_id") or "").strip() + + +def thread_ts(delivery: dict[str, Any]) -> str: + return str(delivery.get("thread_ts") or "").strip() + + +def recipient_team_id(delivery: dict[str, Any], thread_key: str) -> str: + value = str( + delivery.get("recipient_team_id") + or delivery.get("team_id") + or delivery.get("team") + or "" + ).strip() + if value: + return value + parts = thread_key.split(":") + return parts[1] if len(parts) >= 2 and parts[0] == "slack" else "" + + +def recipient_user_id(delivery: dict[str, Any], metadata: dict[str, Any]) -> str: + return str( + delivery.get("recipient_user_id") + or delivery.get("user_id") + or metadata.get("user_id") + or "" + ).strip() + + +def slack_thread_key_to_channel(thread_key: str) -> str: + """Extract the channel ID from a ``slack:CHANNEL:TS`` thread_key. + + Returns ``""`` if the thread_key does not have the slack shape. + """ + parts = thread_key.split(":") + if len(parts) >= 3 and parts[0] == "slack": + # slack:TEAM:CHANNEL:TS (4-part) or slack:CHANNEL:TS (3-part) + return parts[2] if len(parts) >= 4 else parts[1] + return "" + + +# ── GitHub identity extraction from Slack profiles ──────────────────── + +_GITHUB_HANDLE_RE = re.compile(r"^[A-Za-z0-9](?:[A-Za-z0-9-]{0,37}[A-Za-z0-9])?$") +_GITHUB_URL_RE = re.compile( + r"(?:https?://)?github\.com/([A-Za-z0-9](?:[A-Za-z0-9-]{0,37}[A-Za-z0-9])?)", + re.IGNORECASE, +) +_GITHUB_LABEL_RE = re.compile(r"\bgithub\b", re.IGNORECASE) +_GITHUB_PREFIX_RE = re.compile( + r"\bgithub\b\s*(?:username|user|handle|profile)?\s*[:/@-]?\s*@?([A-Za-z0-9][A-Za-z0-9-]{0,38})", + re.IGNORECASE, +) + + +def _valid_github_handle(value: str) -> str | None: + candidate = value.strip().strip("@").strip() + candidate = candidate.rstrip("/").split("/", 1)[0] + return candidate if _GITHUB_HANDLE_RE.match(candidate) else None + + +def extract_github_handle_from_slack_profile( + profile: dict[str, Any], +) -> tuple[str | None, str | None, str]: + """Return ``(handle, source, unavailable_reason)`` from Slack profile fields.""" + custom_fields = profile.get("custom_fields") + if not isinstance(custom_fields, dict) or not custom_fields: + return None, None, "no GitHub custom field found on Slack profile" + + saw_github_field = False + for label, raw_value in custom_fields.items(): + label_text = str(label or "").strip() + value = str(raw_value or "").strip() + if not value: + continue + + label_mentions_github = bool(_GITHUB_LABEL_RE.search(label_text)) + value_mentions_github = bool(_GITHUB_LABEL_RE.search(value)) + if not label_mentions_github and not value_mentions_github: + continue + saw_github_field = True + + source = ( + f'Slack profile custom field "{label_text}"' + if label_text + else "Slack profile custom field" + ) + url_match = _GITHUB_URL_RE.search(value) + if url_match: + handle = _valid_github_handle(url_match.group(1)) + if handle: + return f"@{handle}", source, "" + + prefixed_match = _GITHUB_PREFIX_RE.search(value) + if prefixed_match: + handle = _valid_github_handle(prefixed_match.group(1)) + if handle: + return f"@{handle}", source, "" + + if label_mentions_github: + handle = _valid_github_handle(value) + if handle: + return f"@{handle}", source, "" + + if saw_github_field: + return ( + None, + None, + "GitHub profile field did not contain a valid GitHub handle", + ) + return None, None, "no GitHub custom field found on Slack profile" + + +# ── Mention prefix used by recovery-command normalization ───────────── + +_SLACK_ID_MENTION_RE = re.compile( + r"^<@[WU][A-Z0-9]+>\s*[:,;-]?\s*(.*)$", re.IGNORECASE +) + + +# ── Live tool-call capture predicate ────────────────────────────────── + +_SLACK_CHANNEL_ID_RE = re.compile(r"^[CDG][A-Z0-9]+$") + + +# ── Platform implementation ─────────────────────────────────────────── + + +class SlackPlatform(MessagingPlatform): + name = "slack" + mention_prefix_re = _SLACK_ID_MENTION_RE + + # ── Scrubbing ────────────────────────────────────────────────── + + def sanitize_text( + self, text: str | None, *, preserve_edges: bool = False + ) -> str: + return sanitize_for_slack(text, preserve_edges=preserve_edges) + + def sanitize_event(self, value: Any) -> Any: + return sanitize_slack_event(value) + + # ── Thread-key parsing ───────────────────────────────────────── + + def thread_key_destination(self, thread_key: str) -> str | None: + result = slack_thread_key_to_channel(thread_key) + return result or None + + # ── Identity recovery ────────────────────────────────────────── + + async def load_requester_identity( + self, user_id: str | None + ) -> dict[str, str | bool] | None: + if not user_id: + return None + identity: dict[str, str | bool] = { + "slack_user_id": user_id, + "slack_mention": f"<@{user_id}>", + } + try: + from api.app import get_tool_manager + + profile = await get_tool_manager().call_tool_raw( + "slack", "get_user_profile", {"user_id": user_id} + ) + except Exception as exc: + log.warning( + "requester_identity_lookup_failed", + platform=self.name, + user_id=user_id, + error=str(exc), + ) + identity.update( + { + "github_handle_verified": False, + "github_handle_unavailable_reason": "Slack profile could not be fetched", + } + ) + return identity + + if not isinstance(profile, dict) or profile.get("error"): + error = str(profile.get("error") or "Slack profile could not be fetched") + log.warning( + "requester_identity_lookup_failed", + platform=self.name, + user_id=user_id, + error=error, + ) + identity.update( + { + "github_handle_verified": False, + "github_handle_unavailable_reason": "Slack profile could not be fetched", + } + ) + return identity + + handle, source, reason = extract_github_handle_from_slack_profile(profile) + if handle: + identity.update( + { + "github_handle": handle, + "github_handle_source": source or "Slack profile custom field", + "github_handle_verified": True, + } + ) + else: + identity.update( + { + "github_handle_verified": False, + "github_handle_unavailable_reason": reason, + } + ) + return identity + + def system_prompt_identity_lines( + self, identity: dict[str, str | bool] | None + ) -> list[str]: + if not identity: + return [] + lines = [ + "", + "## Requester Identity", + "", + f"- Slack user ID: {identity['slack_user_id']}", + f"- Slack mention: {identity['slack_mention']}", + ] + if identity.get("github_handle_verified"): + lines.extend( + [ + f"- GitHub handle from Slack profile: {identity['github_handle']}", + f"- GitHub handle source: {identity['github_handle_source']}", + "- GitHub handle verified: yes", + ] + ) + else: + lines.extend( + [ + "- GitHub handle from Slack profile: unavailable", + "- GitHub handle unavailable reason: " + f"{identity['github_handle_unavailable_reason']}", + "- GitHub handle verified: no", + ] + ) + return lines + + def system_prompt_rules(self, *, user_id: str | None = None) -> list[str]: + lines = [ + "", + "## Slack Formatting Rules", + "", + "- Use standard markdown links `[Display Text](URL)` for hyperlinks", + "- Do NOT use Slack-native `` link syntax", + "- Preserve Slack user mentions (`<@UXXXXXXX>`) exactly as-is — only use these for actual Slack users", + "- For Twitter/X handles, link to the profile WITHOUT an @ prefix in the display text: `[handle](https://x.com/handle)` (NOT `[@handle](...)`)", + "- Prefer concise, well-structured markdown; long replies may be split across multiple Slack messages", + "- Markdown tables are allowed and may render as native Slack tables when the structure is clean", + "- NEVER put links/URLs inside code blocks (``` ```) — they won't be clickable. Use markdown tables or plain text with `[text](url)` links instead", + ] + if user_id: + lines.append( + f"- After completing a long task, tag the requester with their real Slack mention: <@{user_id}>" + ) + return lines + + # ── Live-streaming session lifecycle ─────────────────────────── + + async def open_live_session( + self, + *, + delivery: dict[str, Any], + metadata: dict[str, Any], + thread_key: str, + title: str = "Centaur execution", + header: str | None = None, + ) -> str | None: + if not enabled() or not is_slack_delivery(delivery): + return None + channel = channel_id(delivery) + parent_ts = thread_ts(delivery) + if not channel or not parent_ts: + return None + body: dict[str, Any] = { + "channel": channel, + "parent_ts": parent_ts, + "recipient_team_id": recipient_team_id(delivery, thread_key), + "recipient_user_id": recipient_user_id(delivery, metadata), + "title": title, + } + header_text = (header or "").strip() + if header_text: + body["header"] = header_text + result = await slackbot_post("/api/slack/agent-sessions", body) + session_id = str((result or {}).get("session_id") or "").strip() + return session_id or None + + async def session_text( + self, session_id: str | None, markdown: str + ) -> None: + sanitized = sanitize_for_slack(markdown) + if not session_id or not sanitized.strip(): + return + await slackbot_post( + f"/api/slack/agent-sessions/{session_id}/text", + {"markdown": sanitized}, + ) + + async def session_step( + self, + session_id: str | None, + *, + step_id: str, + title: str, + status: str = "in_progress", + details: str | None = None, + output: str | None = None, + ) -> None: + if not session_id or not step_id or not title: + return + body: dict[str, Any] = { + "id": step_id, + "title": sanitize_for_slack(title), + "status": status, + } + if details: + body["details"] = sanitize_for_slack(details) + if output: + body["output"] = sanitize_for_slack(output) + await slackbot_post( + f"/api/slack/agent-sessions/{session_id}/step", body + ) + + async def session_done( + self, session_id: str | None, thread_id: str | None = None + ) -> None: + if not session_id: + return + body: dict[str, Any] = {} + if thread_id: + body["thread_id"] = thread_id + await slackbot_post( + f"/api/slack/agent-sessions/{session_id}/done", body + ) + + async def harness_event( + self, session_id: str | None, event: dict[str, Any] + ) -> dict[str, Any] | None: + if not session_id: + return None + return await slackbot_post( + f"/api/slack/agent-sessions/{session_id}/harness-event", + {"event": sanitize_slack_event(event)}, + timeout=httpx.Timeout(60.0, connect=2.0), + ) + + async def assistant_status( + self, delivery: dict[str, Any], status: str + ) -> None: + if not enabled() or not is_slack_delivery(delivery): + return + channel = channel_id(delivery) + ts = thread_ts(delivery) + if not channel or not ts: + return + await slackbot_post( + "/api/slack/assistant/status", + {"channel_id": channel, "thread_ts": ts, "status": status}, + ) + + # ── Outbound channel post ────────────────────────────────────── + + async def send_channel_message( + self, + channel: str, + text: str, + *, + thread_id: str | None = None, + ) -> dict[str, Any]: + from api.app import get_tool_manager + + args: dict[str, Any] = { + "channel": channel, + "text": text, + "no_attribution": True, + } + if thread_id: + args["thread_ts"] = thread_id + raw = await get_tool_manager().call_tool("slack", "send_message", args) + try: + result = json.loads(raw) if isinstance(raw, str) else raw + except (ValueError, TypeError): + result = {"raw": raw} + if isinstance(result, dict) and result.get("error"): + raise RuntimeError(str(result["error"])) + return result + + # ── Live tool-call capture ───────────────────────────────────── + + async def capture_active_thread_tool_call( + self, + *, + request: Any, + sandbox_claims: dict[str, Any] | None, + tool_name: str, + method_name: str, + args: dict[str, Any], + ) -> dict[str, Any] | None: + if request is None or not sandbox_claims: + return None + if tool_name != "slack" or method_name != "send_message": + return None + + thread_key = str(sandbox_claims.get("thread_key") or "") + parts = thread_key.split(":") + if len(parts) < 4 or parts[0] != "slack": + return None + active_channel = parts[2] + active_thread_ts = parts[3] + requested_channel = str( + args.get("channel") or args.get("channel_id") or "" + ).lstrip("#") + requested_thread_ts = str(args.get("thread_ts") or "") + channel_is_id = bool(_SLACK_CHANNEL_ID_RE.match(requested_channel)) + if channel_is_id and requested_channel != active_channel: + return None + if requested_thread_ts and requested_thread_ts != active_thread_ts: + return None + + text = str(args.get("text") or args.get("message") or "").strip() + if not text: + return None + + pool = getattr(request.app.state, "db_pool", None) + if pool is None: + return None + session_id = await pool.fetchval( + "SELECT metadata->>'slackbot_agent_session_id' " + "FROM agent_execution_requests " + "WHERE thread_key = $1 " + "AND status = 'running' " + "AND (" + " metadata->>'slackbot_live_delivery' = 'true' " + " OR metadata->>('slackbot' || '_v' || '2_live_delivery') = 'true'" + ") " + "AND COALESCE(metadata->>'slackbot_agent_session_id', '') <> '' " + "ORDER BY started_at DESC NULLS LAST, created_at DESC LIMIT 1", + thread_key, + ) + session_id = str(session_id or "").strip() + if not session_id: + return None + + await self.session_text(session_id, text) + log.info( + "slack_send_message_captured", + thread_key=thread_key, + sandbox_container_id=sandbox_claims.get("container_id"), + slackbot_agent_session_id=session_id, + ) + return { + "captured": True, + "message": "Captured into the active Slackbot live reply; no separate Slack message was posted.", + "channel": active_channel, + "thread_ts": active_thread_ts, + } + + +# ── Singleton + registration ────────────────────────────────────────── + +SLACK_PLATFORM = SlackPlatform() +register_platform(SLACK_PLATFORM) diff --git a/services/api/api/routers/agent.py b/services/api/api/routers/agent.py index 0db0b1b5..f44f48ab 100644 --- a/services/api/api/routers/agent.py +++ b/services/api/api/routers/agent.py @@ -55,6 +55,39 @@ FINAL_DELIVERY_MAX_ATTEMPTS = int(os.getenv("FINAL_DELIVERY_MAX_ATTEMPTS", "50")) + +def _resolve_delivery_fallback( + body_delivery: dict[str, Any] | None, + body_platform: str | None, + body_user_id: str | None, + *, + endpoint: str, + default_platform: str, + default_channel: str, +) -> dict[str, Any]: + """Resolve the delivery dict for an execute/auto-execute call. + + Explicit ``body.delivery`` wins. Otherwise we build a minimal delivery + from ``body.platform`` (or the endpoint-specific default) and log a + deprecation note when the caller relied on the fallback so we can flush + the implicit defaults in a later release. + """ + if isinstance(body_delivery, dict) and body_delivery: + return body_delivery + platform = (body_platform or "").strip() or default_platform + if not body_platform: + log.info( + "execute_delivery_fallback", + endpoint=endpoint, + platform=platform, + reason="caller omitted both delivery and platform", + ) + return { + "channel": default_channel, + "platform": platform, + "recipient_user_id": body_user_id, + } + router = APIRouter( prefix="/agent", tags=["agent"], @@ -270,11 +303,14 @@ async def execute(request: Request): return _json_error(exc.code, exc.message, exc.status_code) execute_id = body.execute_id or f"exec-{uuid.uuid4().hex[:16]}" - delivery = body.delivery or { - "channel": "slack", - "platform": body.platform or "slack", - "recipient_user_id": body.user_id, - } + delivery = _resolve_delivery_fallback( + body.delivery, + body.platform, + body.user_id, + endpoint="execute", + default_platform="slack", + default_channel="slack", + ) metadata = body.metadata or {} if body.user_id: metadata = {**metadata, "user_id": body.user_id} @@ -339,11 +375,14 @@ async def _auto_execute(pool, body: ExecuteRequest) -> JSONResponse: # 3. Execute execute_id = body.execute_id or f"exec-{nonce}" - delivery = body.delivery or { - "channel": "dev", - "platform": body.platform or "dev", - "recipient_user_id": body.user_id, - } + delivery = _resolve_delivery_fallback( + body.delivery, + body.platform, + body.user_id, + endpoint="execute_auto", + default_platform="dev", + default_channel="dev", + ) result = await enqueue_execution( pool, diff --git a/services/api/api/runtime_control.py b/services/api/api/runtime_control.py index bb0306bc..8a0695c9 100644 --- a/services/api/api/runtime_control.py +++ b/services/api/api/runtime_control.py @@ -23,7 +23,7 @@ steer_stdin, stop_session, ) -from api import slackbot_client +from api.platforms import resolve_for_delivery from api.observability import ( ExecutionObservationAccumulator, extract_usage_metrics, @@ -975,25 +975,6 @@ async def _mark_slackbot_live_delivery_failed( ) -def _canonical_text_blocks(event: dict[str, Any]) -> list[str]: - if event.get("type") == "assistant": - message = event.get("message") if isinstance(event.get("message"), dict) else {} - content = ( - message.get("content") if isinstance(message.get("content"), list) else [] - ) - return [ - str(block.get("text") or "") - for block in content - if isinstance(block, dict) - and block.get("type") == "text" - and str(block.get("text") or "").strip() - ] - if event.get("type") == "result": - text = str(event.get("result") or event.get("text") or "") - return [text] if text.strip() else [] - return [] - - def _slackbot_streamed_answer_chars(value: Any) -> int: if isinstance(value, bool): return 0 @@ -1002,72 +983,6 @@ def _slackbot_streamed_answer_chars(value: Any) -> int: return 0 -async def _send_slackbot_canonical_event( - session_id: str, event: dict[str, Any] -) -> bool: - sent_text = False - for text in _canonical_text_blocks(event): - await slackbot_client.session_text( - session_id, - _clip_slackbot(text, _MAX_SLACKBOT_TEXT_CHARS), - ) - sent_text = True - - event_type = str(event.get("type") or "") - if event_type == "assistant": - message = event.get("message") if isinstance(event.get("message"), dict) else {} - content = ( - message.get("content") if isinstance(message.get("content"), list) else [] - ) - for block in content: - if not isinstance(block, dict) or block.get("type") != "tool_use": - continue - tool_id = str(block.get("id") or uuid.uuid4()) - tool_name = str(block.get("name") or "Tool") - tool_input = ( - block.get("input") if isinstance(block.get("input"), dict) else {} - ) - await slackbot_client.session_step( - session_id, - step_id=tool_id, - title=tool_name, - status="in_progress", - details=_clip_slackbot(tool_input), - ) - elif event_type == "tool": - content = event.get("content") if isinstance(event.get("content"), list) else [] - for block in content: - if not isinstance(block, dict): - continue - tool_id = str(block.get("tool_use_id") or uuid.uuid4()) - is_error = bool(block.get("is_error")) - await slackbot_client.session_step( - session_id, - step_id=tool_id, - title="Tool result", - status="error" if is_error else "complete", - output=_clip_slackbot(block.get("content")), - ) - elif event_type == "command_execution": - command = str(event.get("command") or "Command") - await slackbot_client.session_step( - session_id, - step_id=f"command-{hashlib.sha256(command.encode()).hexdigest()[:12]}", - title=command[:256], - status="complete" if event.get("status") == "completed" else "in_progress", - output=_clip_slackbot(event.get("aggregated_output") or ""), - ) - elif event_type == "error": - await slackbot_client.session_step( - session_id, - step_id=f"error-{uuid.uuid4().hex[:12]}", - title="Execution error", - status="error", - output=_clip_slackbot(event.get("error") or event), - ) - return sent_text - - async def append_execution_event( pool, *, @@ -2331,6 +2246,7 @@ async def _process_execution_impl(pool, row: dict[str, Any]) -> None: assignment_generation = int(row["assignment_generation"]) execution_status = str(row.get("status") or "running") delivery = decode_jsonb(row.get("delivery"), {}) + platform = resolve_for_delivery(delivery) execution_metadata = decode_jsonb(row.get("metadata"), {}) if not isinstance(execution_metadata, dict): execution_metadata = {} @@ -2594,7 +2510,7 @@ async def _finalize_execution( if finalize_session_id and not slackbot_done and slackbot_forward_live: try: terminal_result_sent_to_slackbot = False - await slackbot_client.session_done( + await platform.session_done( finalize_session_id, harness_thread_id or None ) slackbot_done = True @@ -2760,7 +2676,7 @@ async def _finalize_execution( slack_event.setdefault( "centaur_assignment_generation", assignment_generation ) - harness_result = await slackbot_client.harness_event( + harness_result = await platform.harness_event( slackbot_session_id, slack_event ) if harness_result is None: @@ -2789,7 +2705,7 @@ async def _finalize_execution( streamed_answer_chars=slackbot_streamed_answer_chars, ) with contextlib.suppress(Exception): - await slackbot_client.set_status(delivery, "") + await platform.assistant_status(delivery, "") slackbot_forward_live = False break if isinstance(harness_result, dict): diff --git a/services/api/api/slack_sanitize.py b/services/api/api/slack_sanitize.py index c37f0689..4fdf3864 100644 --- a/services/api/api/slack_sanitize.py +++ b/services/api/api/slack_sanitize.py @@ -1,76 +1,9 @@ -"""Strip known plumbing-leak shapes from assistant text bound for Slack.""" +"""Compatibility shim. The implementation moved to ``api.platforms.slack``; +this module remains so existing imports of ``sanitize_for_slack`` keep +working.""" from __future__ import annotations -import json -import re -from collections.abc import Callable -from typing import Any +from api.platforms.slack import sanitize_for_slack # noqa: F401 -_THREAD_TRAILER_RE = re.compile( - r"(?:^|\s)(?:Agent|Codex|Amp|Claude\s+Code|Pi)\s+thread\s+`?[0-9a-f-]{8,}`?(?:,\s*with\s+interactive\s+elements)?(?=\s*$|[.!?]\s*$)", - re.IGNORECASE | re.MULTILINE, -) -_EXECUTION_TRAILER_RE = re.compile( - r"\b(?:Execution|execution_id)\s*[:=]\s*`?exe_[0-9a-f]{16}`?", - re.IGNORECASE, -) -_CURL_EXIT_RE = re.compile(r"curl:?\s*\((\d+)\):?\s*[^\n]{0,200}", re.IGNORECASE) - - -def _replace_matching_json_objects( - text: str, - predicate: Callable[[dict[str, Any]], bool], - replacement: str, -) -> str: - decoder = json.JSONDecoder() - out: list[str] = [] - index = 0 - while index < len(text): - if text[index] != "{": - out.append(text[index]) - index += 1 - continue - try: - value, end = decoder.raw_decode(text[index:]) - except ValueError: - out.append(text[index]) - index += 1 - continue - if isinstance(value, dict) and predicate(value): - out.append(replacement) - index += end - continue - out.append(text[index]) - index += 1 - return "".join(out) - - -def _is_k8s_status(value: dict[str, Any]) -> bool: - return value.get("kind") == "Status" and any( - key in value for key in ("status", "reason", "code", "message") - ) - - -def _is_tool_error_envelope(value: dict[str, Any]) -> bool: - return "error_type" in value and any( - key in value for key in ("detail", "error", "status_code") - ) - - -def sanitize_for_slack(text: str | None, *, preserve_edges: bool = False) -> str: - """Strip known plumbing leaks from `text`. Idempotent; empty input -> "".""" - if not text: - return "" - sanitized = _replace_matching_json_objects( - text, _is_k8s_status, "[k8s status omitted]" - ) - sanitized = _replace_matching_json_objects( - sanitized, _is_tool_error_envelope, "[tool error omitted]" - ) - sanitized = _THREAD_TRAILER_RE.sub("", sanitized) - sanitized = _EXECUTION_TRAILER_RE.sub("[execution id omitted]", sanitized) - sanitized = _CURL_EXIT_RE.sub(r"transport_error(\1)", sanitized) - sanitized = re.sub(r"[ \t]+\n", "\n", sanitized) - sanitized = re.sub(r"\n{3,}", "\n\n", sanitized) - return sanitized if preserve_edges else sanitized.strip() +__all__ = ["sanitize_for_slack"] diff --git a/services/api/api/slackbot_client.py b/services/api/api/slackbot_client.py index dc4feca4..e53f7da8 100644 --- a/services/api/api/slackbot_client.py +++ b/services/api/api/slackbot_client.py @@ -1,122 +1,45 @@ -from __future__ import annotations - -import asyncio -import os -from typing import Any - -import httpx -import structlog - -from api.slack_sanitize import sanitize_for_slack - -log = structlog.get_logger() - -# Other 4xx is permanent: the slackbot is telling us the call is malformed. -_RETRYABLE_STATUS = frozenset({408, 429, 500, 502, 503, 504}) -_RETRY_ATTEMPTS = 3 -_RETRY_BASE_DELAY_S = 0.25 - - -def _base_url() -> str: - return os.getenv("SLACKBOT_URL", "").strip().rstrip("/") - - -def _api_key() -> str: - return os.getenv("SLACKBOT_API_KEY", "").strip() - - -def enabled() -> bool: - return bool(_base_url() and _api_key()) - - -async def post( - path: str, - body: dict[str, Any], - *, - timeout: httpx.Timeout | None = None, -) -> dict[str, Any] | None: - base_url = _base_url() - api_key = _api_key() - if not base_url or not api_key: - return None - request_timeout = timeout or httpx.Timeout(8.0, connect=2.0) - last_status: int | None = None - last_response: str | None = None - last_error: str | None = None - for attempt in range(_RETRY_ATTEMPTS): - try: - async with httpx.AsyncClient(timeout=request_timeout) as client: - response = await client.post( - f"{base_url}{path}", - headers={ - "Authorization": f"Bearer {api_key}", - "Content-Type": "application/json", - }, - json=body, - ) - text = response.text - if response.is_success: - if not text: - return {} - data = response.json() - return data if isinstance(data, dict) else {} - last_status = response.status_code - last_response = text[:500] - if response.status_code not in _RETRYABLE_STATUS: - log.warning( - "slackbot_call_failed", - path=path, - status=response.status_code, - response=last_response, - ) - return None - except Exception as exc: - last_error = str(exc) - if attempt + 1 < _RETRY_ATTEMPTS: - await asyncio.sleep(_RETRY_BASE_DELAY_S * (2**attempt)) - log.warning( - "slackbot_call_failed", - path=path, - status=last_status, - response=last_response, - error=last_error, - attempts=_RETRY_ATTEMPTS, - ) - return None +"""Compatibility shim. The implementation moved to ``api.platforms.slack``; +this module re-exports the module-level helpers and delegates the async +session helpers to the registered ``SlackPlatform`` singleton so existing +``from api import slackbot_client`` callers keep working. +Phase 1 callers should resolve a platform via +``api.platforms.resolve_for_delivery(delivery)`` and call methods on it +directly. This shim will be removed once those migrations land. +""" -def is_slack_delivery(delivery: dict[str, Any] | None) -> bool: - return isinstance(delivery, dict) and str(delivery.get("platform") or "") == "slack" - - -def channel_id(delivery: dict[str, Any]) -> str: - return str(delivery.get("channel") or delivery.get("channel_id") or "").strip() - - -def thread_ts(delivery: dict[str, Any]) -> str: - return str(delivery.get("thread_ts") or "").strip() - - -def recipient_team_id(delivery: dict[str, Any], thread_key: str) -> str: - value = str( - delivery.get("recipient_team_id") - or delivery.get("team_id") - or delivery.get("team") - or "" - ).strip() - if value: - return value - parts = thread_key.split(":") - return parts[1] if len(parts) >= 2 and parts[0] == "slack" else "" +from __future__ import annotations +from typing import Any -def recipient_user_id(delivery: dict[str, Any], metadata: dict[str, Any]) -> str: - return str( - delivery.get("recipient_user_id") - or delivery.get("user_id") - or metadata.get("user_id") - or "" - ).strip() +from api.platforms.slack import ( + SLACK_PLATFORM, + channel_id, + enabled, + is_slack_delivery, + recipient_team_id, + recipient_user_id, + sanitize_slack_event, + slackbot_post as post, + thread_ts, +) + +__all__ = [ + "channel_id", + "enabled", + "harness_event", + "is_slack_delivery", + "open_agent_session", + "post", + "recipient_team_id", + "recipient_user_id", + "sanitize_slack_event", + "session_done", + "session_step", + "session_text", + "set_status", + "thread_ts", +] async def open_agent_session( @@ -127,32 +50,17 @@ async def open_agent_session( title: str = "Centaur execution", header: str | None = None, ) -> str | None: - if not enabled() or not is_slack_delivery(delivery): - return None - channel = channel_id(delivery) - parent_ts = thread_ts(delivery) - if not channel or not parent_ts: - return None - body: dict[str, Any] = { - "channel": channel, - "parent_ts": parent_ts, - "recipient_team_id": recipient_team_id(delivery, thread_key), - "recipient_user_id": recipient_user_id(delivery, metadata), - "title": title, - } - header_text = (header or "").strip() - if header_text: - body["header"] = header_text - result = await post("/api/slack/agent-sessions", body) - session_id = str((result or {}).get("session_id") or "").strip() - return session_id or None + return await SLACK_PLATFORM.open_live_session( + delivery=delivery, + metadata=metadata, + thread_key=thread_key, + title=title, + header=header, + ) async def session_text(session_id: str | None, markdown: str) -> None: - sanitized = sanitize_for_slack(markdown) - if not session_id or not sanitized.strip(): - return - await post(f"/api/slack/agent-sessions/{session_id}/text", {"markdown": sanitized}) + return await SLACK_PLATFORM.session_text(session_id, markdown) async def session_step( @@ -164,79 +72,27 @@ async def session_step( details: str | None = None, output: str | None = None, ) -> None: - if not session_id or not step_id or not title: - return - body: dict[str, Any] = { - "id": step_id, - "title": sanitize_for_slack(title), - "status": status, - } - if details: - body["details"] = sanitize_for_slack(details) - if output: - body["output"] = sanitize_for_slack(output) - await post(f"/api/slack/agent-sessions/{session_id}/step", body) - - -async def session_done(session_id: str | None, thread_id: str | None = None) -> None: - if not session_id: - return - body: dict[str, Any] = {} - if thread_id: - body["thread_id"] = thread_id - await post(f"/api/slack/agent-sessions/{session_id}/done", body) + return await SLACK_PLATFORM.session_step( + session_id, + step_id=step_id, + title=title, + status=status, + details=details, + output=output, + ) + + +async def session_done( + session_id: str | None, thread_id: str | None = None +) -> None: + return await SLACK_PLATFORM.session_done(session_id, thread_id) async def harness_event( session_id: str | None, event: dict[str, Any] ) -> dict[str, Any] | None: - if not session_id: - return None - return await post( - f"/api/slack/agent-sessions/{session_id}/harness-event", - {"event": sanitize_slack_event(event)}, - timeout=httpx.Timeout(60.0, connect=2.0), - ) + return await SLACK_PLATFORM.harness_event(session_id, event) async def set_status(delivery: dict[str, Any], status: str) -> None: - if not enabled() or not is_slack_delivery(delivery): - return - channel = channel_id(delivery) - ts = thread_ts(delivery) - if not channel or not ts: - return - await post( - "/api/slack/assistant/status", - {"channel_id": channel, "thread_ts": ts, "status": status}, - ) - - -_TEXT_KEYS = { - "content", - "delta", - "details", - "error", - "message", - "output", - "result", - "summary", - "text", - "title", -} - - -def sanitize_slack_event(value: Any) -> Any: - if isinstance(value, str): - return sanitize_for_slack(value, preserve_edges=True) - if isinstance(value, list): - return [sanitize_slack_event(item) for item in value] - if isinstance(value, dict): - sanitized: dict[str, Any] = {} - for key, item in value.items(): - if isinstance(item, (dict, list)) or key in _TEXT_KEYS: - sanitized[key] = sanitize_slack_event(item) - else: - sanitized[key] = item - return sanitized - return value + return await SLACK_PLATFORM.assistant_status(delivery, status) diff --git a/services/api/api/tool_manager.py b/services/api/api/tool_manager.py index 58e73f08..a32a71e4 100644 --- a/services/api/api/tool_manager.py +++ b/services/api/api/tool_manager.py @@ -31,7 +31,7 @@ from api.laminar_tracing import set_span_attributes, start_span from api.vm_metrics import record_tool_call from api.deps import get_key_info, get_sandbox_claims, verify_api_key -from api import slackbot_client +from api.platforms import resolve_for_thread_key from centaur_sdk import ToolContext, reset_tool_context, set_tool_context log = structlog.get_logger() @@ -908,7 +908,7 @@ def _timeout_label(timeout_s: float | None) -> str: return "no timeout" if timeout_s is None else f"{timeout_s:g}s" -async def _capture_live_slack_send( +async def _capture_active_thread_tool_call( *, request: Request | None, sandbox_claims: dict[str, Any] | None, @@ -916,62 +916,26 @@ async def _capture_live_slack_send( method_name: str, args: dict[str, Any], ) -> dict[str, Any] | None: + """Give the active thread's messaging platform a chance to intercept this + tool call (e.g. an agent calling ``slack.send_message`` to the active + thread is re-routed through the live streaming session). + + Returns the captured-result envelope or ``None`` to fall through to the + normal tool path. + """ if request is None or not sandbox_claims: return None - if tool_name != "slack" or method_name != "send_message": - return None - thread_key = str(sandbox_claims.get("thread_key") or "") - parts = thread_key.split(":") - if len(parts) < 4 or parts[0] != "slack": - return None - active_channel = parts[2] - active_thread_ts = parts[3] - requested_channel = str(args.get("channel") or args.get("channel_id") or "").lstrip("#") - requested_thread_ts = str(args.get("thread_ts") or "") - channel_is_id = bool(re.match(r"^[CDG][A-Z0-9]+$", requested_channel)) - if channel_is_id and requested_channel != active_channel: - return None - if requested_thread_ts and requested_thread_ts != active_thread_ts: - return None - - text = str(args.get("text") or args.get("message") or "").strip() - if not text: + platform = resolve_for_thread_key(thread_key) + if platform is None: return None - - pool = getattr(request.app.state, "db_pool", None) - if pool is None: - return None - session_id = await pool.fetchval( - "SELECT metadata->>'slackbot_agent_session_id' " - "FROM agent_execution_requests " - "WHERE thread_key = $1 " - "AND status = 'running' " - "AND (" - " metadata->>'slackbot_live_delivery' = 'true' " - " OR metadata->>('slackbot' || '_v' || '2_live_delivery') = 'true'" - ") " - "AND COALESCE(metadata->>'slackbot_agent_session_id', '') <> '' " - "ORDER BY started_at DESC NULLS LAST, created_at DESC LIMIT 1", - thread_key, - ) - session_id = str(session_id or "").strip() - if not session_id: - return None - - await slackbot_client.session_text(session_id, text) - log.info( - "slack_send_message_captured", - thread_key=thread_key, - sandbox_container_id=sandbox_claims.get("container_id"), - slackbot_agent_session_id=session_id, + return await platform.capture_active_thread_tool_call( + request=request, + sandbox_claims=sandbox_claims, + tool_name=tool_name, + method_name=method_name, + args=args, ) - return { - "captured": True, - "message": "Captured into the active Slackbot live reply; no separate Slack message was posted.", - "channel": active_channel, - "thread_ts": active_thread_ts, - } async def _extract_tool_attachment( @@ -1855,24 +1819,24 @@ async def call_tool_raw( } t0 = time.monotonic() log.info("tool_call_started", **call_fields) - captured_slack_send = await _capture_live_slack_send( + captured_tool_call = await _capture_active_thread_tool_call( request=request, sandbox_claims=sandbox_claims, tool_name=tool_name, method_name=method_name, args=args, ) - if captured_slack_send is not None: + if captured_tool_call is not None: duration_ms = round((time.monotonic() - t0) * 1000) log.info( "tool_call_completed", duration_ms=duration_ms, success=True, - result_size_bytes=_payload_size_bytes(captured_slack_send), + result_size_bytes=_payload_size_bytes(captured_tool_call), captured=True, **call_fields, ) - return captured_slack_send + return captured_tool_call validation_error = _tool_arg_validation_error(method, args) if validation_error is not None: log.warning( @@ -2024,27 +1988,27 @@ async def call_tool( } t0 = time.monotonic() log.info("tool_call_started", **call_fields) - captured_slack_send = await _capture_live_slack_send( + captured_tool_call = await _capture_active_thread_tool_call( request=request, sandbox_claims=sandbox_claims, tool_name=tool_name, method_name=method_name, args=args, ) - if captured_slack_send is not None: + if captured_tool_call is not None: duration_ms = round((time.monotonic() - t0) * 1000) log.info( "tool_call_completed", duration_ms=duration_ms, success=True, - result_size_bytes=_payload_size_bytes(captured_slack_send), + result_size_bytes=_payload_size_bytes(captured_tool_call), captured=True, **call_fields, ) record_tool_call(tool_name, method_name, True, duration_ms / 1000) if format == "toon": - return _to_toon(captured_slack_send) - return _normalize_for_serialization(captured_slack_send) + return _to_toon(captured_tool_call) + return _normalize_for_serialization(captured_tool_call) validation_error = _tool_arg_validation_error(method, args) if validation_error is not None: log.warning( diff --git a/services/api/api/workflow_engine.py b/services/api/api/workflow_engine.py index dc04cc38..b61955ba 100644 --- a/services/api/api/workflow_engine.py +++ b/services/api/api/workflow_engine.py @@ -32,7 +32,7 @@ import structlog -from api import slackbot_client +from api.platforms import resolve_for_delivery from api.runtime_control import ( ControlPlaneError, append_message, @@ -75,22 +75,36 @@ class Delivery: team_id: str | None = None @classmethod - def slack( + def for_channel( cls, + platform: str, channel: str, - thread_ts: str, + thread_id: str | None = None, *, user_id: str | None = None, team_id: str | None = None, ) -> Delivery: return cls( - platform="slack", + platform=platform, channel=channel, - thread_ts=thread_ts, + thread_ts=thread_id, recipient_user_id=user_id, recipient_team_id=team_id, ) + @classmethod + def slack( + cls, + channel: str, + thread_ts: str, + *, + user_id: str | None = None, + team_id: str | None = None, + ) -> Delivery: + return cls.for_channel( + "slack", channel, thread_ts, user_id=user_id, team_id=team_id, + ) + @classmethod def dev(cls) -> Delivery: return cls(platform="dev") @@ -778,42 +792,45 @@ async def run_agent( eager_start=eager_start, ) - async def post_to_slack( + async def post_to_channel( self, + platform: str, channel: str, text: str, *, - thread_ts: str | None = None, + thread_id: str | None = None, ) -> dict[str, Any]: - """Post a message to a Slack channel via the slack tool. + """Post a message to a channel via the platform's outbound tool. - Accepts channel name (e.g. ``"team-updates"``) or ID. - Uses a checkpointed step so the message is sent exactly once, - even if the workflow replays. + Uses a checkpointed step so the message is sent exactly once even + if the workflow replays. ``platform`` selects the registered + messaging platform (``"slack"``, ``"discord"``, …). """ - from api.app import get_tool_manager + from api.platforms import resolve_platform + + platform_impl = resolve_platform(platform) async def _post() -> dict[str, Any]: - tm = get_tool_manager() - args: dict[str, Any] = { - "channel": channel, - "text": text, - "no_attribution": True, - } - if thread_ts: - args["thread_ts"] = thread_ts - raw = await tm.call_tool("slack", "send_message", args) - import json as _json - try: - result = _json.loads(raw) if isinstance(raw, str) else raw - except (ValueError, TypeError): - result = {"raw": raw} - if isinstance(result, dict) and result.get("error"): - raise RuntimeError(str(result["error"])) - return result + return await platform_impl.send_channel_message( + channel, text, thread_id=thread_id, + ) - step_name = f"post_slack_{channel}" - return await self.step(step_name, _post, step_kind="slack_post") + step_name = f"post_{platform_impl.name}_{channel}" + return await self.step( + step_name, _post, step_kind=f"{platform_impl.name}_post", + ) + + async def post_to_slack( + self, + channel: str, + text: str, + *, + thread_ts: str | None = None, + ) -> dict[str, Any]: + """Slack-specific convenience wrapper around ``post_to_channel``.""" + return await self.post_to_channel( + "slack", channel, text, thread_id=thread_ts, + ) @property def tools(self) -> _ToolProxy: @@ -1141,6 +1158,7 @@ async def _dispatch() -> dict[str, Any]: effective_delivery = dict(run_in.get("delivery") or {}) effective_history = history_messages or run_in.get("history_messages") or [] selector = {"persona_id": persona, "harness": harness} + platform = resolve_for_delivery(effective_delivery) slackbot_session_id: str | None = None try: @@ -1155,7 +1173,7 @@ async def _dispatch() -> dict[str, Any]: ) except Exception as exc: try: - failure_session_id = await slackbot_client.open_agent_session( + failure_session_id = await platform.open_live_session( delivery=effective_delivery, metadata=effective_metadata, thread_key=effective_thread_key, @@ -1163,11 +1181,11 @@ async def _dispatch() -> dict[str, Any]: header=None, ) if failure_session_id: - await slackbot_client.session_text( + await platform.session_text( failure_session_id, f"Failed to start the runtime: {exc}", ) - await slackbot_client.session_done(failure_session_id) + await platform.session_done(failure_session_id) except Exception: log.warning( "workflow_spawn_failure_session_failed", @@ -1184,7 +1202,7 @@ async def _dispatch() -> dict[str, Any]: session_header = await _compute_agent_session_header( ctx._pool, effective_thread_key, selector, ) - slackbot_session_id = await slackbot_client.open_agent_session( + slackbot_session_id = await platform.open_live_session( delivery=effective_delivery, metadata=effective_metadata, thread_key=effective_thread_key, @@ -1432,7 +1450,7 @@ def _load_workflow_file( return wf_handler = getattr(mod, "handler", None) - # Auto-generate handler from PROMPT + SLACK_CHANNEL exports + # Auto-generate handler from PROMPT + (SLACK_CHANNEL | DELIVERY) exports if not callable(wf_handler): prompt_val = getattr(mod, "PROMPT", None) if not isinstance(prompt_val, str) or not prompt_val.strip(): @@ -1443,16 +1461,48 @@ def _load_workflow_file( ) return channel_val = getattr(mod, "SLACK_CHANNEL", None) + delivery_val = getattr(mod, "DELIVERY", None) + if not isinstance(delivery_val, dict): + delivery_val = None async def _auto_handler( inp: Any, ctx: WorkflowContext, - _prompt: str = prompt_val, _channel: str | None = channel_val, + _prompt: str = prompt_val, + _channel: str | None = channel_val, + _delivery: dict[str, Any] | None = delivery_val, ) -> dict[str, Any]: result = await ctx.agent_turn(_prompt) text = result.get("result_text", "") - channel = (inp.get("slack_channel") if isinstance(inp, dict) else None) or _channel + inp_delivery = ( + inp.get("delivery") if isinstance(inp, dict) else None + ) or {} + inp_slack_channel = ( + inp.get("slack_channel") if isinstance(inp, dict) else None + ) + platform_name = ( + inp_delivery.get("platform") + or (_delivery.get("platform") if _delivery else None) + or "slack" + ) + channel = ( + inp_delivery.get("channel") + or (_delivery.get("channel") if _delivery else None) + or inp_slack_channel + or _channel + ) + thread_id = ( + inp_delivery.get("thread_ts") + or inp_delivery.get("thread_id") + or ( + _delivery.get("thread_ts") or _delivery.get("thread_id") + if _delivery + else None + ) + ) if text and channel: - await ctx.post_to_slack(channel, text) + await ctx.post_to_channel( + platform_name, channel, text, thread_id=thread_id, + ) return result wf_handler = _auto_handler @@ -1473,15 +1523,33 @@ async def _auto_handler( slack_ch = getattr(mod, "SLACK_CHANNEL", None) if isinstance(slack_ch, str) and slack_ch.strip(): schedule.setdefault("slack_channel", slack_ch.strip()) + mod_delivery = getattr(mod, "DELIVERY", None) + if ( + isinstance(mod_delivery, dict) + and mod_delivery.get("platform") + and mod_delivery.get("channel") + ): + schedule.setdefault("delivery", dict(mod_delivery)) version = hashlib.sha256(py_file.read_bytes()).hexdigest() - _WORKFLOW_HANDLERS[wf_name] = _RegisteredHandler( + registered = _RegisteredHandler( handler=wf_handler, input_cls=input_cls, source_path=str(py_file), version=version, schedule=schedule, ) + _WORKFLOW_HANDLERS[wf_name] = registered discovered[wf_name] = str(py_file) + # Register any aliases this workflow module declares (for renames + # where the old name must keep resolving to the new handler). + aliases = getattr(mod, "WORKFLOW_ALIASES", ()) or () + if isinstance(aliases, str): + aliases = (aliases,) + for alias in aliases: + if not isinstance(alias, str) or not alias.strip() or alias == wf_name: + continue + _WORKFLOW_HANDLERS[alias] = registered + discovered[alias] = str(py_file) except Exception: log.warning("workflow_handler_load_failed", file=str(py_file), exc_info=True) @@ -1637,14 +1705,22 @@ def _registered_schedule_specs() -> list[ScheduleSpec]: if thread_key: input_json["thread_key"] = thread_key - # Auto-derive Slack delivery from thread_key + # Auto-derive delivery from thread_key. Default platform is slack + # to preserve back-compat; future platforms can use their own + # ``:...`` thread_key prefix and the resolver below picks + # the right shape. if "delivery" not in input_json: try: channel, thread_ts = _split_thread_key(thread_key) + derived_platform = ( + thread_key.split(":", 1)[0] + if ":" in thread_key + else "slack" + ) input_json["delivery"] = { "channel": channel, "thread_ts": thread_ts, - "platform": "slack", + "platform": derived_platform, } except ControlPlaneError: log.warning( @@ -1653,7 +1729,17 @@ def _registered_schedule_specs() -> list[ScheduleSpec]: thread_key=thread_key, ) - # slack_channel: use channel name for delivery (no thread_ts) + # Schedule-declared explicit delivery (any platform). + sched_delivery = sched.get("delivery") + if ( + isinstance(sched_delivery, dict) + and sched_delivery.get("platform") + and sched_delivery.get("channel") + and "delivery" not in input_json + ): + input_json["delivery"] = dict(sched_delivery) + + # slack_channel: back-compat shorthand for slack channel delivery. if slack_channel and "delivery" not in input_json: input_json["delivery"] = { "channel": slack_channel, @@ -1752,7 +1838,10 @@ def _workflow_request_hash( workflow_name: str, run_input: dict[str, Any], ) -> str: hash_input = dict(run_input) - if workflow_name == "slack_thread_turn": + # messaging_thread_turn (and its legacy alias slack_thread_turn) drops + # history_messages from the request hash so backfill differences don't + # break idempotency. + if workflow_name in {"messaging_thread_turn", "slack_thread_turn"}: hash_input.pop("history_messages", None) return request_hash( {"workflow_name": workflow_name, "input": hash_input}, diff --git a/services/api/api/workflows/slack_thread_turn.py b/services/api/api/workflows/slack_thread_turn.py index cddf4f41..cd22ea1d 100644 --- a/services/api/api/workflows/slack_thread_turn.py +++ b/services/api/api/workflows/slack_thread_turn.py @@ -1,4 +1,12 @@ -"""Workflow: single agent turn in a Slack thread.""" +"""Workflow: single agent turn in a messaging-platform thread. + +This handler is platform-agnostic — the delivery dict it receives carries +``platform``/``channel``/``thread_ts`` for any registered messaging +platform (Slack today, others later). The canonical workflow name is +``messaging_thread_turn``; the legacy name ``slack_thread_turn`` is kept +as an alias so existing clients (the slackbot) keep working until they +migrate. +""" from __future__ import annotations @@ -10,7 +18,8 @@ from api.runtime_control import ControlPlaneError from api.workflow_engine import Delivery, WorkflowContext -WORKFLOW_NAME = "slack_thread_turn" +WORKFLOW_NAME = "messaging_thread_turn" +WORKFLOW_ALIASES = ("slack_thread_turn",) _EXECUTION_HARNESSES = frozenset({"amp", "claude-code", "codex", "pi-mono"}) _PROMPT_FLAG_ALIASES = { @@ -469,7 +478,7 @@ async def handler(inp: Input, ctx: WorkflowContext) -> dict[str, Any]: if not thread_key: raise ControlPlaneError( "INVALID_WORKFLOW_INPUT", - "slack_thread_turn requires thread_key", + "messaging_thread_turn requires thread_key", 422, ) diff --git a/services/api/tests/test_agent_control_plane.py b/services/api/tests/test_agent_control_plane.py index 1c8d74b3..317104d5 100644 --- a/services/api/tests/test_agent_control_plane.py +++ b/services/api/tests/test_agent_control_plane.py @@ -1363,6 +1363,8 @@ async def _live_delivery_ack_without_answer_text(*_args, **_kwargs): session_text_mock = AsyncMock() session_done_mock = AsyncMock() backend = SimpleNamespace(attach=AsyncMock(), close_streams=AsyncMock()) + from api.platforms.slack import SLACK_PLATFORM + with ( patch("api.runtime_control.get_or_spawn", new=AsyncMock(return_value=session)), patch( @@ -1377,16 +1379,19 @@ async def _live_delivery_ack_without_answer_text(*_args, **_kwargs): return_value=SimpleNamespace(turn_counter=1), ), patch("api.runtime_control._stream_stdout", _blank_placeholder_stream), - patch( - "api.runtime_control.slackbot_client.harness_event", + patch.object( + SLACK_PLATFORM, + "harness_event", new=AsyncMock(side_effect=_live_delivery_ack_without_answer_text), ), - patch( - "api.runtime_control.slackbot_client.session_text", + patch.object( + SLACK_PLATFORM, + "session_text", new=session_text_mock, ), - patch( - "api.runtime_control.slackbot_client.session_done", + patch.object( + SLACK_PLATFORM, + "session_done", new=session_done_mock, ), ): diff --git a/services/api/tests/test_platforms.py b/services/api/tests/test_platforms.py new file mode 100644 index 00000000..90512a41 --- /dev/null +++ b/services/api/tests/test_platforms.py @@ -0,0 +1,119 @@ +"""Tests for the messaging-platform abstraction in api.platforms.""" + +from __future__ import annotations + +import pytest + +from api.platforms import ( + MessagingPlatform, + PLATFORMS, + resolve_for_delivery, + resolve_for_thread_key, + resolve_platform, +) +from api.platforms.dev import DevPlatform +from api.platforms.slack import SLACK_PLATFORM, SlackPlatform + + +def test_registry_includes_slack_and_dev(): + assert isinstance(PLATFORMS["slack"], SlackPlatform) + assert isinstance(PLATFORMS["dev"], DevPlatform) + + +def test_resolve_platform_known_name(): + assert resolve_platform("slack") is SLACK_PLATFORM + + +def test_resolve_platform_unknown_falls_back_to_dev(): + fallback = resolve_platform("not-a-real-platform") + assert fallback.name == "dev" + + +def test_resolve_platform_none_falls_back_to_dev(): + fallback = resolve_platform(None) + assert fallback.name == "dev" + + +def test_resolve_for_delivery_reads_platform_key(): + assert resolve_for_delivery({"platform": "slack"}) is SLACK_PLATFORM + assert resolve_for_delivery({"platform": "dev"}).name == "dev" + assert resolve_for_delivery({}).name == "dev" + assert resolve_for_delivery(None).name == "dev" + + +def test_resolve_for_thread_key_matches_prefix(): + assert resolve_for_thread_key("slack:C123:1700.0001") is SLACK_PLATFORM + assert resolve_for_thread_key("dev:foo").name == "dev" + assert resolve_for_thread_key("nonexistent:foo") is None + assert resolve_for_thread_key("") is None + assert resolve_for_thread_key(None) is None + + +def test_slack_platform_owns_slack_delivery_identification(): + assert SLACK_PLATFORM.is_delivery_for_me({"platform": "slack"}) is True + assert SLACK_PLATFORM.is_delivery_for_me({"platform": "dev"}) is False + assert SLACK_PLATFORM.is_delivery_for_me({}) is False + assert SLACK_PLATFORM.is_delivery_for_me(None) is False + + +def test_slack_sanitize_strips_known_plumbing_leaks(): + out = SLACK_PLATFORM.sanitize_text( + 'Done. {"kind":"Status","status":"Failure","reason":"AlreadyExists"} ' + "Codex thread `019e3c91-4030-7910`" + ) + assert "k8s status omitted" in out + assert "Codex thread" not in out + + +def test_slack_thread_key_destination_pulls_channel(): + assert SLACK_PLATFORM.thread_key_destination("slack:C123:1700.0001") == "C123" + assert SLACK_PLATFORM.thread_key_destination("slack:T1:C123:1700.0001") == "C123" + assert SLACK_PLATFORM.thread_key_destination("dev:nothing") is None + + +def test_dev_platform_is_noop(): + dev = PLATFORMS["dev"] + assert dev.sanitize_text("hello {with} json") == "hello {with} json" + assert dev.system_prompt_rules(user_id="U1") == [] + assert dev.system_prompt_identity_lines({"x": "y"}) == [] + assert dev.thread_key_destination("slack:C1:1700.0001") is None + + +def test_slack_system_prompt_rules_mentions_requester_when_user_id_present(): + rules_with_user = SLACK_PLATFORM.system_prompt_rules(user_id="U999") + rules_without_user = SLACK_PLATFORM.system_prompt_rules() + assert any("U999" in line for line in rules_with_user) + assert not any("U999" in line for line in rules_without_user) + + +def test_slack_identity_lines_branch_on_verified(): + verified = SLACK_PLATFORM.system_prompt_identity_lines( + { + "slack_user_id": "U1", + "slack_mention": "<@U1>", + "github_handle": "@octocat", + "github_handle_source": "Slack profile custom field", + "github_handle_verified": True, + } + ) + assert any("@octocat" in line for line in verified) + assert any("verified: yes" in line for line in verified) + + unverified = SLACK_PLATFORM.system_prompt_identity_lines( + { + "slack_user_id": "U2", + "slack_mention": "<@U2>", + "github_handle_verified": False, + "github_handle_unavailable_reason": "no GitHub field", + } + ) + assert any("unavailable" in line for line in unverified) + assert any("verified: no" in line for line in unverified) + + +@pytest.mark.asyncio +async def test_base_send_channel_message_raises_for_unimplemented_platforms(): + base = MessagingPlatform() + base.name = "stub" + with pytest.raises(NotImplementedError): + await base.send_channel_message("c", "t") diff --git a/services/api/tests/test_slackbot_client.py b/services/api/tests/test_slackbot_client.py index aea3d175..9706ddd2 100644 --- a/services/api/tests/test_slackbot_client.py +++ b/services/api/tests/test_slackbot_client.py @@ -21,7 +21,7 @@ def _no_sleep(monkeypatch): async def _instant(_s: float) -> None: return None - monkeypatch.setattr("api.slackbot_client.asyncio.sleep", _instant) + monkeypatch.setattr("api.platforms.slack.asyncio.sleep", _instant) def _response(status: int, body: dict[str, Any] | None = None) -> httpx.Response: @@ -50,7 +50,7 @@ async def post(self, url: str, **kwargs: Any) -> httpx.Response: @pytest.mark.asyncio async def test_post_retries_on_5xx_then_returns_payload(): fake = _FakeClient([_response(502), _response(503), _response(200, {"ok": True})]) - with patch("api.slackbot_client.httpx.AsyncClient", return_value=fake): + with patch("api.platforms.slack.httpx.AsyncClient", return_value=fake): from api import slackbot_client result = await slackbot_client.post( @@ -65,7 +65,7 @@ async def test_post_retries_on_5xx_then_returns_payload(): @pytest.mark.parametrize("status", [408, 429]) async def test_post_retries_on_retryable_4xx(status: int): fake = _FakeClient([_response(status), _response(200, {"ok": True})]) - with patch("api.slackbot_client.httpx.AsyncClient", return_value=fake): + with patch("api.platforms.slack.httpx.AsyncClient", return_value=fake): from api import slackbot_client result = await slackbot_client.post("/api/slack/agent-sessions/sess/done", {}) @@ -78,7 +78,7 @@ async def test_post_retries_on_retryable_4xx(status: int): @pytest.mark.parametrize("status", [400, 403, 404]) async def test_post_does_not_retry_on_permanent_4xx(status: int): fake = _FakeClient([_response(status, {"error": "bad"})]) - with patch("api.slackbot_client.httpx.AsyncClient", return_value=fake): + with patch("api.platforms.slack.httpx.AsyncClient", return_value=fake): from api import slackbot_client result = await slackbot_client.post("/api/slack/agent-sessions/sess/done", {}) @@ -90,7 +90,7 @@ async def test_post_does_not_retry_on_permanent_4xx(status: int): @pytest.mark.asyncio async def test_post_returns_none_after_exhausting_retries(): fake = _FakeClient([_response(502), _response(502), _response(502)]) - with patch("api.slackbot_client.httpx.AsyncClient", return_value=fake): + with patch("api.platforms.slack.httpx.AsyncClient", return_value=fake): from api import slackbot_client result = await slackbot_client.post( diff --git a/services/api/tests/test_slackbot_client_sanitize.py b/services/api/tests/test_slackbot_client_sanitize.py index ffb1afbd..28cf8189 100644 --- a/services/api/tests/test_slackbot_client_sanitize.py +++ b/services/api/tests/test_slackbot_client_sanitize.py @@ -1,10 +1,11 @@ -"""Tests for Slack egress sanitization in slackbot_client.""" +"""Tests for Slack egress sanitization.""" from __future__ import annotations import pytest from api import slackbot_client +from api.platforms import slack as _platform_slack @pytest.fixture @@ -15,7 +16,9 @@ async def fake_post(path: str, body: dict, **_kwargs): calls.append((path, body)) return {"ok": True} - monkeypatch.setattr(slackbot_client, "post", fake_post) + # SLACK_PLATFORM methods call platforms.slack.slackbot_post directly; + # patch the platform-side reference, not the shim alias. + monkeypatch.setattr(_platform_slack, "slackbot_post", fake_post) return calls diff --git a/services/api/tests/test_workflow_idempotency_unit.py b/services/api/tests/test_workflow_idempotency_unit.py index 3d3c1909..3218cde5 100644 --- a/services/api/tests/test_workflow_idempotency_unit.py +++ b/services/api/tests/test_workflow_idempotency_unit.py @@ -96,10 +96,12 @@ async def test_agent_turn_skips_existing_history_message_before_append(): "status": "queued", }) + from api.platforms.slack import SLACK_PLATFORM + with ( patch("api.workflow_engine._compute_agent_session_title", new=AsyncMock(return_value=None)), patch("api.workflow_engine._compute_agent_session_header", new=AsyncMock(return_value=None)), - patch("api.workflow_engine.slackbot_client.open_agent_session", new=AsyncMock(return_value=None)), + patch.object(SLACK_PLATFORM, "open_live_session", new=AsyncMock(return_value=None)), patch("api.workflow_engine.spawn_assignment", new=AsyncMock(return_value={"assignment_generation": 1})), patch("api.workflow_engine._message_request_exists", new=AsyncMock(return_value=True)) as exists, patch("api.workflow_engine.append_message", new=append_message), diff --git a/services/api/tests/test_workflows.py b/services/api/tests/test_workflows.py index 9538ef8b..5c715ce5 100644 --- a/services/api/tests/test_workflows.py +++ b/services/api/tests/test_workflows.py @@ -1392,13 +1392,16 @@ async def open_after_spawn(**kwargs): return_value={"ok": True, "execution_id": "exe-resolved", "status": "queued"}, ) + from api.platforms.slack import SLACK_PLATFORM + with ( patch( "api.workflow_engine.spawn_assignment", new=AsyncMock(side_effect=spawn_after_recording), ), - patch( - "api.workflow_engine.slackbot_client.open_agent_session", + patch.object( + SLACK_PLATFORM, + "open_live_session", new=AsyncMock(side_effect=open_after_spawn), ) as open_session_mock, patch("api.workflow_engine.append_message", new=append_message_mock), @@ -1450,21 +1453,26 @@ async def test_agent_turn_spawn_failure_opens_unresolved_failure_session(db_pool append_message_mock = AsyncMock() enqueue_execution_mock = AsyncMock() + from api.platforms.slack import SLACK_PLATFORM + with ( patch( "api.workflow_engine.spawn_assignment", new=AsyncMock(side_effect=RuntimeError("spawn unavailable")), ), - patch( - "api.workflow_engine.slackbot_client.open_agent_session", + patch.object( + SLACK_PLATFORM, + "open_live_session", new=AsyncMock(return_value="sess-failed"), ) as open_session_mock, - patch( - "api.workflow_engine.slackbot_client.session_text", + patch.object( + SLACK_PLATFORM, + "session_text", new=AsyncMock(), ) as session_text_mock, - patch( - "api.workflow_engine.slackbot_client.session_done", + patch.object( + SLACK_PLATFORM, + "session_done", new=AsyncMock(), ) as session_done_mock, patch("api.workflow_engine.append_message", new=append_message_mock), From 684180067112db9c1f30df70d4c424ff5d32890f Mon Sep 17 00:00:00 2001 From: Will Drach Date: Fri, 22 May 2026 23:14:15 -0600 Subject: [PATCH 2/5] Address review feedback: dataclass identity, explicit registration, dead code Quick-win cluster from the multi-agent review of e2a54efd: - Promote requester identity to a frozen `RequesterIdentity` dataclass (platforms/__init__.py) instead of `dict[str, str | bool]`. Field names drop the `slack_` prefix since the data is platform-agnostic; per-platform rendering ("Slack user ID: ...") stays on `system_prompt_identity_lines`. Updates `SlackPlatform.load_requester_identity`, `system_prompt_identity_lines`, and the two test files that constructed the old dict shape. - Delete dead code: `resolve_for_metadata` had zero callers; `_DEV_FALLBACK` was unreachable defensive code since `DevPlatform` registers eagerly. - Centralize platform registration via `register_builtin_platforms()` in `platforms/__init__.py`. `platforms/dev.py` and `platforms/slack.py` now only expose their singleton (`DEV_PLATFORM`, `SLACK_PLATFORM`); the registration call lives in one place. Removes the bottom-of-module side-effect imports with their `noqa: E402, F401` lies. - Document the platform-agnostic execution metadata keys (`slackbot_live_delivery`, `slackbot_agent_session_id`, etc.) in the `api.platforms` module docstring. They're Slack-named for historical reasons but read by all platforms; future schema cleanup will rename them to `live_session_*`. - Add `MessagingPlatform.intercepts_tool_call(tool_name, method_name)` sync hint. `tool_manager._capture_active_thread_tool_call` now skips the async dispatch hop into `capture_active_thread_tool_call` for any tool call the active platform doesn't intercept. `SlackPlatform` returns `True` only for `slack.send_message`, restoring the pre-PR fast path for non-send-message tool calls on Slack threads. - Dedupe schedule registration for workflows with `WORKFLOW_ALIASES` (workflow_engine.py:_registered_schedule_specs). The alias system registers the same `_RegisteredHandler` under multiple names; without deduping, a future workflow with both aliases and a `CRON`/`SCHEDULE` would fire its schedule N times per tick. Currently latent (`slack_thread_turn` has no schedule), but fixed proactively. - Narrow over-broad `except (ValueError, TypeError)` to `json.JSONDecodeError` in `SlackPlatform.send_channel_message`. Verified via uv run ruff check api/ and uv run pytest -q on test_platforms.py (17 cases including 4 new), test_integration.py, test_slackbot_client*, test_workflow_idempotency_unit.py, and the CI-targeted subset of test_workflows.py and test_agent_control_plane.py. 78/78 targeted tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- services/api/api/platforms/__init__.py | 96 ++++++++++++++++++-------- services/api/api/platforms/dev.py | 11 ++- services/api/api/platforms/slack.py | 84 +++++++++++----------- services/api/api/tool_manager.py | 2 +- services/api/api/workflow_engine.py | 7 ++ services/api/tests/test_integration.py | 44 ++++++------ services/api/tests/test_platforms.py | 55 +++++++++++---- 7 files changed, 189 insertions(+), 110 deletions(-) diff --git a/services/api/api/platforms/__init__.py b/services/api/api/platforms/__init__.py index ff8a16ea..74259bdb 100644 --- a/services/api/api/platforms/__init__.py +++ b/services/api/api/platforms/__init__.py @@ -11,12 +11,31 @@ default fallback when the platform is missing or unknown is ``"dev"``, which is a no-op implementation suitable for unit tests and the localhost bypass path. + +Built-in platforms are wired up by ``register_builtin_platforms()`` at the +bottom of this module, so simple ``from api.platforms import +resolve_platform`` works without explicit app-startup setup. + +Platform-agnostic execution metadata keys +----------------------------------------- +The execution worker reads and writes several JSONB metadata keys on +``agent_execution_requests`` that are conceptually platform-agnostic but +historically Slack-named. Any platform that implements live streaming +must honor these spellings so the rest of the system stays platform-blind: + +- ``slackbot_live_delivery`` (bool) — gate for live-session forwarding +- ``slackbot_agent_session_id`` (str) — opaque per-execution session id +- ``slackbot_live_delivery_failed`` (str) — reason live forwarding bailed +- ``slackbot_streamed_answer_chars`` (int) — chars already streamed live + +A future schema-cleanup PR will rename these to ``live_session_*``; until +then platforms write under the legacy spellings. """ from __future__ import annotations import json -import re +from dataclasses import dataclass from typing import Any import structlog @@ -24,6 +43,25 @@ log = structlog.get_logger() +@dataclass(frozen=True) +class RequesterIdentity: + """Who triggered an agent turn, as recovered from the messaging platform. + + Rendered into the system prompt so the agent can address the requester + by handle, mention them in replies, and look up their GitHub identity. + Construction is platform-agnostic; per-platform rendering of e.g. + ``"Slack user ID: ..."`` lives on the platform's + ``system_prompt_identity_lines`` method. + """ + + user_id: str + mention: str + github_handle: str | None = None + github_handle_source: str | None = None + github_handle_verified: bool = False + github_handle_unavailable_reason: str | None = None + + class MessagingPlatform: """Base implementation. Subclasses override what the platform supports. @@ -37,10 +75,11 @@ class MessagingPlatform: message_chunk_chars: int = 12_000 step_chunk_chars: int = 12_000 - # Regex that matches a leading ``<@USER_ID>`` mention prefix and captures - # the remaining text. Used by recovery-command normalisation in the - # messaging_thread_turn workflow. Default never matches. - mention_prefix_re: re.Pattern[str] = re.compile(r"(?!x)x") + # Regex matching a leading ``<@USER_ID>`` mention prefix and capturing + # the remaining text. Used by recovery-command normalization in the + # ``messaging_thread_turn`` workflow. ``None`` means the platform has + # no mention-prefix grammar; callers should fall through. + mention_prefix_re: Any = None # ── Delivery predicates ──────────────────────────────────────────── @@ -80,11 +119,11 @@ def thread_key_destination(self, thread_key: str) -> str | None: async def load_requester_identity( self, user_id: str | None - ) -> dict[str, str | bool] | None: + ) -> RequesterIdentity | None: return None def system_prompt_identity_lines( - self, identity: dict[str, str | bool] | None + self, identity: RequesterIdentity | None ) -> list[str]: return [] @@ -158,6 +197,15 @@ async def send_channel_message( # ── Live tool-call capture ───────────────────────────────────────── + def intercepts_tool_call(self, tool_name: str, method_name: str) -> bool: + """Sync hint: does this platform want a chance to capture this tool call? + + Lets ``tool_manager`` skip the ``await`` dispatch into + ``capture_active_thread_tool_call`` for tool calls no platform + intercepts (the overwhelmingly common case). Default: never. + """ + return False + async def capture_active_thread_tool_call( self, *, @@ -197,7 +245,7 @@ def resolve_platform(name: str | None) -> MessagingPlatform: return PLATFORMS[name] if name: log.debug("resolve_platform_unknown", requested=name) - return PLATFORMS.get("dev") or _DEV_FALLBACK + return PLATFORMS["dev"] def resolve_for_delivery( @@ -222,29 +270,19 @@ def resolve_for_thread_key(thread_key: str | None) -> MessagingPlatform | None: return PLATFORMS.get(prefix) -def resolve_for_metadata( - metadata: dict[str, Any] | None, -) -> MessagingPlatform | None: - """Resolve from metadata.platform if present; ``None`` otherwise. +def register_builtin_platforms() -> None: + """Wire up the platforms that ship with the API. - Unlike ``resolve_for_delivery``, this does not fall back to dev — it - distinguishes "no platform recorded" from "dev platform". + Called at module import time (below) so simple ``from api.platforms + import resolve_platform`` works without explicit app-startup setup. + Tests that need a clean registry can call ``PLATFORMS.clear()`` and + re-invoke this. """ - if not isinstance(metadata, dict): - return None - value = metadata.get("platform") - if isinstance(value, str) and value and value in PLATFORMS: - return PLATFORMS[value] - return None - + from api.platforms.dev import DEV_PLATFORM + from api.platforms.slack import SLACK_PLATFORM -# Defensive fallback used if resolve_platform runs before registration. -_DEV_FALLBACK = MessagingPlatform() -_DEV_FALLBACK.name = "dev" + register_platform(DEV_PLATFORM) + register_platform(SLACK_PLATFORM) -# Eager imports so registration happens on package import. Kept at the -# bottom to avoid circular-import issues with platforms that import from -# api.* at module load. -from api.platforms import dev as _dev_module # noqa: E402, F401 -from api.platforms import slack as _slack_module # noqa: E402, F401 +register_builtin_platforms() diff --git a/services/api/api/platforms/dev.py b/services/api/api/platforms/dev.py index 41df024f..7f21da36 100644 --- a/services/api/api/platforms/dev.py +++ b/services/api/api/platforms/dev.py @@ -1,13 +1,18 @@ """Dev platform: no-op everything. Used for localhost-bypass executions -and unit tests where no real messaging integration is wired up.""" +and unit tests where no real messaging integration is wired up. + +Registration happens centrally via +``api.platforms.register_builtin_platforms``; importing this module +does not register on its own. +""" from __future__ import annotations -from api.platforms import MessagingPlatform, register_platform +from api.platforms import MessagingPlatform class DevPlatform(MessagingPlatform): name = "dev" -register_platform(DevPlatform()) +DEV_PLATFORM = DevPlatform() diff --git a/services/api/api/platforms/slack.py b/services/api/api/platforms/slack.py index ebc4a6d1..6f040d67 100644 --- a/services/api/api/platforms/slack.py +++ b/services/api/api/platforms/slack.py @@ -15,12 +15,13 @@ import os import re from collections.abc import Callable +from dataclasses import replace from typing import Any import httpx import structlog -from api.platforms import MessagingPlatform, register_platform +from api.platforms import MessagingPlatform, RequesterIdentity log = structlog.get_logger() @@ -335,6 +336,16 @@ def extract_github_handle_from_slack_profile( # ── Platform implementation ─────────────────────────────────────────── +def _identity_with_unavailable_reason( + base: RequesterIdentity, reason: str +) -> RequesterIdentity: + return replace( + base, + github_handle_verified=False, + github_handle_unavailable_reason=reason, + ) + + class SlackPlatform(MessagingPlatform): name = "slack" mention_prefix_re = _SLACK_ID_MENTION_RE @@ -359,13 +370,10 @@ def thread_key_destination(self, thread_key: str) -> str | None: async def load_requester_identity( self, user_id: str | None - ) -> dict[str, str | bool] | None: + ) -> RequesterIdentity | None: if not user_id: return None - identity: dict[str, str | bool] = { - "slack_user_id": user_id, - "slack_mention": f"<@{user_id}>", - } + base = RequesterIdentity(user_id=user_id, mention=f"<@{user_id}>") try: from api.app import get_tool_manager @@ -379,13 +387,9 @@ async def load_requester_identity( user_id=user_id, error=str(exc), ) - identity.update( - { - "github_handle_verified": False, - "github_handle_unavailable_reason": "Slack profile could not be fetched", - } + return _identity_with_unavailable_reason( + base, "Slack profile could not be fetched" ) - return identity if not isinstance(profile, dict) or profile.get("error"): error = str(profile.get("error") or "Slack profile could not be fetched") @@ -395,49 +399,37 @@ async def load_requester_identity( user_id=user_id, error=error, ) - identity.update( - { - "github_handle_verified": False, - "github_handle_unavailable_reason": "Slack profile could not be fetched", - } + return _identity_with_unavailable_reason( + base, "Slack profile could not be fetched" ) - return identity handle, source, reason = extract_github_handle_from_slack_profile(profile) if handle: - identity.update( - { - "github_handle": handle, - "github_handle_source": source or "Slack profile custom field", - "github_handle_verified": True, - } + return replace( + base, + github_handle=handle, + github_handle_source=source or "Slack profile custom field", + github_handle_verified=True, ) - else: - identity.update( - { - "github_handle_verified": False, - "github_handle_unavailable_reason": reason, - } - ) - return identity + return _identity_with_unavailable_reason(base, reason) def system_prompt_identity_lines( - self, identity: dict[str, str | bool] | None + self, identity: RequesterIdentity | None ) -> list[str]: - if not identity: + if identity is None: return [] lines = [ "", "## Requester Identity", "", - f"- Slack user ID: {identity['slack_user_id']}", - f"- Slack mention: {identity['slack_mention']}", + f"- Slack user ID: {identity.user_id}", + f"- Slack mention: {identity.mention}", ] - if identity.get("github_handle_verified"): + if identity.github_handle_verified: lines.extend( [ - f"- GitHub handle from Slack profile: {identity['github_handle']}", - f"- GitHub handle source: {identity['github_handle_source']}", + f"- GitHub handle from Slack profile: {identity.github_handle}", + f"- GitHub handle source: {identity.github_handle_source}", "- GitHub handle verified: yes", ] ) @@ -446,7 +438,7 @@ def system_prompt_identity_lines( [ "- GitHub handle from Slack profile: unavailable", "- GitHub handle unavailable reason: " - f"{identity['github_handle_unavailable_reason']}", + f"{identity.github_handle_unavailable_reason}", "- GitHub handle verified: no", ] ) @@ -596,7 +588,7 @@ async def send_channel_message( raw = await get_tool_manager().call_tool("slack", "send_message", args) try: result = json.loads(raw) if isinstance(raw, str) else raw - except (ValueError, TypeError): + except json.JSONDecodeError: result = {"raw": raw} if isinstance(result, dict) and result.get("error"): raise RuntimeError(str(result["error"])) @@ -604,6 +596,9 @@ async def send_channel_message( # ── Live tool-call capture ───────────────────────────────────── + def intercepts_tool_call(self, tool_name: str, method_name: str) -> bool: + return tool_name == "slack" and method_name == "send_message" + async def capture_active_thread_tool_call( self, *, @@ -615,7 +610,7 @@ async def capture_active_thread_tool_call( ) -> dict[str, Any] | None: if request is None or not sandbox_claims: return None - if tool_name != "slack" or method_name != "send_message": + if not self.intercepts_tool_call(tool_name, method_name): return None thread_key = str(sandbox_claims.get("thread_key") or "") @@ -673,7 +668,8 @@ async def capture_active_thread_tool_call( } -# ── Singleton + registration ────────────────────────────────────────── +# ── Singleton ───────────────────────────────────────────────────────── +# Registration is centralized via api.platforms.register_builtin_platforms; +# this module only exposes the singleton. SLACK_PLATFORM = SlackPlatform() -register_platform(SLACK_PLATFORM) diff --git a/services/api/api/tool_manager.py b/services/api/api/tool_manager.py index a32a71e4..3802c0a9 100644 --- a/services/api/api/tool_manager.py +++ b/services/api/api/tool_manager.py @@ -927,7 +927,7 @@ async def _capture_active_thread_tool_call( return None thread_key = str(sandbox_claims.get("thread_key") or "") platform = resolve_for_thread_key(thread_key) - if platform is None: + if platform is None or not platform.intercepts_tool_call(tool_name, method_name): return None return await platform.capture_active_thread_tool_call( request=request, diff --git a/services/api/api/workflow_engine.py b/services/api/api/workflow_engine.py index b61955ba..b4b34ecf 100644 --- a/services/api/api/workflow_engine.py +++ b/services/api/api/workflow_engine.py @@ -1660,10 +1660,17 @@ def _registered_schedule_specs() -> list[ScheduleSpec]: that write to DB instead of posting to Slack) """ specs: list[ScheduleSpec] = [] + # Workflow aliases register the same _RegisteredHandler under multiple + # names; iterate canonical handlers once so a scheduled workflow with + # aliases doesn't fire its schedule N times per tick. + seen_handlers: set[int] = set() for wf_name, reg in _WORKFLOW_HANDLERS.items(): sched = reg.schedule if not sched: continue + if id(reg) in seen_handlers: + continue + seen_handlers.add(id(reg)) cron_expr = sched.get("cron") interval_s = sched.get("interval_seconds") if not cron_expr and not interval_s: diff --git a/services/api/tests/test_integration.py b/services/api/tests/test_integration.py index 5fdb9283..ec0a67d3 100644 --- a/services/api/tests/test_integration.py +++ b/services/api/tests/test_integration.py @@ -562,18 +562,19 @@ def test_contains_timestamp(self): def test_requester_identity_with_github_handle(self): from api.agent import _build_session_context + from api.platforms import RequesterIdentity ctx = _build_session_context( "test:1", platform="slack", user_id="U123", - requester_identity={ - "slack_user_id": "U123", - "slack_mention": "<@U123>", - "github_handle": "@alice", - "github_handle_source": 'Slack profile custom field "GitHub"', - "github_handle_verified": True, - }, + requester_identity=RequesterIdentity( + user_id="U123", + mention="<@U123>", + github_handle="@alice", + github_handle_source='Slack profile custom field "GitHub"', + github_handle_verified=True, + ), ) assert "Requester Identity" in ctx @@ -582,19 +583,20 @@ def test_requester_identity_with_github_handle(self): def test_requester_identity_without_github_handle(self): from api.agent import _build_session_context + from api.platforms import RequesterIdentity ctx = _build_session_context( "test:1", platform="slack", user_id="U123", - requester_identity={ - "slack_user_id": "U123", - "slack_mention": "<@U123>", - "github_handle_verified": False, - "github_handle_unavailable_reason": ( + requester_identity=RequesterIdentity( + user_id="U123", + mention="<@U123>", + github_handle_verified=False, + github_handle_unavailable_reason=( "no GitHub custom field found on Slack profile" ), - }, + ), ) assert "GitHub handle from Slack profile: unavailable" in ctx @@ -652,16 +654,18 @@ async def test_insert_system_message_uses_thread_user_id_fallback( thread_key, ) + from api.platforms import RequesterIdentity + async def fake_resolve_requester_identity(*, platform, user_id): assert platform == "slack" assert user_id == "U123" - return { - "slack_user_id": user_id, - "slack_mention": f"<@{user_id}>", - "github_handle": "@alice", - "github_handle_source": 'Slack profile custom field "GitHub"', - "github_handle_verified": True, - } + return RequesterIdentity( + user_id=user_id, + mention=f"<@{user_id}>", + github_handle="@alice", + github_handle_source='Slack profile custom field "GitHub"', + github_handle_verified=True, + ) monkeypatch.setattr( agent, diff --git a/services/api/tests/test_platforms.py b/services/api/tests/test_platforms.py index 90512a41..692131e9 100644 --- a/services/api/tests/test_platforms.py +++ b/services/api/tests/test_platforms.py @@ -7,6 +7,7 @@ from api.platforms import ( MessagingPlatform, PLATFORMS, + RequesterIdentity, resolve_for_delivery, resolve_for_thread_key, resolve_platform, @@ -88,24 +89,24 @@ def test_slack_system_prompt_rules_mentions_requester_when_user_id_present(): def test_slack_identity_lines_branch_on_verified(): verified = SLACK_PLATFORM.system_prompt_identity_lines( - { - "slack_user_id": "U1", - "slack_mention": "<@U1>", - "github_handle": "@octocat", - "github_handle_source": "Slack profile custom field", - "github_handle_verified": True, - } + RequesterIdentity( + user_id="U1", + mention="<@U1>", + github_handle="@octocat", + github_handle_source="Slack profile custom field", + github_handle_verified=True, + ) ) assert any("@octocat" in line for line in verified) assert any("verified: yes" in line for line in verified) unverified = SLACK_PLATFORM.system_prompt_identity_lines( - { - "slack_user_id": "U2", - "slack_mention": "<@U2>", - "github_handle_verified": False, - "github_handle_unavailable_reason": "no GitHub field", - } + RequesterIdentity( + user_id="U2", + mention="<@U2>", + github_handle_verified=False, + github_handle_unavailable_reason="no GitHub field", + ) ) assert any("unavailable" in line for line in unverified) assert any("verified: no" in line for line in unverified) @@ -117,3 +118,31 @@ async def test_base_send_channel_message_raises_for_unimplemented_platforms(): base.name = "stub" with pytest.raises(NotImplementedError): await base.send_channel_message("c", "t") + + +def test_slack_intercepts_only_send_message(): + assert SLACK_PLATFORM.intercepts_tool_call("slack", "send_message") is True + assert SLACK_PLATFORM.intercepts_tool_call("slack", "get_channel_history") is False + assert SLACK_PLATFORM.intercepts_tool_call("websearch", "search") is False + + +def test_base_platform_does_not_intercept_any_tool_call(): + base = MessagingPlatform() + base.name = "stub" + assert base.intercepts_tool_call("slack", "send_message") is False + + +def test_requester_identity_defaults_unverified_with_no_handle(): + identity = RequesterIdentity(user_id="U1", mention="<@U1>") + assert identity.github_handle is None + assert identity.github_handle_verified is False + assert identity.github_handle_unavailable_reason is None + + +def test_register_builtin_platforms_repopulates_cleared_registry(): + from api.platforms import register_builtin_platforms + + PLATFORMS.clear() + register_builtin_platforms() + assert "slack" in PLATFORMS + assert "dev" in PLATFORMS From a8eebf89af4dbed3f49403997e4f959fb74edab8 Mon Sep 17 00:00:00 2001 From: Will Drach Date: Fri, 22 May 2026 23:20:36 -0600 Subject: [PATCH 3/5] Decouple platform capture from FastAPI request + DB schema MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously SlackPlatform.capture_active_thread_tool_call reached into ``request.app.state.db_pool`` and ran a raw SQL query against ``agent_execution_requests`` to fetch the live-session id. That coupled every platform adapter to FastAPI app state and the execution-request schema — a second platform (Discord) would have to duplicate the query or accept the same coupling. Split the responsibility cleanly: - New ``ActiveThreadCapture`` NamedTuple in ``api.platforms`` carries the pure-data result of a capture decision (text to forward into the live session + envelope returned to the agent). - ``MessagingPlatform.match_active_thread_capture(thread_key, tool_name, method_name, args)`` is now the platform's only capture surface. It's synchronous, takes pure data, returns ``ActiveThreadCapture | None``, and performs zero I/O. Replaces both the old async ``capture_active_thread_tool_call`` and the ``intercepts_tool_call`` sync hint introduced in the prior commit. - ``runtime_control.get_live_session_id_for_thread(pool, thread_key)`` centralizes the live-session lookup query (which reads the platform-agnostic ``slackbot_live_delivery`` / ``slackbot_agent_session_id`` metadata keys documented on ``api.platforms``). - ``tool_manager._capture_active_thread_tool_call`` is the only place that wires the two together: resolve platform, ask platform to match (sync), look up session_id (async DB), forward text via ``platform.session_text``. Logs as ``active_thread_tool_call_captured`` with the platform name. Adds 6 new test cases in ``test_platforms.py`` for the match function (non-send-message tool, foreign-channel write, foreign-thread write, empty text, non-slack thread_key, base no-op). Verified: uv run ruff check api/ clean; 82/82 targeted tests pass (platform suite, slackbot_client suite, workflow idempotency, integration, CI-targeted workflows + agent_control_plane). Co-Authored-By: Claude Opus 4.7 (1M context) --- services/api/api/platforms/__init__.py | 40 ++++++----- services/api/api/platforms/slack.py | 60 ++++------------ services/api/api/runtime_control.py | 26 +++++++ services/api/api/tool_manager.py | 36 ++++++++-- services/api/tests/test_platforms.py | 95 ++++++++++++++++++++++++-- 5 files changed, 182 insertions(+), 75 deletions(-) diff --git a/services/api/api/platforms/__init__.py b/services/api/api/platforms/__init__.py index 74259bdb..b758f163 100644 --- a/services/api/api/platforms/__init__.py +++ b/services/api/api/platforms/__init__.py @@ -36,13 +36,25 @@ import json from dataclasses import dataclass -from typing import Any +from typing import Any, NamedTuple import structlog log = structlog.get_logger() +class ActiveThreadCapture(NamedTuple): + """Result of matching a tool call for re-routing into the live session. + + ``text`` is what the platform wants forwarded into the live session as + the agent's reply. ``envelope`` is the dict returned to the agent in + place of the normal tool result so it knows the call was intercepted. + """ + + text: str + envelope: dict[str, Any] + + @dataclass(frozen=True) class RequesterIdentity: """Who triggered an agent turn, as recovered from the messaging platform. @@ -197,26 +209,22 @@ async def send_channel_message( # ── Live tool-call capture ───────────────────────────────────────── - def intercepts_tool_call(self, tool_name: str, method_name: str) -> bool: - """Sync hint: does this platform want a chance to capture this tool call? - - Lets ``tool_manager`` skip the ``await`` dispatch into - ``capture_active_thread_tool_call`` for tool calls no platform - intercepts (the overwhelmingly common case). Default: never. - """ - return False - - async def capture_active_thread_tool_call( + def match_active_thread_capture( self, *, - request: Any, - sandbox_claims: dict[str, Any] | None, + thread_key: str, tool_name: str, method_name: str, args: dict[str, Any], - ) -> dict[str, Any] | None: - """If an agent tool call should be re-routed through the live session - instead of hitting the platform's API, return a captured-result dict. + ) -> ActiveThreadCapture | None: + """Pure-function check: should this tool call be re-routed through + the live session instead of hitting the platform's API? + + Returns the text to forward + the response envelope on capture, + ``None`` when the call should pass through to the normal tool + path. The platform performs no I/O; ``tool_manager`` owns the + live-session lookup (via ``runtime_control.get_live_session_id_for_thread``) + and the actual ``session_text`` forward. Slack catches ``slack.send_message`` to the active thread; other platforms return ``None`` by default. diff --git a/services/api/api/platforms/slack.py b/services/api/api/platforms/slack.py index 6f040d67..8087e2d2 100644 --- a/services/api/api/platforms/slack.py +++ b/services/api/api/platforms/slack.py @@ -21,7 +21,7 @@ import httpx import structlog -from api.platforms import MessagingPlatform, RequesterIdentity +from api.platforms import ActiveThreadCapture, MessagingPlatform, RequesterIdentity log = structlog.get_logger() @@ -596,24 +596,16 @@ async def send_channel_message( # ── Live tool-call capture ───────────────────────────────────── - def intercepts_tool_call(self, tool_name: str, method_name: str) -> bool: - return tool_name == "slack" and method_name == "send_message" - - async def capture_active_thread_tool_call( + def match_active_thread_capture( self, *, - request: Any, - sandbox_claims: dict[str, Any] | None, + thread_key: str, tool_name: str, method_name: str, args: dict[str, Any], - ) -> dict[str, Any] | None: - if request is None or not sandbox_claims: - return None - if not self.intercepts_tool_call(tool_name, method_name): + ) -> ActiveThreadCapture | None: + if tool_name != "slack" or method_name != "send_message": return None - - thread_key = str(sandbox_claims.get("thread_key") or "") parts = thread_key.split(":") if len(parts) < 4 or parts[0] != "slack": return None @@ -628,44 +620,18 @@ async def capture_active_thread_tool_call( return None if requested_thread_ts and requested_thread_ts != active_thread_ts: return None - text = str(args.get("text") or args.get("message") or "").strip() if not text: return None - - pool = getattr(request.app.state, "db_pool", None) - if pool is None: - return None - session_id = await pool.fetchval( - "SELECT metadata->>'slackbot_agent_session_id' " - "FROM agent_execution_requests " - "WHERE thread_key = $1 " - "AND status = 'running' " - "AND (" - " metadata->>'slackbot_live_delivery' = 'true' " - " OR metadata->>('slackbot' || '_v' || '2_live_delivery') = 'true'" - ") " - "AND COALESCE(metadata->>'slackbot_agent_session_id', '') <> '' " - "ORDER BY started_at DESC NULLS LAST, created_at DESC LIMIT 1", - thread_key, - ) - session_id = str(session_id or "").strip() - if not session_id: - return None - - await self.session_text(session_id, text) - log.info( - "slack_send_message_captured", - thread_key=thread_key, - sandbox_container_id=sandbox_claims.get("container_id"), - slackbot_agent_session_id=session_id, + return ActiveThreadCapture( + text=text, + envelope={ + "captured": True, + "message": "Captured into the active Slackbot live reply; no separate Slack message was posted.", + "channel": active_channel, + "thread_ts": active_thread_ts, + }, ) - return { - "captured": True, - "message": "Captured into the active Slackbot live reply; no separate Slack message was posted.", - "channel": active_channel, - "thread_ts": active_thread_ts, - } # ── Singleton ───────────────────────────────────────────────────────── diff --git a/services/api/api/runtime_control.py b/services/api/api/runtime_control.py index 8a0695c9..5a0510e9 100644 --- a/services/api/api/runtime_control.py +++ b/services/api/api/runtime_control.py @@ -906,6 +906,32 @@ def execution_terminal(status: str) -> bool: return status in {"completed", "failed_permanent", "cancelled"} +async def get_live_session_id_for_thread(pool, thread_key: str) -> str | None: + """Return the live-delivery session_id bound to this thread's currently + running execution, or None when no execution has live delivery open. + + Reads the platform-agnostic metadata-key contract documented in + ``api.platforms`` (``slackbot_live_delivery`` / ``slackbot_agent_session_id``). + Platform adapters use this via ``tool_manager`` to forward an active + tool call into the live session without coupling themselves to the + ``agent_execution_requests`` schema. + """ + session_id = await pool.fetchval( + "SELECT metadata->>'slackbot_agent_session_id' " + "FROM agent_execution_requests " + "WHERE thread_key = $1 " + "AND status = 'running' " + "AND (" + " metadata->>'slackbot_live_delivery' = 'true' " + " OR metadata->>('slackbot' || '_v' || '2_live_delivery') = 'true'" + ") " + "AND COALESCE(metadata->>'slackbot_agent_session_id', '') <> '' " + "ORDER BY started_at DESC NULLS LAST, created_at DESC LIMIT 1", + thread_key, + ) + return str(session_id or "").strip() or None + + def build_execution_state_payload( *, execution_id: str, diff --git a/services/api/api/tool_manager.py b/services/api/api/tool_manager.py index 3802c0a9..21749cf7 100644 --- a/services/api/api/tool_manager.py +++ b/services/api/api/tool_manager.py @@ -920,22 +920,46 @@ async def _capture_active_thread_tool_call( tool call (e.g. an agent calling ``slack.send_message`` to the active thread is re-routed through the live streaming session). - Returns the captured-result envelope or ``None`` to fall through to the - normal tool path. + The platform's ``match_active_thread_capture`` is a pure synchronous + function that decides whether to capture; this wrapper owns the + live-session lookup and the actual ``session_text`` forward so + platform adapters stay decoupled from FastAPI app state and the + ``agent_execution_requests`` schema. + + Returns the captured-result envelope or ``None`` to fall through to + the normal tool path. """ if request is None or not sandbox_claims: return None thread_key = str(sandbox_claims.get("thread_key") or "") platform = resolve_for_thread_key(thread_key) - if platform is None or not platform.intercepts_tool_call(tool_name, method_name): + if platform is None: return None - return await platform.capture_active_thread_tool_call( - request=request, - sandbox_claims=sandbox_claims, + match = platform.match_active_thread_capture( + thread_key=thread_key, tool_name=tool_name, method_name=method_name, args=args, ) + if match is None: + return None + pool = getattr(request.app.state, "db_pool", None) + if pool is None: + return None + from api.runtime_control import get_live_session_id_for_thread + + session_id = await get_live_session_id_for_thread(pool, thread_key) + if not session_id: + return None + await platform.session_text(session_id, match.text) + log.info( + "active_thread_tool_call_captured", + thread_key=thread_key, + platform=platform.name, + sandbox_container_id=sandbox_claims.get("container_id"), + live_session_id=session_id, + ) + return match.envelope async def _extract_tool_attachment( diff --git a/services/api/tests/test_platforms.py b/services/api/tests/test_platforms.py index 692131e9..f5733325 100644 --- a/services/api/tests/test_platforms.py +++ b/services/api/tests/test_platforms.py @@ -120,16 +120,99 @@ async def test_base_send_channel_message_raises_for_unimplemented_platforms(): await base.send_channel_message("c", "t") -def test_slack_intercepts_only_send_message(): - assert SLACK_PLATFORM.intercepts_tool_call("slack", "send_message") is True - assert SLACK_PLATFORM.intercepts_tool_call("slack", "get_channel_history") is False - assert SLACK_PLATFORM.intercepts_tool_call("websearch", "search") is False +def test_slack_match_captures_send_message_to_active_thread(): + match = SLACK_PLATFORM.match_active_thread_capture( + thread_key="slack:T1:C123:1700.0001", + tool_name="slack", + method_name="send_message", + args={"channel": "C123", "thread_ts": "1700.0001", "text": "hi"}, + ) + assert match is not None + assert match.text == "hi" + assert match.envelope["captured"] is True + assert match.envelope["channel"] == "C123" + + +def test_slack_match_skips_non_send_message_tool_calls(): + assert ( + SLACK_PLATFORM.match_active_thread_capture( + thread_key="slack:T1:C123:1700.0001", + tool_name="slack", + method_name="get_channel_history", + args={"channel": "C123"}, + ) + is None + ) + assert ( + SLACK_PLATFORM.match_active_thread_capture( + thread_key="slack:T1:C123:1700.0001", + tool_name="websearch", + method_name="search", + args={"query": "hi"}, + ) + is None + ) + + +def test_slack_match_skips_writes_to_other_channels(): + # Channel ID different from the active thread's channel. + assert ( + SLACK_PLATFORM.match_active_thread_capture( + thread_key="slack:T1:C123:1700.0001", + tool_name="slack", + method_name="send_message", + args={"channel": "C999", "text": "hi"}, + ) + is None + ) + # Same channel but explicitly targeting a different thread. + assert ( + SLACK_PLATFORM.match_active_thread_capture( + thread_key="slack:T1:C123:1700.0001", + tool_name="slack", + method_name="send_message", + args={"channel": "C123", "thread_ts": "1700.9999", "text": "hi"}, + ) + is None + ) -def test_base_platform_does_not_intercept_any_tool_call(): +def test_slack_match_skips_empty_text(): + assert ( + SLACK_PLATFORM.match_active_thread_capture( + thread_key="slack:T1:C123:1700.0001", + tool_name="slack", + method_name="send_message", + args={"channel": "C123", "text": " "}, + ) + is None + ) + + +def test_slack_match_skips_non_slack_thread_key(): + assert ( + SLACK_PLATFORM.match_active_thread_capture( + thread_key="dev:nothing:here", + tool_name="slack", + method_name="send_message", + args={"channel": "C123", "text": "hi"}, + ) + is None + ) + + +def test_base_platform_match_returns_none(): base = MessagingPlatform() base.name = "stub" - assert base.intercepts_tool_call("slack", "send_message") is False + assert ( + base.match_active_thread_capture( + thread_key="anything:1:2", + tool_name="slack", + method_name="send_message", + args={"channel": "C123", "text": "hi"}, + ) + is None + ) def test_requester_identity_defaults_unverified_with_no_handle(): From 5370408ddb67fc73dd3ad291cfb269d3c9fe3859 Mon Sep 17 00:00:00 2001 From: Will Drach Date: Fri, 22 May 2026 23:32:17 -0600 Subject: [PATCH 4/5] Polish: remove obfuscation, tuple constant, declarative hash-ignore, deprec shim MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P3 cleanup bundle from the multi-agent review: - Drop the ``"slackbot" + "_v" + "2_live_delivery"`` Python-side concat and the matching SQL ``'slackbot' || '_v' || '2_live_delivery'`` trick. Write the legacy key as a plain string with an explanatory comment so future readers (and grep) see what's actually being checked. - Simplify ``_valid_github_handle`` triple-strip + path-trim into a single ``.strip("@ \t\n").rstrip("/").split("/", 1)[0]`` chain with a comment documenting the inputs it accepts. - Promote ``HASH_IGNORED_INPUT_KEYS`` to a workflow module attribute. ``_RegisteredHandler`` now carries the tuple; ``_workflow_request_hash`` consults the registered handler instead of the hard-coded ``{"messaging_thread_turn", "slack_thread_turn"}`` set. The ``messaging_thread_turn`` workflow declares ``HASH_IGNORED_INPUT_KEYS = ("history_messages",)`` directly. - Move ``_SLACK_CHANNEL_ID_RE`` next to its sole consumer; collect the Slack formatting-rule lines into a module-level ``_SLACK_FORMATTING_RULES`` tuple constant so ``system_prompt_rules`` shrinks to one composition expression. - Emit a ``DeprecationWarning`` on import of ``api.slackbot_client`` and ``api.slack_sanitize`` so the shims don't accrete new callers before the planned removal PR. - Trim "Phase 0 → Phase 1" refactor-history wording from module docstrings now that we're past that transition; replace with concrete forward-looking migration guidance. Verified: uv run ruff check api/ clean; 82/82 targeted tests pass; full test_workflows.py shows only the pre-existing ``test_slack_thread_turn_attachment_roundtrip_to_agent`` failure (unrelated to this PR, fails on pristine main as well). Co-Authored-By: Claude Opus 4.7 (1M context) --- services/api/api/platforms/slack.py | 39 ++++++++++--------- services/api/api/runtime_control.py | 11 +++++- services/api/api/slack_sanitize.py | 15 ++++++- services/api/api/slackbot_client.py | 14 +++++-- services/api/api/workflow_engine.py | 22 ++++++++--- .../api/api/workflows/slack_thread_turn.py | 3 ++ 6 files changed, 74 insertions(+), 30 deletions(-) diff --git a/services/api/api/platforms/slack.py b/services/api/api/platforms/slack.py index 8087e2d2..5c5c1f19 100644 --- a/services/api/api/platforms/slack.py +++ b/services/api/api/platforms/slack.py @@ -2,10 +2,9 @@ system-prompt injection, live-streaming session, and active-thread ``slack.send_message`` capture. -Everything Slack-specific lives here. The thin shims at +Everything Slack-specific lives here. Thin shims at ``api/slack_sanitize.py`` and ``api/slackbot_client.py`` re-export the -module-level helpers so existing callsites keep working during the -Phase 0 → Phase 1 transition. +public surface for back-compat with older callers. """ from __future__ import annotations @@ -264,8 +263,10 @@ def slack_thread_key_to_channel(thread_key: str) -> str: def _valid_github_handle(value: str) -> str | None: - candidate = value.strip().strip("@").strip() - candidate = candidate.rstrip("/").split("/", 1)[0] + # Accept inputs like " @octocat ", "@octocat/repo", or "octocat/" by + # stripping surrounding whitespace + leading ``@`` and lopping off + # any trailing path segment before pattern matching. + candidate = value.strip("@ \t\n").rstrip("/").split("/", 1)[0] return candidate if _GITHUB_HANDLE_RE.match(candidate) else None @@ -328,11 +329,24 @@ def extract_github_handle_from_slack_profile( ) -# ── Live tool-call capture predicate ────────────────────────────────── +# ── Slack channel-ID shape, used inside match_active_thread_capture ─── _SLACK_CHANNEL_ID_RE = re.compile(r"^[CDG][A-Z0-9]+$") +# ── Slack-flavoured system-prompt formatting rules ──────────────────── + +_SLACK_FORMATTING_RULES: tuple[str, ...] = ( + "- Use standard markdown links `[Display Text](URL)` for hyperlinks", + "- Do NOT use Slack-native `` link syntax", + "- Preserve Slack user mentions (`<@UXXXXXXX>`) exactly as-is — only use these for actual Slack users", + "- For Twitter/X handles, link to the profile WITHOUT an @ prefix in the display text: `[handle](https://x.com/handle)` (NOT `[@handle](...)`)", + "- Prefer concise, well-structured markdown; long replies may be split across multiple Slack messages", + "- Markdown tables are allowed and may render as native Slack tables when the structure is clean", + "- NEVER put links/URLs inside code blocks (``` ```) — they won't be clickable. Use markdown tables or plain text with `[text](url)` links instead", +) + + # ── Platform implementation ─────────────────────────────────────────── @@ -445,18 +459,7 @@ def system_prompt_identity_lines( return lines def system_prompt_rules(self, *, user_id: str | None = None) -> list[str]: - lines = [ - "", - "## Slack Formatting Rules", - "", - "- Use standard markdown links `[Display Text](URL)` for hyperlinks", - "- Do NOT use Slack-native `` link syntax", - "- Preserve Slack user mentions (`<@UXXXXXXX>`) exactly as-is — only use these for actual Slack users", - "- For Twitter/X handles, link to the profile WITHOUT an @ prefix in the display text: `[handle](https://x.com/handle)` (NOT `[@handle](...)`)", - "- Prefer concise, well-structured markdown; long replies may be split across multiple Slack messages", - "- Markdown tables are allowed and may render as native Slack tables when the structure is clean", - "- NEVER put links/URLs inside code blocks (``` ```) — they won't be clickable. Use markdown tables or plain text with `[text](url)` links instead", - ] + lines = ["", "## Slack Formatting Rules", "", *_SLACK_FORMATTING_RULES] if user_id: lines.append( f"- After completing a long task, tag the requester with their real Slack mention: <@{user_id}>" diff --git a/services/api/api/runtime_control.py b/services/api/api/runtime_control.py index 5a0510e9..1ba62a95 100644 --- a/services/api/api/runtime_control.py +++ b/services/api/api/runtime_control.py @@ -52,8 +52,12 @@ log = structlog.get_logger() +# Canonical key written by current slackbot deploys. _SLACKBOT_LIVE_DELIVERY_METADATA_KEY = "slackbot_live_delivery" -_LEGACY_SLACKBOT_LIVE_DELIVERY_METADATA_KEY = "slackbot" + "_v" + "2_live_delivery" +# Legacy spelling that pre-dated the rename to ``slackbot_live_delivery``. +# In-flight executions may still carry this value; we read both spellings +# but never write the legacy one. +_LEGACY_SLACKBOT_LIVE_DELIVERY_METADATA_KEY = "slackbot_v2_live_delivery" EXECUTION_SILENCE_TIMEOUT_S = int(os.getenv("EXECUTION_SILENCE_TIMEOUT_S", "600")) EXECUTION_TOOL_SILENCE_TIMEOUT_S = int( @@ -916,6 +920,9 @@ async def get_live_session_id_for_thread(pool, thread_key: str) -> str | None: tool call into the live session without coupling themselves to the ``agent_execution_requests`` schema. """ + # Read both the canonical ``slackbot_live_delivery`` and the legacy + # ``slackbot_v2_live_delivery`` key so in-flight executions written + # before the rename are still routable. session_id = await pool.fetchval( "SELECT metadata->>'slackbot_agent_session_id' " "FROM agent_execution_requests " @@ -923,7 +930,7 @@ async def get_live_session_id_for_thread(pool, thread_key: str) -> str | None: "AND status = 'running' " "AND (" " metadata->>'slackbot_live_delivery' = 'true' " - " OR metadata->>('slackbot' || '_v' || '2_live_delivery') = 'true'" + " OR metadata->>'slackbot_v2_live_delivery' = 'true'" ") " "AND COALESCE(metadata->>'slackbot_agent_session_id', '') <> '' " "ORDER BY started_at DESC NULLS LAST, created_at DESC LIMIT 1", diff --git a/services/api/api/slack_sanitize.py b/services/api/api/slack_sanitize.py index 4fdf3864..58c92956 100644 --- a/services/api/api/slack_sanitize.py +++ b/services/api/api/slack_sanitize.py @@ -1,9 +1,20 @@ -"""Compatibility shim. The implementation moved to ``api.platforms.slack``; +"""Compatibility shim. The implementation lives in ``api.platforms.slack``; this module remains so existing imports of ``sanitize_for_slack`` keep -working.""" +working. New callers should import from ``api.platforms.slack`` +directly; this shim is scheduled for removal in a future cleanup PR. +""" from __future__ import annotations +import warnings + from api.platforms.slack import sanitize_for_slack # noqa: F401 +warnings.warn( + "api.slack_sanitize is a back-compat shim; " + "import sanitize_for_slack from api.platforms.slack instead.", + DeprecationWarning, + stacklevel=2, +) + __all__ = ["sanitize_for_slack"] diff --git a/services/api/api/slackbot_client.py b/services/api/api/slackbot_client.py index e53f7da8..9851f646 100644 --- a/services/api/api/slackbot_client.py +++ b/services/api/api/slackbot_client.py @@ -1,15 +1,16 @@ -"""Compatibility shim. The implementation moved to ``api.platforms.slack``; +"""Compatibility shim. The implementation lives in ``api.platforms.slack``; this module re-exports the module-level helpers and delegates the async session helpers to the registered ``SlackPlatform`` singleton so existing ``from api import slackbot_client`` callers keep working. -Phase 1 callers should resolve a platform via +New callers should resolve a platform via ``api.platforms.resolve_for_delivery(delivery)`` and call methods on it -directly. This shim will be removed once those migrations land. +directly. This shim is scheduled for removal in a future cleanup PR. """ from __future__ import annotations +import warnings from typing import Any from api.platforms.slack import ( @@ -24,6 +25,13 @@ thread_ts, ) +warnings.warn( + "api.slackbot_client is a back-compat shim; resolve a platform via " + "api.platforms.resolve_for_delivery and call methods on it directly.", + DeprecationWarning, + stacklevel=2, +) + __all__ = [ "channel_id", "enabled", diff --git a/services/api/api/workflow_engine.py b/services/api/api/workflow_engine.py index b4b34ecf..fc13e316 100644 --- a/services/api/api/workflow_engine.py +++ b/services/api/api/workflow_engine.py @@ -1362,6 +1362,9 @@ class _RegisteredHandler: source_path: str version: str schedule: dict[str, Any] | None = None # Optional SCHEDULE export + # Input keys to drop from the request-hash so they don't affect + # idempotency (e.g. ``history_messages`` for messaging_thread_turn). + hash_ignored_input_keys: tuple[str, ...] = () # Maps workflow_name → registered handler + optional input class @@ -1531,12 +1534,19 @@ async def _auto_handler( ): schedule.setdefault("delivery", dict(mod_delivery)) version = hashlib.sha256(py_file.read_bytes()).hexdigest() + raw_hash_ignored = getattr(mod, "HASH_IGNORED_INPUT_KEYS", ()) or () + if isinstance(raw_hash_ignored, str): + raw_hash_ignored = (raw_hash_ignored,) + hash_ignored_input_keys = tuple( + str(key) for key in raw_hash_ignored if isinstance(key, str) and key + ) registered = _RegisteredHandler( handler=wf_handler, input_cls=input_cls, source_path=str(py_file), version=version, schedule=schedule, + hash_ignored_input_keys=hash_ignored_input_keys, ) _WORKFLOW_HANDLERS[wf_name] = registered discovered[wf_name] = str(py_file) @@ -1845,11 +1855,13 @@ def _workflow_request_hash( workflow_name: str, run_input: dict[str, Any], ) -> str: hash_input = dict(run_input) - # messaging_thread_turn (and its legacy alias slack_thread_turn) drops - # history_messages from the request hash so backfill differences don't - # break idempotency. - if workflow_name in {"messaging_thread_turn", "slack_thread_turn"}: - hash_input.pop("history_messages", None) + # Workflows may declare ``HASH_IGNORED_INPUT_KEYS`` to drop selected + # input fields from the request hash (e.g. backfill payloads that + # shouldn't affect idempotency). + registered = _WORKFLOW_HANDLERS.get(workflow_name) + if registered is not None: + for key in registered.hash_ignored_input_keys: + hash_input.pop(key, None) return request_hash( {"workflow_name": workflow_name, "input": hash_input}, ) diff --git a/services/api/api/workflows/slack_thread_turn.py b/services/api/api/workflows/slack_thread_turn.py index cd22ea1d..a7341ec4 100644 --- a/services/api/api/workflows/slack_thread_turn.py +++ b/services/api/api/workflows/slack_thread_turn.py @@ -20,6 +20,9 @@ WORKFLOW_NAME = "messaging_thread_turn" WORKFLOW_ALIASES = ("slack_thread_turn",) +# Drop history_messages from the request hash so re-fires of the same +# user turn with different backfill history still hit the idempotency key. +HASH_IGNORED_INPUT_KEYS = ("history_messages",) _EXECUTION_HARNESSES = frozenset({"amp", "claude-code", "codex", "pi-mono"}) _PROMPT_FLAG_ALIASES = { From d4a514d4e8230f0a644778d4818b74d56d968b1a Mon Sep 17 00:00:00 2001 From: Will Drach Date: Sat, 23 May 2026 08:24:39 -0600 Subject: [PATCH 5/5] Address ultrareview nits: alias collision, dead helpers, type hints, schedule platform MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four post-review polish items flagged by the remote review: - Workflow alias collision now logs ``workflow_alias_collision`` when a later loader's alias would clobber an existing canonical handler from a different module. Pure observability — no current in-tree collision, but the alias machinery introduced in the prior commits creates the surface area; load-order-dependent silent overwrites now show up in the logs instead of as ghost behaviour. - Schedule auto-derivation of ``delivery.platform`` from ``thread_key`` now treats 2-part legacy keys (``CHANNEL:TS``) as slack and only reads a platform namespace from 3-part keys (``platform:channel:ts``). Restores the back-compat the inline comment already promised but the previous ``":" in thread_key`` shortcut bypassed for the 2-part shape that ``_split_thread_key`` still accepts. - Delete dead ``_clip_slackbot`` helper and ``_MAX_SLACKBOT_TEXT_CHARS`` / ``_MAX_SLACKBOT_STEP_CHARS`` constants in runtime_control.py. Their only consumer (``_send_slackbot_canonical_event``) was removed earlier in the branch; clipping responsibility moved to ``MessagingPlatform.clip_text``. - Fix stale ``dict[str, str | bool] | None`` annotations on ``_resolve_requester_identity`` and ``_build_session_context``'s ``requester_identity`` parameter. The dataclass migration earlier in the branch made the runtime type ``RequesterIdentity | None``; annotations now match. Verified: uv run ruff check api/ clean; 82/82 targeted tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- services/api/api/agent.py | 11 ++++++----- services/api/api/runtime_control.py | 12 ------------ services/api/api/workflow_engine.py | 18 +++++++++++++----- 3 files changed, 19 insertions(+), 22 deletions(-) diff --git a/services/api/api/agent.py b/services/api/api/agent.py index 7dc40fa3..29e1b69a 100644 --- a/services/api/api/agent.py +++ b/services/api/api/agent.py @@ -34,20 +34,21 @@ messages_to_content_blocks, ) from api.deps import mint_sandbox_token +from api.platforms import RequesterIdentity from api.sandbox.normalize import normalize_harness_event from api.sandbox.registry import get_backend from api.trace_context import get_or_create_thread_trace_id -log = structlog.get_logger() - # Slack profile → GitHub handle extraction lives on the SlackPlatform; we # keep the legacy module-level name as a re-export so the existing test # (`from api.agent import _extract_github_handle_from_slack_profile`) and # any other call sites keep working until they migrate. -from api.platforms.slack import ( # noqa: E402, F401 +from api.platforms.slack import ( # noqa: F401 extract_github_handle_from_slack_profile as _extract_github_handle_from_slack_profile, ) +log = structlog.get_logger() + _VALID_STDOUT_EVENT_TYPES = frozenset( { "amp_raw_event", @@ -562,7 +563,7 @@ async def _resolve_requester_identity( *, platform: str | None, user_id: str | None, -) -> dict[str, str | bool] | None: +) -> RequesterIdentity | None: """Delegate to the registered messaging platform. Slack pulls GitHub handles from profile custom fields; other platforms @@ -816,7 +817,7 @@ def _build_session_context( *, platform: str | None = None, user_id: str | None = None, - requester_identity: dict[str, str | bool] | None = None, + requester_identity: RequesterIdentity | None = None, ) -> str: """Build session context to append to the system prompt. diff --git a/services/api/api/runtime_control.py b/services/api/api/runtime_control.py index 1ba62a95..10bfcd7d 100644 --- a/services/api/api/runtime_control.py +++ b/services/api/api/runtime_control.py @@ -88,8 +88,6 @@ int(os.getenv("EXECUTION_RESERVED_USER_SLOTS", "16")), 0, ) -_MAX_SLACKBOT_TEXT_CHARS = 12_000 -_MAX_SLACKBOT_STEP_CHARS = 12_000 _MAX_WORKFLOW_EXECUTION_SLOTS = max( EXECUTION_WORKER_CONCURRENCY - EXECUTION_RESERVED_USER_SLOTS, 1, @@ -966,16 +964,6 @@ def build_execution_state_payload( return payload -def _clip_slackbot(value: Any, max_chars: int = _MAX_SLACKBOT_STEP_CHARS) -> str: - text = ( - value - if isinstance(value, str) - else json.dumps(value, ensure_ascii=False, default=str) - ) - text = text.strip() - return text if len(text) <= max_chars else f"{text[: max_chars - 1]}…" - - def _has_slackbot_live_delivery(metadata: dict[str, Any]) -> bool: return ( metadata.get(_SLACKBOT_LIVE_DELIVERY_METADATA_KEY) is True diff --git a/services/api/api/workflow_engine.py b/services/api/api/workflow_engine.py index fc13e316..529adc0c 100644 --- a/services/api/api/workflow_engine.py +++ b/services/api/api/workflow_engine.py @@ -1558,6 +1558,14 @@ async def _auto_handler( for alias in aliases: if not isinstance(alias, str) or not alias.strip() or alias == wf_name: continue + existing = _WORKFLOW_HANDLERS.get(alias) + if existing is not None and existing is not registered: + log.warning( + "workflow_alias_collision", + alias=alias, + new_source=str(py_file), + existing_source=existing.source_path, + ) _WORKFLOW_HANDLERS[alias] = registered discovered[alias] = str(py_file) except Exception: @@ -1729,11 +1737,11 @@ def _registered_schedule_specs() -> list[ScheduleSpec]: if "delivery" not in input_json: try: channel, thread_ts = _split_thread_key(thread_key) - derived_platform = ( - thread_key.split(":", 1)[0] - if ":" in thread_key - else "slack" - ) + # 3-part ``platform:channel:ts`` keys carry their own + # platform namespace; 2-part ``channel:ts`` legacy keys + # predate the namespace and are always slack. + parts = thread_key.split(":") + derived_platform = parts[0] if len(parts) >= 3 else "slack" input_json["delivery"] = { "channel": channel, "thread_ts": thread_ts,