diff --git a/services/api/api/agent.py b/services/api/api/agent.py index 356737c7..29e1b69a 100644 --- a/services/api/api/agent.py +++ b/services/api/api/agent.py @@ -18,7 +18,6 @@ import contextlib import json import os -import re import time import uuid from collections.abc import AsyncIterator @@ -35,23 +34,21 @@ messages_to_content_blocks, ) from api.deps import mint_sandbox_token +from api.platforms import RequesterIdentity from api.sandbox.normalize import normalize_harness_event from api.sandbox.registry import get_backend from api.trace_context import get_or_create_thread_trace_id -log = structlog.get_logger() - -_GITHUB_HANDLE_RE = re.compile(r"^[A-Za-z0-9](?:[A-Za-z0-9-]{0,37}[A-Za-z0-9])?$") -_GITHUB_URL_RE = re.compile( - r"(?:https?://)?github\.com/([A-Za-z0-9](?:[A-Za-z0-9-]{0,37}[A-Za-z0-9])?)", - re.IGNORECASE, -) -_GITHUB_LABEL_RE = re.compile(r"\bgithub\b", re.IGNORECASE) -_GITHUB_PREFIX_RE = re.compile( - r"\bgithub\b\s*(?:username|user|handle|profile)?\s*[:/@-]?\s*@?([A-Za-z0-9][A-Za-z0-9-]{0,38})", - re.IGNORECASE, +# Slack profile → GitHub handle extraction lives on the SlackPlatform; we +# keep the legacy module-level name as a re-export so the existing test +# (`from api.agent import _extract_github_handle_from_slack_profile`) and +# any other call sites keep working until they migrate. 
+from api.platforms.slack import ( # noqa: F401 + extract_github_handle_from_slack_profile as _extract_github_handle_from_slack_profile, ) +log = structlog.get_logger() + _VALID_STDOUT_EVENT_TYPES = frozenset( { "amp_raw_event", @@ -562,130 +559,21 @@ async def _get_latest_thread_user_id(thread_key: str) -> str | None: return str(user_id).strip() or None -def _valid_github_handle(value: str) -> str | None: - candidate = value.strip().strip("@").strip() - candidate = candidate.rstrip("/").split("/", 1)[0] - return candidate if _GITHUB_HANDLE_RE.match(candidate) else None - - -def _extract_github_handle_from_slack_profile( - profile: dict[str, Any], -) -> tuple[str | None, str | None, str]: - """Return (handle, source, unavailable_reason) from Slack profile fields.""" - custom_fields = profile.get("custom_fields") - if not isinstance(custom_fields, dict) or not custom_fields: - return None, None, "no GitHub custom field found on Slack profile" - - saw_github_field = False - for label, raw_value in custom_fields.items(): - label_text = str(label or "").strip() - value = str(raw_value or "").strip() - if not value: - continue - - label_mentions_github = bool(_GITHUB_LABEL_RE.search(label_text)) - value_mentions_github = bool(_GITHUB_LABEL_RE.search(value)) - if not label_mentions_github and not value_mentions_github: - continue - saw_github_field = True - - source = ( - f'Slack profile custom field "{label_text}"' - if label_text - else "Slack profile custom field" - ) - url_match = _GITHUB_URL_RE.search(value) - if url_match: - handle = _valid_github_handle(url_match.group(1)) - if handle: - return f"@{handle}", source, "" - - prefixed_match = _GITHUB_PREFIX_RE.search(value) - if prefixed_match: - handle = _valid_github_handle(prefixed_match.group(1)) - if handle: - return f"@{handle}", source, "" - - if label_mentions_github: - handle = _valid_github_handle(value) - if handle: - return f"@{handle}", source, "" - - if saw_github_field: - return ( - None, - None, - 
async def _resolve_requester_identity(
    *,
    platform: str | None,
    user_id: str | None,
) -> RequesterIdentity | None:
    """Delegate requester-identity recovery to the registered messaging platform.

    Slack pulls GitHub handles from profile custom fields; other platforms
    return ``None`` until their adapter implements identity recovery.

    Args:
        platform: Name of the messaging platform the turn came from. It is
            matched case-insensitively, as the pre-refactor implementation
            did (it compared ``platform.lower()`` against ``"slack"``).
        user_id: Platform-specific ID of the requesting user.

    Returns:
        ``None`` when ``user_id`` is empty; otherwise whatever the resolved
        platform's ``load_requester_identity`` returns.
    """
    from api.platforms import resolve_platform

    if not user_id:
        return None
    # Normalize before the registry lookup so "Slack"/"SLACK" keep resolving
    # to the Slack adapter instead of silently falling back to "dev".
    platform_name = (platform or "").strip().lower() or None
    return await resolve_platform(platform_name).load_requester_identity(user_id)
""" from datetime import datetime, timezone + from api.platforms import resolve_platform + now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") lines = [ "# Session Context", @@ -948,55 +838,9 @@ def _build_session_context( if platform: lines.append(f"- **Platform**: {platform}") - if requester_identity: - lines.extend( - [ - "", - "## Requester Identity", - "", - f"- Slack user ID: {requester_identity['slack_user_id']}", - f"- Slack mention: {requester_identity['slack_mention']}", - ] - ) - if requester_identity.get("github_handle_verified"): - lines.extend( - [ - "- GitHub handle from Slack profile: " - f"{requester_identity['github_handle']}", - "- GitHub handle source: " - f"{requester_identity['github_handle_source']}", - "- GitHub handle verified: yes", - ] - ) - else: - lines.extend( - [ - "- GitHub handle from Slack profile: unavailable", - "- GitHub handle unavailable reason: " - f"{requester_identity['github_handle_unavailable_reason']}", - "- GitHub handle verified: no", - ] - ) - - if platform and platform.lower() == "slack": - lines.extend( - [ - "", - "## Slack Formatting Rules", - "", - "- Use standard markdown links `[Display Text](URL)` for hyperlinks", - "- Do NOT use Slack-native `` link syntax", - "- Preserve Slack user mentions (`<@UXXXXXXX>`) exactly as-is — only use these for actual Slack users", - "- For Twitter/X handles, link to the profile WITHOUT an @ prefix in the display text: `[handle](https://x.com/handle)` (NOT `[@handle](...)`)", - "- Prefer concise, well-structured markdown; long replies may be split across multiple Slack messages", - "- Markdown tables are allowed and may render as native Slack tables when the structure is clean", - "- NEVER put links/URLs inside code blocks (``` ```) — they won't be clickable. 
Use markdown tables or plain text with `[text](url)` links instead", - ] - ) - if user_id: - lines.append( - f"- After completing a long task, tag the requester with their real Slack mention: <@{user_id}>" - ) + platform_impl = resolve_platform(platform) + lines.extend(platform_impl.system_prompt_identity_lines(requester_identity)) + lines.extend(platform_impl.system_prompt_rules(user_id=user_id)) lines.extend(["", "---", ""]) return "\n".join(lines) diff --git a/services/api/api/platforms/__init__.py b/services/api/api/platforms/__init__.py new file mode 100644 index 00000000..b758f163 --- /dev/null +++ b/services/api/api/platforms/__init__.py @@ -0,0 +1,296 @@ +"""Messaging-platform abstraction layer. + +A ``MessagingPlatform`` owns everything that varies between the platforms +Centaur can deliver agent results to: text scrubbing, identity recovery, +system-prompt injection, live-streaming session lifecycle, thread-key +parsing, and active-thread tool-call capture. + +One singleton per platform is registered in ``PLATFORMS``. Callers resolve +a platform by name (``resolve_platform("slack")``) or by inspecting the +delivery dict on an execution (``resolve_for_delivery(delivery)``). The +default fallback when the platform is missing or unknown is ``"dev"``, +which is a no-op implementation suitable for unit tests and the localhost +bypass path. + +Built-in platforms are wired up by ``register_builtin_platforms()`` at the +bottom of this module, so simple ``from api.platforms import +resolve_platform`` works without explicit app-startup setup. + +Platform-agnostic execution metadata keys +----------------------------------------- +The execution worker reads and writes several JSONB metadata keys on +``agent_execution_requests`` that are conceptually platform-agnostic but +historically Slack-named. 
Any platform that implements live streaming +must honor these spellings so the rest of the system stays platform-blind: + +- ``slackbot_live_delivery`` (bool) — gate for live-session forwarding +- ``slackbot_agent_session_id`` (str) — opaque per-execution session id +- ``slackbot_live_delivery_failed`` (str) — reason live forwarding bailed +- ``slackbot_streamed_answer_chars`` (int) — chars already streamed live + +A future schema-cleanup PR will rename these to ``live_session_*``; until +then platforms write under the legacy spellings. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from typing import Any, NamedTuple + +import structlog + +log = structlog.get_logger() + + +class ActiveThreadCapture(NamedTuple): + """Result of matching a tool call for re-routing into the live session. + + ``text`` is what the platform wants forwarded into the live session as + the agent's reply. ``envelope`` is the dict returned to the agent in + place of the normal tool result so it knows the call was intercepted. + """ + + text: str + envelope: dict[str, Any] + + +@dataclass(frozen=True) +class RequesterIdentity: + """Who triggered an agent turn, as recovered from the messaging platform. + + Rendered into the system prompt so the agent can address the requester + by handle, mention them in replies, and look up their GitHub identity. + Construction is platform-agnostic; per-platform rendering of e.g. + ``"Slack user ID: ..."`` lives on the platform's + ``system_prompt_identity_lines`` method. + """ + + user_id: str + mention: str + github_handle: str | None = None + github_handle_source: str | None = None + github_handle_verified: bool = False + github_handle_unavailable_reason: str | None = None + + +class MessagingPlatform: + """Base implementation. Subclasses override what the platform supports. 
def clip_text(self, value: Any, max_chars: int | None = None) -> str:
    """Render ``value`` as stripped text capped at a character limit.

    Strings are used as-is; any other value is serialized with
    ``json.dumps`` (non-ASCII kept, unserializable objects passed through
    ``str``). A falsy ``max_chars`` (``None`` or ``0``) falls back to
    ``self.step_chunk_chars``. Text longer than the limit is cut to
    ``limit - 1`` characters followed by a single ``…``.
    """
    if isinstance(value, str):
        rendered = value
    else:
        rendered = json.dumps(value, ensure_ascii=False, default=str)
    rendered = rendered.strip()

    budget = max_chars if max_chars else self.step_chunk_chars
    if len(rendered) > budget:
        return rendered[: budget - 1] + "…"
    return rendered
user_id: str | None = None) -> list[str]: + return [] + + # ── Live-streaming session lifecycle ─────────────────────────────── + + async def open_live_session( + self, + *, + delivery: dict[str, Any], + metadata: dict[str, Any], + thread_key: str, + title: str = "Centaur execution", + header: str | None = None, + ) -> str | None: + return None + + async def session_text( + self, session_id: str | None, markdown: str + ) -> None: + return None + + async def session_step( + self, + session_id: str | None, + *, + step_id: str, + title: str, + status: str = "in_progress", + details: str | None = None, + output: str | None = None, + ) -> None: + return None + + async def session_done( + self, session_id: str | None, thread_id: str | None = None + ) -> None: + return None + + async def harness_event( + self, session_id: str | None, event: dict[str, Any] + ) -> dict[str, Any] | None: + return None + + async def assistant_status( + self, delivery: dict[str, Any], status: str + ) -> None: + return None + + # ── Channel post (workflow ctx.post_to_channel) ──────────────────── + + async def send_channel_message( + self, + channel: str, + text: str, + *, + thread_id: str | None = None, + ) -> dict[str, Any]: + """Post a message to a channel via this platform's outbound tool. + + Default raises so workflows fail loudly when they target a platform + whose adapter doesn't implement outbound channel posts. Subclasses + override to call the right tool (Slack → ``slack.send_message``, + Discord → ``discord.send_message``, …). 
# ── Registry ───────────────────────────────────────────────────────────

# Singleton platform instances keyed by their ``name`` attribute.
PLATFORMS: dict[str, MessagingPlatform] = {}


def register_platform(platform: MessagingPlatform) -> None:
    """Add ``platform`` to ``PLATFORMS`` under its ``name``.

    Re-registering a name replaces the previous entry.

    Raises:
        ValueError: if ``platform.name`` is empty.
    """
    if not platform.name:
        raise ValueError("platform.name must be set before registration")
    PLATFORMS[platform.name] = platform


def resolve_platform(name: str | None) -> MessagingPlatform:
    """Return the registered platform, falling back to ``dev`` (no-op).

    The lookup ignores surrounding whitespace and letter case: an exact
    match on the stripped name is tried first, then its lower-cased form.
    Callers used to compare ``platform.lower()`` against ``"slack"``, so
    values such as ``"Slack"`` must keep resolving to the Slack adapter.

    Unknown platform names log a debug message and return the dev platform
    so callers don't need to special-case ``None``/unknown values.

    Raises:
        KeyError: if no ``"dev"`` platform is registered when the fallback
            is needed.
    """
    key = name.strip() if isinstance(name, str) else ""
    if key:
        platform = PLATFORMS.get(key)
        if platform is None:
            platform = PLATFORMS.get(key.lower())
        if platform is not None:
            return platform
        log.debug("resolve_platform_unknown", requested=name)
    return PLATFORMS["dev"]
+""" + +from __future__ import annotations + +from api.platforms import MessagingPlatform + + +class DevPlatform(MessagingPlatform): + name = "dev" + + +DEV_PLATFORM = DevPlatform() diff --git a/services/api/api/platforms/slack.py b/services/api/api/platforms/slack.py new file mode 100644 index 00000000..5c5c1f19 --- /dev/null +++ b/services/api/api/platforms/slack.py @@ -0,0 +1,644 @@ +"""Slack messaging platform: text scrubbing, identity recovery, +system-prompt injection, live-streaming session, and active-thread +``slack.send_message`` capture. + +Everything Slack-specific lives here. Thin shims at +``api/slack_sanitize.py`` and ``api/slackbot_client.py`` re-export the +public surface for back-compat with older callers. +""" + +from __future__ import annotations + +import asyncio +import json +import os +import re +from collections.abc import Callable +from dataclasses import replace +from typing import Any + +import httpx +import structlog + +from api.platforms import ActiveThreadCapture, MessagingPlatform, RequesterIdentity + +log = structlog.get_logger() + +# ── Text scrubbing ──────────────────────────────────────────────────── + +_THREAD_TRAILER_RE = re.compile( + r"(?:^|\s)(?:Agent|Codex|Amp|Claude\s+Code|Pi)\s+thread\s+`?[0-9a-f-]{8,}`?(?:,\s*with\s+interactive\s+elements)?(?=\s*$|[.!?]\s*$)", + re.IGNORECASE | re.MULTILINE, +) +_EXECUTION_TRAILER_RE = re.compile( + r"\b(?:Execution|execution_id)\s*[:=]\s*`?exe_[0-9a-f]{16}`?", + re.IGNORECASE, +) +_CURL_EXIT_RE = re.compile(r"curl:?\s*\((\d+)\):?\s*[^\n]{0,200}", re.IGNORECASE) + +_TEXT_KEYS = { + "content", + "delta", + "details", + "error", + "message", + "output", + "result", + "summary", + "text", + "title", +} + + +def _replace_matching_json_objects( + text: str, + predicate: Callable[[dict[str, Any]], bool], + replacement: str, +) -> str: + decoder = json.JSONDecoder() + out: list[str] = [] + index = 0 + while index < len(text): + if text[index] != "{": + out.append(text[index]) + index += 1 + 
continue + try: + value, end = decoder.raw_decode(text[index:]) + except ValueError: + out.append(text[index]) + index += 1 + continue + if isinstance(value, dict) and predicate(value): + out.append(replacement) + index += end + continue + out.append(text[index]) + index += 1 + return "".join(out) + + +def _is_k8s_status(value: dict[str, Any]) -> bool: + return value.get("kind") == "Status" and any( + key in value for key in ("status", "reason", "code", "message") + ) + + +def _is_tool_error_envelope(value: dict[str, Any]) -> bool: + return "error_type" in value and any( + key in value for key in ("detail", "error", "status_code") + ) + + +def sanitize_for_slack(text: str | None, *, preserve_edges: bool = False) -> str: + """Strip known plumbing leaks from ``text``. Idempotent; empty input → ``""``.""" + if not text: + return "" + sanitized = _replace_matching_json_objects( + text, _is_k8s_status, "[k8s status omitted]" + ) + sanitized = _replace_matching_json_objects( + sanitized, _is_tool_error_envelope, "[tool error omitted]" + ) + sanitized = _THREAD_TRAILER_RE.sub("", sanitized) + sanitized = _EXECUTION_TRAILER_RE.sub("[execution id omitted]", sanitized) + sanitized = _CURL_EXIT_RE.sub(r"transport_error(\1)", sanitized) + sanitized = re.sub(r"[ \t]+\n", "\n", sanitized) + sanitized = re.sub(r"\n{3,}", "\n\n", sanitized) + return sanitized if preserve_edges else sanitized.strip() + + +def sanitize_slack_event(value: Any) -> Any: + if isinstance(value, str): + return sanitize_for_slack(value, preserve_edges=True) + if isinstance(value, list): + return [sanitize_slack_event(item) for item in value] + if isinstance(value, dict): + sanitized: dict[str, Any] = {} + for key, item in value.items(): + if isinstance(item, (dict, list)) or key in _TEXT_KEYS: + sanitized[key] = sanitize_slack_event(item) + else: + sanitized[key] = item + return sanitized + return value + + +# ── Slackbot HTTP client ────────────────────────────────────────────── + +_RETRYABLE_STATUS = 
async def slackbot_post(
    path: str,
    body: dict[str, Any],
    *,
    timeout: httpx.Timeout | None = None,
) -> dict[str, Any] | None:
    """POST ``body`` as JSON to the slackbot service and return its reply.

    Args:
        path: URL path appended to the configured base URL.
        body: JSON-serializable request payload.
        timeout: Optional per-request timeout; defaults to 8s total with a
            2s connect timeout.

    Returns:
        ``None`` when the base URL or API key is not configured (no request
        is made) or when the call ultimately failed. On a successful
        response, the decoded JSON object, or ``{}`` when the body is empty,
        is not a JSON object, or is not valid JSON.

    Transport errors and statuses in ``_RETRYABLE_STATUS`` are retried up to
    ``_RETRY_ATTEMPTS`` times with exponential backoff. Any other
    non-success status fails immediately.
    """
    base_url = _base_url()
    api_key = _api_key()
    if not base_url or not api_key:
        return None
    request_timeout = timeout or httpx.Timeout(8.0, connect=2.0)
    last_status: int | None = None
    last_response: str | None = None
    last_error: str | None = None
    for attempt in range(_RETRY_ATTEMPTS):
        try:
            async with httpx.AsyncClient(timeout=request_timeout) as client:
                response = await client.post(
                    f"{base_url}{path}",
                    headers={
                        "Authorization": f"Bearer {api_key}",
                        "Content-Type": "application/json",
                    },
                    json=body,
                )
        except Exception as exc:
            # Deliberately broad: delivery to slackbot is best-effort and
            # must never take the caller down.
            last_error = str(exc)
        else:
            text = response.text
            if response.is_success:
                if not text:
                    return {}
                try:
                    data = response.json()
                except ValueError:
                    # The request was accepted; an unparsable body must not
                    # trigger a retry, which would repeat the POST.
                    log.warning(
                        "slackbot_call_invalid_json",
                        path=path,
                        status=response.status_code,
                        response=text[:500],
                    )
                    return {}
                return data if isinstance(data, dict) else {}
            last_status = response.status_code
            last_response = text[:500]
            if response.status_code not in _RETRYABLE_STATUS:
                log.warning(
                    "slackbot_call_failed",
                    path=path,
                    status=response.status_code,
                    response=last_response,
                )
                return None
        if attempt + 1 < _RETRY_ATTEMPTS:
            await asyncio.sleep(_RETRY_BASE_DELAY_S * (2**attempt))
    log.warning(
        "slackbot_call_failed",
        path=path,
        status=last_status,
        response=last_response,
        error=last_error,
        attempts=_RETRY_ATTEMPTS,
    )
    return None
-> str: + return str(delivery.get("channel") or delivery.get("channel_id") or "").strip() + + +def thread_ts(delivery: dict[str, Any]) -> str: + return str(delivery.get("thread_ts") or "").strip() + + +def recipient_team_id(delivery: dict[str, Any], thread_key: str) -> str: + value = str( + delivery.get("recipient_team_id") + or delivery.get("team_id") + or delivery.get("team") + or "" + ).strip() + if value: + return value + parts = thread_key.split(":") + return parts[1] if len(parts) >= 2 and parts[0] == "slack" else "" + + +def recipient_user_id(delivery: dict[str, Any], metadata: dict[str, Any]) -> str: + return str( + delivery.get("recipient_user_id") + or delivery.get("user_id") + or metadata.get("user_id") + or "" + ).strip() + + +def slack_thread_key_to_channel(thread_key: str) -> str: + """Extract the channel ID from a ``slack:CHANNEL:TS`` thread_key. + + Returns ``""`` if the thread_key does not have the slack shape. + """ + parts = thread_key.split(":") + if len(parts) >= 3 and parts[0] == "slack": + # slack:TEAM:CHANNEL:TS (4-part) or slack:CHANNEL:TS (3-part) + return parts[2] if len(parts) >= 4 else parts[1] + return "" + + +# ── GitHub identity extraction from Slack profiles ──────────────────── + +_GITHUB_HANDLE_RE = re.compile(r"^[A-Za-z0-9](?:[A-Za-z0-9-]{0,37}[A-Za-z0-9])?$") +_GITHUB_URL_RE = re.compile( + r"(?:https?://)?github\.com/([A-Za-z0-9](?:[A-Za-z0-9-]{0,37}[A-Za-z0-9])?)", + re.IGNORECASE, +) +_GITHUB_LABEL_RE = re.compile(r"\bgithub\b", re.IGNORECASE) +_GITHUB_PREFIX_RE = re.compile( + r"\bgithub\b\s*(?:username|user|handle|profile)?\s*[:/@-]?\s*@?([A-Za-z0-9][A-Za-z0-9-]{0,38})", + re.IGNORECASE, +) + + +def _valid_github_handle(value: str) -> str | None: + # Accept inputs like " @octocat ", "@octocat/repo", or "octocat/" by + # stripping surrounding whitespace + leading ``@`` and lopping off + # any trailing path segment before pattern matching. 
+ candidate = value.strip("@ \t\n").rstrip("/").split("/", 1)[0] + return candidate if _GITHUB_HANDLE_RE.match(candidate) else None + + +def extract_github_handle_from_slack_profile( + profile: dict[str, Any], +) -> tuple[str | None, str | None, str]: + """Return ``(handle, source, unavailable_reason)`` from Slack profile fields.""" + custom_fields = profile.get("custom_fields") + if not isinstance(custom_fields, dict) or not custom_fields: + return None, None, "no GitHub custom field found on Slack profile" + + saw_github_field = False + for label, raw_value in custom_fields.items(): + label_text = str(label or "").strip() + value = str(raw_value or "").strip() + if not value: + continue + + label_mentions_github = bool(_GITHUB_LABEL_RE.search(label_text)) + value_mentions_github = bool(_GITHUB_LABEL_RE.search(value)) + if not label_mentions_github and not value_mentions_github: + continue + saw_github_field = True + + source = ( + f'Slack profile custom field "{label_text}"' + if label_text + else "Slack profile custom field" + ) + url_match = _GITHUB_URL_RE.search(value) + if url_match: + handle = _valid_github_handle(url_match.group(1)) + if handle: + return f"@{handle}", source, "" + + prefixed_match = _GITHUB_PREFIX_RE.search(value) + if prefixed_match: + handle = _valid_github_handle(prefixed_match.group(1)) + if handle: + return f"@{handle}", source, "" + + if label_mentions_github: + handle = _valid_github_handle(value) + if handle: + return f"@{handle}", source, "" + + if saw_github_field: + return ( + None, + None, + "GitHub profile field did not contain a valid GitHub handle", + ) + return None, None, "no GitHub custom field found on Slack profile" + + +# ── Mention prefix used by recovery-command normalization ───────────── + +_SLACK_ID_MENTION_RE = re.compile( + r"^<@[WU][A-Z0-9]+>\s*[:,;-]?\s*(.*)$", re.IGNORECASE +) + + +# ── Slack channel-ID shape, used inside match_active_thread_capture ─── + +_SLACK_CHANNEL_ID_RE = re.compile(r"^[CDG][A-Z0-9]+$") 
async def load_requester_identity(
    self, user_id: str | None
) -> RequesterIdentity | None:
    """Build the requester's identity from their Slack profile.

    Fetches the profile through the ``slack.get_user_profile`` tool and
    extracts a GitHub handle from its custom fields.

    Returns:
        ``None`` when ``user_id`` is empty. Otherwise a
        ``RequesterIdentity`` carrying the Slack user ID and mention. It
        holds the verified GitHub handle and its source when one was found,
        or ``github_handle_verified=False`` plus an unavailable reason
        when the profile could not be fetched or held no valid handle.
        Lookup failures are logged and never raised.
    """
    if not user_id:
        return None
    base = RequesterIdentity(user_id=user_id, mention=f"<@{user_id}>")
    fetch_failed = "Slack profile could not be fetched"
    try:
        from api.app import get_tool_manager

        profile = await get_tool_manager().call_tool_raw(
            "slack", "get_user_profile", {"user_id": user_id}
        )
    except Exception as exc:
        # Best-effort: identity enrichment must not fail the agent turn.
        log.warning(
            "requester_identity_lookup_failed",
            platform=self.name,
            user_id=user_id,
            error=str(exc),
        )
        return _identity_with_unavailable_reason(base, fetch_failed)

    # Read ``error`` only from a real dict; a non-dict payload (e.g. ``None``)
    # has no ``.get`` and would raise AttributeError here.
    profile_error = profile.get("error") if isinstance(profile, dict) else None
    if not isinstance(profile, dict) or profile_error:
        log.warning(
            "requester_identity_lookup_failed",
            platform=self.name,
            user_id=user_id,
            error=str(profile_error or fetch_failed),
        )
        return _identity_with_unavailable_reason(base, fetch_failed)

    handle, source, reason = extract_github_handle_from_slack_profile(profile)
    if handle:
        return replace(
            base,
            github_handle=handle,
            github_handle_source=source or "Slack profile custom field",
            github_handle_verified=True,
        )
    return _identity_with_unavailable_reason(base, reason)
lines.append( + f"- After completing a long task, tag the requester with their real Slack mention: <@{user_id}>" + ) + return lines + + # ── Live-streaming session lifecycle ─────────────────────────── + + async def open_live_session( + self, + *, + delivery: dict[str, Any], + metadata: dict[str, Any], + thread_key: str, + title: str = "Centaur execution", + header: str | None = None, + ) -> str | None: + if not enabled() or not is_slack_delivery(delivery): + return None + channel = channel_id(delivery) + parent_ts = thread_ts(delivery) + if not channel or not parent_ts: + return None + body: dict[str, Any] = { + "channel": channel, + "parent_ts": parent_ts, + "recipient_team_id": recipient_team_id(delivery, thread_key), + "recipient_user_id": recipient_user_id(delivery, metadata), + "title": title, + } + header_text = (header or "").strip() + if header_text: + body["header"] = header_text + result = await slackbot_post("/api/slack/agent-sessions", body) + session_id = str((result or {}).get("session_id") or "").strip() + return session_id or None + + async def session_text( + self, session_id: str | None, markdown: str + ) -> None: + sanitized = sanitize_for_slack(markdown) + if not session_id or not sanitized.strip(): + return + await slackbot_post( + f"/api/slack/agent-sessions/{session_id}/text", + {"markdown": sanitized}, + ) + + async def session_step( + self, + session_id: str | None, + *, + step_id: str, + title: str, + status: str = "in_progress", + details: str | None = None, + output: str | None = None, + ) -> None: + if not session_id or not step_id or not title: + return + body: dict[str, Any] = { + "id": step_id, + "title": sanitize_for_slack(title), + "status": status, + } + if details: + body["details"] = sanitize_for_slack(details) + if output: + body["output"] = sanitize_for_slack(output) + await slackbot_post( + f"/api/slack/agent-sessions/{session_id}/step", body + ) + + async def session_done( + self, session_id: str | None, thread_id: str | 
None = None + ) -> None: + if not session_id: + return + body: dict[str, Any] = {} + if thread_id: + body["thread_id"] = thread_id + await slackbot_post( + f"/api/slack/agent-sessions/{session_id}/done", body + ) + + async def harness_event( + self, session_id: str | None, event: dict[str, Any] + ) -> dict[str, Any] | None: + if not session_id: + return None + return await slackbot_post( + f"/api/slack/agent-sessions/{session_id}/harness-event", + {"event": sanitize_slack_event(event)}, + timeout=httpx.Timeout(60.0, connect=2.0), + ) + + async def assistant_status( + self, delivery: dict[str, Any], status: str + ) -> None: + if not enabled() or not is_slack_delivery(delivery): + return + channel = channel_id(delivery) + ts = thread_ts(delivery) + if not channel or not ts: + return + await slackbot_post( + "/api/slack/assistant/status", + {"channel_id": channel, "thread_ts": ts, "status": status}, + ) + + # ── Outbound channel post ────────────────────────────────────── + + async def send_channel_message( + self, + channel: str, + text: str, + *, + thread_id: str | None = None, + ) -> dict[str, Any]: + from api.app import get_tool_manager + + args: dict[str, Any] = { + "channel": channel, + "text": text, + "no_attribution": True, + } + if thread_id: + args["thread_ts"] = thread_id + raw = await get_tool_manager().call_tool("slack", "send_message", args) + try: + result = json.loads(raw) if isinstance(raw, str) else raw + except json.JSONDecodeError: + result = {"raw": raw} + if isinstance(result, dict) and result.get("error"): + raise RuntimeError(str(result["error"])) + return result + + # ── Live tool-call capture ───────────────────────────────────── + + def match_active_thread_capture( + self, + *, + thread_key: str, + tool_name: str, + method_name: str, + args: dict[str, Any], + ) -> ActiveThreadCapture | None: + if tool_name != "slack" or method_name != "send_message": + return None + parts = thread_key.split(":") + if len(parts) < 4 or parts[0] != "slack": + 
return None + active_channel = parts[2] + active_thread_ts = parts[3] + requested_channel = str( + args.get("channel") or args.get("channel_id") or "" + ).lstrip("#") + requested_thread_ts = str(args.get("thread_ts") or "") + channel_is_id = bool(_SLACK_CHANNEL_ID_RE.match(requested_channel)) + if channel_is_id and requested_channel != active_channel: + return None + if requested_thread_ts and requested_thread_ts != active_thread_ts: + return None + text = str(args.get("text") or args.get("message") or "").strip() + if not text: + return None + return ActiveThreadCapture( + text=text, + envelope={ + "captured": True, + "message": "Captured into the active Slackbot live reply; no separate Slack message was posted.", + "channel": active_channel, + "thread_ts": active_thread_ts, + }, + ) + + +# ── Singleton ───────────────────────────────────────────────────────── +# Registration is centralized via api.platforms.register_builtin_platforms; +# this module only exposes the singleton. + +SLACK_PLATFORM = SlackPlatform() diff --git a/services/api/api/routers/agent.py b/services/api/api/routers/agent.py index 0db0b1b5..f44f48ab 100644 --- a/services/api/api/routers/agent.py +++ b/services/api/api/routers/agent.py @@ -55,6 +55,39 @@ FINAL_DELIVERY_MAX_ATTEMPTS = int(os.getenv("FINAL_DELIVERY_MAX_ATTEMPTS", "50")) + +def _resolve_delivery_fallback( + body_delivery: dict[str, Any] | None, + body_platform: str | None, + body_user_id: str | None, + *, + endpoint: str, + default_platform: str, + default_channel: str, +) -> dict[str, Any]: + """Resolve the delivery dict for an execute/auto-execute call. + + Explicit ``body.delivery`` wins. Otherwise we build a minimal delivery + from ``body.platform`` (or the endpoint-specific default) and log a + deprecation note when the caller relied on the fallback so we can flush + the implicit defaults in a later release. 
+ """ + if isinstance(body_delivery, dict) and body_delivery: + return body_delivery + platform = (body_platform or "").strip() or default_platform + if not body_platform: + log.info( + "execute_delivery_fallback", + endpoint=endpoint, + platform=platform, + reason="caller omitted both delivery and platform", + ) + return { + "channel": default_channel, + "platform": platform, + "recipient_user_id": body_user_id, + } + router = APIRouter( prefix="/agent", tags=["agent"], @@ -270,11 +303,14 @@ async def execute(request: Request): return _json_error(exc.code, exc.message, exc.status_code) execute_id = body.execute_id or f"exec-{uuid.uuid4().hex[:16]}" - delivery = body.delivery or { - "channel": "slack", - "platform": body.platform or "slack", - "recipient_user_id": body.user_id, - } + delivery = _resolve_delivery_fallback( + body.delivery, + body.platform, + body.user_id, + endpoint="execute", + default_platform="slack", + default_channel="slack", + ) metadata = body.metadata or {} if body.user_id: metadata = {**metadata, "user_id": body.user_id} @@ -339,11 +375,14 @@ async def _auto_execute(pool, body: ExecuteRequest) -> JSONResponse: # 3. 
Execute execute_id = body.execute_id or f"exec-{nonce}" - delivery = body.delivery or { - "channel": "dev", - "platform": body.platform or "dev", - "recipient_user_id": body.user_id, - } + delivery = _resolve_delivery_fallback( + body.delivery, + body.platform, + body.user_id, + endpoint="execute_auto", + default_platform="dev", + default_channel="dev", + ) result = await enqueue_execution( pool, diff --git a/services/api/api/runtime_control.py b/services/api/api/runtime_control.py index bb0306bc..10bfcd7d 100644 --- a/services/api/api/runtime_control.py +++ b/services/api/api/runtime_control.py @@ -23,7 +23,7 @@ steer_stdin, stop_session, ) -from api import slackbot_client +from api.platforms import resolve_for_delivery from api.observability import ( ExecutionObservationAccumulator, extract_usage_metrics, @@ -52,8 +52,12 @@ log = structlog.get_logger() +# Canonical key written by current slackbot deploys. _SLACKBOT_LIVE_DELIVERY_METADATA_KEY = "slackbot_live_delivery" -_LEGACY_SLACKBOT_LIVE_DELIVERY_METADATA_KEY = "slackbot" + "_v" + "2_live_delivery" +# Legacy spelling that pre-dated the rename to ``slackbot_live_delivery``. +# In-flight executions may still carry this value; we read both spellings +# but never write the legacy one. 
+_LEGACY_SLACKBOT_LIVE_DELIVERY_METADATA_KEY = "slackbot_v2_live_delivery" EXECUTION_SILENCE_TIMEOUT_S = int(os.getenv("EXECUTION_SILENCE_TIMEOUT_S", "600")) EXECUTION_TOOL_SILENCE_TIMEOUT_S = int( @@ -84,8 +88,6 @@ int(os.getenv("EXECUTION_RESERVED_USER_SLOTS", "16")), 0, ) -_MAX_SLACKBOT_TEXT_CHARS = 12_000 -_MAX_SLACKBOT_STEP_CHARS = 12_000 _MAX_WORKFLOW_EXECUTION_SLOTS = max( EXECUTION_WORKER_CONCURRENCY - EXECUTION_RESERVED_USER_SLOTS, 1, @@ -906,6 +908,35 @@ def execution_terminal(status: str) -> bool: return status in {"completed", "failed_permanent", "cancelled"} +async def get_live_session_id_for_thread(pool, thread_key: str) -> str | None: + """Return the live-delivery session_id bound to this thread's currently + running execution, or None when no execution has live delivery open. + + Reads the platform-agnostic metadata-key contract documented in + ``api.platforms`` (``slackbot_live_delivery`` / ``slackbot_agent_session_id``). + Platform adapters use this via ``tool_manager`` to forward an active + tool call into the live session without coupling themselves to the + ``agent_execution_requests`` schema. + """ + # Read both the canonical ``slackbot_live_delivery`` and the legacy + # ``slackbot_v2_live_delivery`` key so in-flight executions written + # before the rename are still routable. 
+ session_id = await pool.fetchval( + "SELECT metadata->>'slackbot_agent_session_id' " + "FROM agent_execution_requests " + "WHERE thread_key = $1 " + "AND status = 'running' " + "AND (" + " metadata->>'slackbot_live_delivery' = 'true' " + " OR metadata->>'slackbot_v2_live_delivery' = 'true'" + ") " + "AND COALESCE(metadata->>'slackbot_agent_session_id', '') <> '' " + "ORDER BY started_at DESC NULLS LAST, created_at DESC LIMIT 1", + thread_key, + ) + return str(session_id or "").strip() or None + + def build_execution_state_payload( *, execution_id: str, @@ -933,16 +964,6 @@ def build_execution_state_payload( return payload -def _clip_slackbot(value: Any, max_chars: int = _MAX_SLACKBOT_STEP_CHARS) -> str: - text = ( - value - if isinstance(value, str) - else json.dumps(value, ensure_ascii=False, default=str) - ) - text = text.strip() - return text if len(text) <= max_chars else f"{text[: max_chars - 1]}…" - - def _has_slackbot_live_delivery(metadata: dict[str, Any]) -> bool: return ( metadata.get(_SLACKBOT_LIVE_DELIVERY_METADATA_KEY) is True @@ -975,25 +996,6 @@ async def _mark_slackbot_live_delivery_failed( ) -def _canonical_text_blocks(event: dict[str, Any]) -> list[str]: - if event.get("type") == "assistant": - message = event.get("message") if isinstance(event.get("message"), dict) else {} - content = ( - message.get("content") if isinstance(message.get("content"), list) else [] - ) - return [ - str(block.get("text") or "") - for block in content - if isinstance(block, dict) - and block.get("type") == "text" - and str(block.get("text") or "").strip() - ] - if event.get("type") == "result": - text = str(event.get("result") or event.get("text") or "") - return [text] if text.strip() else [] - return [] - - def _slackbot_streamed_answer_chars(value: Any) -> int: if isinstance(value, bool): return 0 @@ -1002,72 +1004,6 @@ def _slackbot_streamed_answer_chars(value: Any) -> int: return 0 -async def _send_slackbot_canonical_event( - session_id: str, event: dict[str, 
Any] -) -> bool: - sent_text = False - for text in _canonical_text_blocks(event): - await slackbot_client.session_text( - session_id, - _clip_slackbot(text, _MAX_SLACKBOT_TEXT_CHARS), - ) - sent_text = True - - event_type = str(event.get("type") or "") - if event_type == "assistant": - message = event.get("message") if isinstance(event.get("message"), dict) else {} - content = ( - message.get("content") if isinstance(message.get("content"), list) else [] - ) - for block in content: - if not isinstance(block, dict) or block.get("type") != "tool_use": - continue - tool_id = str(block.get("id") or uuid.uuid4()) - tool_name = str(block.get("name") or "Tool") - tool_input = ( - block.get("input") if isinstance(block.get("input"), dict) else {} - ) - await slackbot_client.session_step( - session_id, - step_id=tool_id, - title=tool_name, - status="in_progress", - details=_clip_slackbot(tool_input), - ) - elif event_type == "tool": - content = event.get("content") if isinstance(event.get("content"), list) else [] - for block in content: - if not isinstance(block, dict): - continue - tool_id = str(block.get("tool_use_id") or uuid.uuid4()) - is_error = bool(block.get("is_error")) - await slackbot_client.session_step( - session_id, - step_id=tool_id, - title="Tool result", - status="error" if is_error else "complete", - output=_clip_slackbot(block.get("content")), - ) - elif event_type == "command_execution": - command = str(event.get("command") or "Command") - await slackbot_client.session_step( - session_id, - step_id=f"command-{hashlib.sha256(command.encode()).hexdigest()[:12]}", - title=command[:256], - status="complete" if event.get("status") == "completed" else "in_progress", - output=_clip_slackbot(event.get("aggregated_output") or ""), - ) - elif event_type == "error": - await slackbot_client.session_step( - session_id, - step_id=f"error-{uuid.uuid4().hex[:12]}", - title="Execution error", - status="error", - output=_clip_slackbot(event.get("error") or event), - ) - 
return sent_text - - async def append_execution_event( pool, *, @@ -2331,6 +2267,7 @@ async def _process_execution_impl(pool, row: dict[str, Any]) -> None: assignment_generation = int(row["assignment_generation"]) execution_status = str(row.get("status") or "running") delivery = decode_jsonb(row.get("delivery"), {}) + platform = resolve_for_delivery(delivery) execution_metadata = decode_jsonb(row.get("metadata"), {}) if not isinstance(execution_metadata, dict): execution_metadata = {} @@ -2594,7 +2531,7 @@ async def _finalize_execution( if finalize_session_id and not slackbot_done and slackbot_forward_live: try: terminal_result_sent_to_slackbot = False - await slackbot_client.session_done( + await platform.session_done( finalize_session_id, harness_thread_id or None ) slackbot_done = True @@ -2760,7 +2697,7 @@ async def _finalize_execution( slack_event.setdefault( "centaur_assignment_generation", assignment_generation ) - harness_result = await slackbot_client.harness_event( + harness_result = await platform.harness_event( slackbot_session_id, slack_event ) if harness_result is None: @@ -2789,7 +2726,7 @@ async def _finalize_execution( streamed_answer_chars=slackbot_streamed_answer_chars, ) with contextlib.suppress(Exception): - await slackbot_client.set_status(delivery, "") + await platform.assistant_status(delivery, "") slackbot_forward_live = False break if isinstance(harness_result, dict): diff --git a/services/api/api/slack_sanitize.py b/services/api/api/slack_sanitize.py index c37f0689..58c92956 100644 --- a/services/api/api/slack_sanitize.py +++ b/services/api/api/slack_sanitize.py @@ -1,76 +1,20 @@ -"""Strip known plumbing-leak shapes from assistant text bound for Slack.""" +"""Compatibility shim. The implementation lives in ``api.platforms.slack``; +this module remains so existing imports of ``sanitize_for_slack`` keep +working. New callers should import from ``api.platforms.slack`` +directly; this shim is scheduled for removal in a future cleanup PR. 
+""" from __future__ import annotations -import json -import re -from collections.abc import Callable -from typing import Any - -_THREAD_TRAILER_RE = re.compile( - r"(?:^|\s)(?:Agent|Codex|Amp|Claude\s+Code|Pi)\s+thread\s+`?[0-9a-f-]{8,}`?(?:,\s*with\s+interactive\s+elements)?(?=\s*$|[.!?]\s*$)", - re.IGNORECASE | re.MULTILINE, -) -_EXECUTION_TRAILER_RE = re.compile( - r"\b(?:Execution|execution_id)\s*[:=]\s*`?exe_[0-9a-f]{16}`?", - re.IGNORECASE, -) -_CURL_EXIT_RE = re.compile(r"curl:?\s*\((\d+)\):?\s*[^\n]{0,200}", re.IGNORECASE) - - -def _replace_matching_json_objects( - text: str, - predicate: Callable[[dict[str, Any]], bool], - replacement: str, -) -> str: - decoder = json.JSONDecoder() - out: list[str] = [] - index = 0 - while index < len(text): - if text[index] != "{": - out.append(text[index]) - index += 1 - continue - try: - value, end = decoder.raw_decode(text[index:]) - except ValueError: - out.append(text[index]) - index += 1 - continue - if isinstance(value, dict) and predicate(value): - out.append(replacement) - index += end - continue - out.append(text[index]) - index += 1 - return "".join(out) +import warnings +from api.platforms.slack import sanitize_for_slack # noqa: F401 -def _is_k8s_status(value: dict[str, Any]) -> bool: - return value.get("kind") == "Status" and any( - key in value for key in ("status", "reason", "code", "message") - ) - - -def _is_tool_error_envelope(value: dict[str, Any]) -> bool: - return "error_type" in value and any( - key in value for key in ("detail", "error", "status_code") - ) - +warnings.warn( + "api.slack_sanitize is a back-compat shim; " + "import sanitize_for_slack from api.platforms.slack instead.", + DeprecationWarning, + stacklevel=2, +) -def sanitize_for_slack(text: str | None, *, preserve_edges: bool = False) -> str: - """Strip known plumbing leaks from `text`. 
Idempotent; empty input -> "".""" - if not text: - return "" - sanitized = _replace_matching_json_objects( - text, _is_k8s_status, "[k8s status omitted]" - ) - sanitized = _replace_matching_json_objects( - sanitized, _is_tool_error_envelope, "[tool error omitted]" - ) - sanitized = _THREAD_TRAILER_RE.sub("", sanitized) - sanitized = _EXECUTION_TRAILER_RE.sub("[execution id omitted]", sanitized) - sanitized = _CURL_EXIT_RE.sub(r"transport_error(\1)", sanitized) - sanitized = re.sub(r"[ \t]+\n", "\n", sanitized) - sanitized = re.sub(r"\n{3,}", "\n\n", sanitized) - return sanitized if preserve_edges else sanitized.strip() +__all__ = ["sanitize_for_slack"] diff --git a/services/api/api/slackbot_client.py b/services/api/api/slackbot_client.py index dc4feca4..9851f646 100644 --- a/services/api/api/slackbot_client.py +++ b/services/api/api/slackbot_client.py @@ -1,122 +1,53 @@ -from __future__ import annotations - -import asyncio -import os -from typing import Any - -import httpx -import structlog - -from api.slack_sanitize import sanitize_for_slack - -log = structlog.get_logger() - -# Other 4xx is permanent: the slackbot is telling us the call is malformed. 
-_RETRYABLE_STATUS = frozenset({408, 429, 500, 502, 503, 504}) -_RETRY_ATTEMPTS = 3 -_RETRY_BASE_DELAY_S = 0.25 - - -def _base_url() -> str: - return os.getenv("SLACKBOT_URL", "").strip().rstrip("/") - - -def _api_key() -> str: - return os.getenv("SLACKBOT_API_KEY", "").strip() - - -def enabled() -> bool: - return bool(_base_url() and _api_key()) - - -async def post( - path: str, - body: dict[str, Any], - *, - timeout: httpx.Timeout | None = None, -) -> dict[str, Any] | None: - base_url = _base_url() - api_key = _api_key() - if not base_url or not api_key: - return None - request_timeout = timeout or httpx.Timeout(8.0, connect=2.0) - last_status: int | None = None - last_response: str | None = None - last_error: str | None = None - for attempt in range(_RETRY_ATTEMPTS): - try: - async with httpx.AsyncClient(timeout=request_timeout) as client: - response = await client.post( - f"{base_url}{path}", - headers={ - "Authorization": f"Bearer {api_key}", - "Content-Type": "application/json", - }, - json=body, - ) - text = response.text - if response.is_success: - if not text: - return {} - data = response.json() - return data if isinstance(data, dict) else {} - last_status = response.status_code - last_response = text[:500] - if response.status_code not in _RETRYABLE_STATUS: - log.warning( - "slackbot_call_failed", - path=path, - status=response.status_code, - response=last_response, - ) - return None - except Exception as exc: - last_error = str(exc) - if attempt + 1 < _RETRY_ATTEMPTS: - await asyncio.sleep(_RETRY_BASE_DELAY_S * (2**attempt)) - log.warning( - "slackbot_call_failed", - path=path, - status=last_status, - response=last_response, - error=last_error, - attempts=_RETRY_ATTEMPTS, - ) - return None +"""Compatibility shim. 
The implementation lives in ``api.platforms.slack``; +this module re-exports the module-level helpers and delegates the async +session helpers to the registered ``SlackPlatform`` singleton so existing +``from api import slackbot_client`` callers keep working. +New callers should resolve a platform via +``api.platforms.resolve_for_delivery(delivery)`` and call methods on it +directly. This shim is scheduled for removal in a future cleanup PR. +""" -def is_slack_delivery(delivery: dict[str, Any] | None) -> bool: - return isinstance(delivery, dict) and str(delivery.get("platform") or "") == "slack" - - -def channel_id(delivery: dict[str, Any]) -> str: - return str(delivery.get("channel") or delivery.get("channel_id") or "").strip() - - -def thread_ts(delivery: dict[str, Any]) -> str: - return str(delivery.get("thread_ts") or "").strip() - - -def recipient_team_id(delivery: dict[str, Any], thread_key: str) -> str: - value = str( - delivery.get("recipient_team_id") - or delivery.get("team_id") - or delivery.get("team") - or "" - ).strip() - if value: - return value - parts = thread_key.split(":") - return parts[1] if len(parts) >= 2 and parts[0] == "slack" else "" +from __future__ import annotations +import warnings +from typing import Any -def recipient_user_id(delivery: dict[str, Any], metadata: dict[str, Any]) -> str: - return str( - delivery.get("recipient_user_id") - or delivery.get("user_id") - or metadata.get("user_id") - or "" - ).strip() +from api.platforms.slack import ( + SLACK_PLATFORM, + channel_id, + enabled, + is_slack_delivery, + recipient_team_id, + recipient_user_id, + sanitize_slack_event, + slackbot_post as post, + thread_ts, +) + +warnings.warn( + "api.slackbot_client is a back-compat shim; resolve a platform via " + "api.platforms.resolve_for_delivery and call methods on it directly.", + DeprecationWarning, + stacklevel=2, +) + +__all__ = [ + "channel_id", + "enabled", + "harness_event", + "is_slack_delivery", + "open_agent_session", + "post", + 
"recipient_team_id", + "recipient_user_id", + "sanitize_slack_event", + "session_done", + "session_step", + "session_text", + "set_status", + "thread_ts", +] async def open_agent_session( @@ -127,32 +58,17 @@ async def open_agent_session( title: str = "Centaur execution", header: str | None = None, ) -> str | None: - if not enabled() or not is_slack_delivery(delivery): - return None - channel = channel_id(delivery) - parent_ts = thread_ts(delivery) - if not channel or not parent_ts: - return None - body: dict[str, Any] = { - "channel": channel, - "parent_ts": parent_ts, - "recipient_team_id": recipient_team_id(delivery, thread_key), - "recipient_user_id": recipient_user_id(delivery, metadata), - "title": title, - } - header_text = (header or "").strip() - if header_text: - body["header"] = header_text - result = await post("/api/slack/agent-sessions", body) - session_id = str((result or {}).get("session_id") or "").strip() - return session_id or None + return await SLACK_PLATFORM.open_live_session( + delivery=delivery, + metadata=metadata, + thread_key=thread_key, + title=title, + header=header, + ) async def session_text(session_id: str | None, markdown: str) -> None: - sanitized = sanitize_for_slack(markdown) - if not session_id or not sanitized.strip(): - return - await post(f"/api/slack/agent-sessions/{session_id}/text", {"markdown": sanitized}) + return await SLACK_PLATFORM.session_text(session_id, markdown) async def session_step( @@ -164,79 +80,27 @@ async def session_step( details: str | None = None, output: str | None = None, ) -> None: - if not session_id or not step_id or not title: - return - body: dict[str, Any] = { - "id": step_id, - "title": sanitize_for_slack(title), - "status": status, - } - if details: - body["details"] = sanitize_for_slack(details) - if output: - body["output"] = sanitize_for_slack(output) - await post(f"/api/slack/agent-sessions/{session_id}/step", body) - - -async def session_done(session_id: str | None, thread_id: str | None = 
None) -> None: - if not session_id: - return - body: dict[str, Any] = {} - if thread_id: - body["thread_id"] = thread_id - await post(f"/api/slack/agent-sessions/{session_id}/done", body) + return await SLACK_PLATFORM.session_step( + session_id, + step_id=step_id, + title=title, + status=status, + details=details, + output=output, + ) + + +async def session_done( + session_id: str | None, thread_id: str | None = None +) -> None: + return await SLACK_PLATFORM.session_done(session_id, thread_id) async def harness_event( session_id: str | None, event: dict[str, Any] ) -> dict[str, Any] | None: - if not session_id: - return None - return await post( - f"/api/slack/agent-sessions/{session_id}/harness-event", - {"event": sanitize_slack_event(event)}, - timeout=httpx.Timeout(60.0, connect=2.0), - ) + return await SLACK_PLATFORM.harness_event(session_id, event) async def set_status(delivery: dict[str, Any], status: str) -> None: - if not enabled() or not is_slack_delivery(delivery): - return - channel = channel_id(delivery) - ts = thread_ts(delivery) - if not channel or not ts: - return - await post( - "/api/slack/assistant/status", - {"channel_id": channel, "thread_ts": ts, "status": status}, - ) - - -_TEXT_KEYS = { - "content", - "delta", - "details", - "error", - "message", - "output", - "result", - "summary", - "text", - "title", -} - - -def sanitize_slack_event(value: Any) -> Any: - if isinstance(value, str): - return sanitize_for_slack(value, preserve_edges=True) - if isinstance(value, list): - return [sanitize_slack_event(item) for item in value] - if isinstance(value, dict): - sanitized: dict[str, Any] = {} - for key, item in value.items(): - if isinstance(item, (dict, list)) or key in _TEXT_KEYS: - sanitized[key] = sanitize_slack_event(item) - else: - sanitized[key] = item - return sanitized - return value + return await SLACK_PLATFORM.assistant_status(delivery, status) diff --git a/services/api/api/tool_manager.py b/services/api/api/tool_manager.py index 
58e73f08..21749cf7 100644 --- a/services/api/api/tool_manager.py +++ b/services/api/api/tool_manager.py @@ -31,7 +31,7 @@ from api.laminar_tracing import set_span_attributes, start_span from api.vm_metrics import record_tool_call from api.deps import get_key_info, get_sandbox_claims, verify_api_key -from api import slackbot_client +from api.platforms import resolve_for_thread_key from centaur_sdk import ToolContext, reset_tool_context, set_tool_context log = structlog.get_logger() @@ -908,7 +908,7 @@ def _timeout_label(timeout_s: float | None) -> str: return "no timeout" if timeout_s is None else f"{timeout_s:g}s" -async def _capture_live_slack_send( +async def _capture_active_thread_tool_call( *, request: Request | None, sandbox_claims: dict[str, Any] | None, @@ -916,62 +916,50 @@ async def _capture_live_slack_send( method_name: str, args: dict[str, Any], ) -> dict[str, Any] | None: + """Give the active thread's messaging platform a chance to intercept this + tool call (e.g. an agent calling ``slack.send_message`` to the active + thread is re-routed through the live streaming session). + + The platform's ``match_active_thread_capture`` is a pure synchronous + function that decides whether to capture; this wrapper owns the + live-session lookup and the actual ``session_text`` forward so + platform adapters stay decoupled from FastAPI app state and the + ``agent_execution_requests`` schema. + + Returns the captured-result envelope or ``None`` to fall through to + the normal tool path. 
+ """ if request is None or not sandbox_claims: return None - if tool_name != "slack" or method_name != "send_message": - return None - thread_key = str(sandbox_claims.get("thread_key") or "") - parts = thread_key.split(":") - if len(parts) < 4 or parts[0] != "slack": - return None - active_channel = parts[2] - active_thread_ts = parts[3] - requested_channel = str(args.get("channel") or args.get("channel_id") or "").lstrip("#") - requested_thread_ts = str(args.get("thread_ts") or "") - channel_is_id = bool(re.match(r"^[CDG][A-Z0-9]+$", requested_channel)) - if channel_is_id and requested_channel != active_channel: + platform = resolve_for_thread_key(thread_key) + if platform is None: return None - if requested_thread_ts and requested_thread_ts != active_thread_ts: - return None - - text = str(args.get("text") or args.get("message") or "").strip() - if not text: + match = platform.match_active_thread_capture( + thread_key=thread_key, + tool_name=tool_name, + method_name=method_name, + args=args, + ) + if match is None: return None - pool = getattr(request.app.state, "db_pool", None) if pool is None: return None - session_id = await pool.fetchval( - "SELECT metadata->>'slackbot_agent_session_id' " - "FROM agent_execution_requests " - "WHERE thread_key = $1 " - "AND status = 'running' " - "AND (" - " metadata->>'slackbot_live_delivery' = 'true' " - " OR metadata->>('slackbot' || '_v' || '2_live_delivery') = 'true'" - ") " - "AND COALESCE(metadata->>'slackbot_agent_session_id', '') <> '' " - "ORDER BY started_at DESC NULLS LAST, created_at DESC LIMIT 1", - thread_key, - ) - session_id = str(session_id or "").strip() + from api.runtime_control import get_live_session_id_for_thread + + session_id = await get_live_session_id_for_thread(pool, thread_key) if not session_id: return None - - await slackbot_client.session_text(session_id, text) + await platform.session_text(session_id, match.text) log.info( - "slack_send_message_captured", + "active_thread_tool_call_captured", 
thread_key=thread_key, + platform=platform.name, sandbox_container_id=sandbox_claims.get("container_id"), - slackbot_agent_session_id=session_id, + live_session_id=session_id, ) - return { - "captured": True, - "message": "Captured into the active Slackbot live reply; no separate Slack message was posted.", - "channel": active_channel, - "thread_ts": active_thread_ts, - } + return match.envelope async def _extract_tool_attachment( @@ -1855,24 +1843,24 @@ async def call_tool_raw( } t0 = time.monotonic() log.info("tool_call_started", **call_fields) - captured_slack_send = await _capture_live_slack_send( + captured_tool_call = await _capture_active_thread_tool_call( request=request, sandbox_claims=sandbox_claims, tool_name=tool_name, method_name=method_name, args=args, ) - if captured_slack_send is not None: + if captured_tool_call is not None: duration_ms = round((time.monotonic() - t0) * 1000) log.info( "tool_call_completed", duration_ms=duration_ms, success=True, - result_size_bytes=_payload_size_bytes(captured_slack_send), + result_size_bytes=_payload_size_bytes(captured_tool_call), captured=True, **call_fields, ) - return captured_slack_send + return captured_tool_call validation_error = _tool_arg_validation_error(method, args) if validation_error is not None: log.warning( @@ -2024,27 +2012,27 @@ async def call_tool( } t0 = time.monotonic() log.info("tool_call_started", **call_fields) - captured_slack_send = await _capture_live_slack_send( + captured_tool_call = await _capture_active_thread_tool_call( request=request, sandbox_claims=sandbox_claims, tool_name=tool_name, method_name=method_name, args=args, ) - if captured_slack_send is not None: + if captured_tool_call is not None: duration_ms = round((time.monotonic() - t0) * 1000) log.info( "tool_call_completed", duration_ms=duration_ms, success=True, - result_size_bytes=_payload_size_bytes(captured_slack_send), + result_size_bytes=_payload_size_bytes(captured_tool_call), captured=True, **call_fields, ) 
record_tool_call(tool_name, method_name, True, duration_ms / 1000) if format == "toon": - return _to_toon(captured_slack_send) - return _normalize_for_serialization(captured_slack_send) + return _to_toon(captured_tool_call) + return _normalize_for_serialization(captured_tool_call) validation_error = _tool_arg_validation_error(method, args) if validation_error is not None: log.warning( diff --git a/services/api/api/workflow_engine.py b/services/api/api/workflow_engine.py index dc04cc38..529adc0c 100644 --- a/services/api/api/workflow_engine.py +++ b/services/api/api/workflow_engine.py @@ -32,7 +32,7 @@ import structlog -from api import slackbot_client +from api.platforms import resolve_for_delivery from api.runtime_control import ( ControlPlaneError, append_message, @@ -75,22 +75,36 @@ class Delivery: team_id: str | None = None @classmethod - def slack( + def for_channel( cls, + platform: str, channel: str, - thread_ts: str, + thread_id: str | None = None, *, user_id: str | None = None, team_id: str | None = None, ) -> Delivery: return cls( - platform="slack", + platform=platform, channel=channel, - thread_ts=thread_ts, + thread_ts=thread_id, recipient_user_id=user_id, recipient_team_id=team_id, ) + @classmethod + def slack( + cls, + channel: str, + thread_ts: str, + *, + user_id: str | None = None, + team_id: str | None = None, + ) -> Delivery: + return cls.for_channel( + "slack", channel, thread_ts, user_id=user_id, team_id=team_id, + ) + @classmethod def dev(cls) -> Delivery: return cls(platform="dev") @@ -778,42 +792,45 @@ async def run_agent( eager_start=eager_start, ) - async def post_to_slack( + async def post_to_channel( self, + platform: str, channel: str, text: str, *, - thread_ts: str | None = None, + thread_id: str | None = None, ) -> dict[str, Any]: - """Post a message to a Slack channel via the slack tool. + """Post a message to a channel via the platform's outbound tool. - Accepts channel name (e.g. ``"team-updates"``) or ID. 
- Uses a checkpointed step so the message is sent exactly once, - even if the workflow replays. + Uses a checkpointed step so the message is sent exactly once even + if the workflow replays. ``platform`` selects the registered + messaging platform (``"slack"``, ``"discord"``, …). """ - from api.app import get_tool_manager + from api.platforms import resolve_platform + + platform_impl = resolve_platform(platform) async def _post() -> dict[str, Any]: - tm = get_tool_manager() - args: dict[str, Any] = { - "channel": channel, - "text": text, - "no_attribution": True, - } - if thread_ts: - args["thread_ts"] = thread_ts - raw = await tm.call_tool("slack", "send_message", args) - import json as _json - try: - result = _json.loads(raw) if isinstance(raw, str) else raw - except (ValueError, TypeError): - result = {"raw": raw} - if isinstance(result, dict) and result.get("error"): - raise RuntimeError(str(result["error"])) - return result + return await platform_impl.send_channel_message( + channel, text, thread_id=thread_id, + ) + + step_name = f"post_{platform_impl.name}_{channel}" + return await self.step( + step_name, _post, step_kind=f"{platform_impl.name}_post", + ) - step_name = f"post_slack_{channel}" - return await self.step(step_name, _post, step_kind="slack_post") + async def post_to_slack( + self, + channel: str, + text: str, + *, + thread_ts: str | None = None, + ) -> dict[str, Any]: + """Slack-specific convenience wrapper around ``post_to_channel``.""" + return await self.post_to_channel( + "slack", channel, text, thread_id=thread_ts, + ) @property def tools(self) -> _ToolProxy: @@ -1141,6 +1158,7 @@ async def _dispatch() -> dict[str, Any]: effective_delivery = dict(run_in.get("delivery") or {}) effective_history = history_messages or run_in.get("history_messages") or [] selector = {"persona_id": persona, "harness": harness} + platform = resolve_for_delivery(effective_delivery) slackbot_session_id: str | None = None try: @@ -1155,7 +1173,7 @@ async def 
_dispatch() -> dict[str, Any]: ) except Exception as exc: try: - failure_session_id = await slackbot_client.open_agent_session( + failure_session_id = await platform.open_live_session( delivery=effective_delivery, metadata=effective_metadata, thread_key=effective_thread_key, @@ -1163,11 +1181,11 @@ async def _dispatch() -> dict[str, Any]: header=None, ) if failure_session_id: - await slackbot_client.session_text( + await platform.session_text( failure_session_id, f"Failed to start the runtime: {exc}", ) - await slackbot_client.session_done(failure_session_id) + await platform.session_done(failure_session_id) except Exception: log.warning( "workflow_spawn_failure_session_failed", @@ -1184,7 +1202,7 @@ async def _dispatch() -> dict[str, Any]: session_header = await _compute_agent_session_header( ctx._pool, effective_thread_key, selector, ) - slackbot_session_id = await slackbot_client.open_agent_session( + slackbot_session_id = await platform.open_live_session( delivery=effective_delivery, metadata=effective_metadata, thread_key=effective_thread_key, @@ -1344,6 +1362,9 @@ class _RegisteredHandler: source_path: str version: str schedule: dict[str, Any] | None = None # Optional SCHEDULE export + # Input keys to drop from the request-hash so they don't affect + # idempotency (e.g. ``history_messages`` for messaging_thread_turn). + hash_ignored_input_keys: tuple[str, ...] 
= () # Maps workflow_name → registered handler + optional input class @@ -1432,7 +1453,7 @@ def _load_workflow_file( return wf_handler = getattr(mod, "handler", None) - # Auto-generate handler from PROMPT + SLACK_CHANNEL exports + # Auto-generate handler from PROMPT + (SLACK_CHANNEL | DELIVERY) exports if not callable(wf_handler): prompt_val = getattr(mod, "PROMPT", None) if not isinstance(prompt_val, str) or not prompt_val.strip(): @@ -1443,16 +1464,48 @@ def _load_workflow_file( ) return channel_val = getattr(mod, "SLACK_CHANNEL", None) + delivery_val = getattr(mod, "DELIVERY", None) + if not isinstance(delivery_val, dict): + delivery_val = None async def _auto_handler( inp: Any, ctx: WorkflowContext, - _prompt: str = prompt_val, _channel: str | None = channel_val, + _prompt: str = prompt_val, + _channel: str | None = channel_val, + _delivery: dict[str, Any] | None = delivery_val, ) -> dict[str, Any]: result = await ctx.agent_turn(_prompt) text = result.get("result_text", "") - channel = (inp.get("slack_channel") if isinstance(inp, dict) else None) or _channel + inp_delivery = ( + inp.get("delivery") if isinstance(inp, dict) else None + ) or {} + inp_slack_channel = ( + inp.get("slack_channel") if isinstance(inp, dict) else None + ) + platform_name = ( + inp_delivery.get("platform") + or (_delivery.get("platform") if _delivery else None) + or "slack" + ) + channel = ( + inp_delivery.get("channel") + or (_delivery.get("channel") if _delivery else None) + or inp_slack_channel + or _channel + ) + thread_id = ( + inp_delivery.get("thread_ts") + or inp_delivery.get("thread_id") + or ( + _delivery.get("thread_ts") or _delivery.get("thread_id") + if _delivery + else None + ) + ) if text and channel: - await ctx.post_to_slack(channel, text) + await ctx.post_to_channel( + platform_name, channel, text, thread_id=thread_id, + ) return result wf_handler = _auto_handler @@ -1473,15 +1526,48 @@ async def _auto_handler( slack_ch = getattr(mod, "SLACK_CHANNEL", None) if 
isinstance(slack_ch, str) and slack_ch.strip(): schedule.setdefault("slack_channel", slack_ch.strip()) + mod_delivery = getattr(mod, "DELIVERY", None) + if ( + isinstance(mod_delivery, dict) + and mod_delivery.get("platform") + and mod_delivery.get("channel") + ): + schedule.setdefault("delivery", dict(mod_delivery)) version = hashlib.sha256(py_file.read_bytes()).hexdigest() - _WORKFLOW_HANDLERS[wf_name] = _RegisteredHandler( + raw_hash_ignored = getattr(mod, "HASH_IGNORED_INPUT_KEYS", ()) or () + if isinstance(raw_hash_ignored, str): + raw_hash_ignored = (raw_hash_ignored,) + hash_ignored_input_keys = tuple( + str(key) for key in raw_hash_ignored if isinstance(key, str) and key + ) + registered = _RegisteredHandler( handler=wf_handler, input_cls=input_cls, source_path=str(py_file), version=version, schedule=schedule, + hash_ignored_input_keys=hash_ignored_input_keys, ) + _WORKFLOW_HANDLERS[wf_name] = registered discovered[wf_name] = str(py_file) + # Register any aliases this workflow module declares (for renames + # where the old name must keep resolving to the new handler). 
+ aliases = getattr(mod, "WORKFLOW_ALIASES", ()) or () + if isinstance(aliases, str): + aliases = (aliases,) + for alias in aliases: + if not isinstance(alias, str) or not alias.strip() or alias == wf_name: + continue + existing = _WORKFLOW_HANDLERS.get(alias) + if existing is not None and existing is not registered: + log.warning( + "workflow_alias_collision", + alias=alias, + new_source=str(py_file), + existing_source=existing.source_path, + ) + _WORKFLOW_HANDLERS[alias] = registered + discovered[alias] = str(py_file) except Exception: log.warning("workflow_handler_load_failed", file=str(py_file), exc_info=True) @@ -1592,10 +1678,17 @@ def _registered_schedule_specs() -> list[ScheduleSpec]: that write to DB instead of posting to Slack) """ specs: list[ScheduleSpec] = [] + # Workflow aliases register the same _RegisteredHandler under multiple + # names; iterate canonical handlers once so a scheduled workflow with + # aliases doesn't fire its schedule N times per tick. + seen_handlers: set[int] = set() for wf_name, reg in _WORKFLOW_HANDLERS.items(): sched = reg.schedule if not sched: continue + if id(reg) in seen_handlers: + continue + seen_handlers.add(id(reg)) cron_expr = sched.get("cron") interval_s = sched.get("interval_seconds") if not cron_expr and not interval_s: @@ -1637,14 +1730,22 @@ def _registered_schedule_specs() -> list[ScheduleSpec]: if thread_key: input_json["thread_key"] = thread_key - # Auto-derive Slack delivery from thread_key + # Auto-derive delivery from thread_key. Default platform is slack + # to preserve back-compat; future platforms can use their own + # ``:...`` thread_key prefix and the resolver below picks + # the right shape. if "delivery" not in input_json: try: channel, thread_ts = _split_thread_key(thread_key) + # 3-part ``platform:channel:ts`` keys carry their own + # platform namespace; 2-part ``channel:ts`` legacy keys + # predate the namespace and are always slack. 
+ parts = thread_key.split(":") + derived_platform = parts[0] if len(parts) >= 3 else "slack" input_json["delivery"] = { "channel": channel, "thread_ts": thread_ts, - "platform": "slack", + "platform": derived_platform, } except ControlPlaneError: log.warning( @@ -1653,7 +1754,17 @@ def _registered_schedule_specs() -> list[ScheduleSpec]: thread_key=thread_key, ) - # slack_channel: use channel name for delivery (no thread_ts) + # Schedule-declared explicit delivery (any platform). + sched_delivery = sched.get("delivery") + if ( + isinstance(sched_delivery, dict) + and sched_delivery.get("platform") + and sched_delivery.get("channel") + and "delivery" not in input_json + ): + input_json["delivery"] = dict(sched_delivery) + + # slack_channel: back-compat shorthand for slack channel delivery. if slack_channel and "delivery" not in input_json: input_json["delivery"] = { "channel": slack_channel, @@ -1752,8 +1863,13 @@ def _workflow_request_hash( workflow_name: str, run_input: dict[str, Any], ) -> str: hash_input = dict(run_input) - if workflow_name == "slack_thread_turn": - hash_input.pop("history_messages", None) + # Workflows may declare ``HASH_IGNORED_INPUT_KEYS`` to drop selected + # input fields from the request hash (e.g. backfill payloads that + # shouldn't affect idempotency). + registered = _WORKFLOW_HANDLERS.get(workflow_name) + if registered is not None: + for key in registered.hash_ignored_input_keys: + hash_input.pop(key, None) return request_hash( {"workflow_name": workflow_name, "input": hash_input}, ) diff --git a/services/api/api/workflows/slack_thread_turn.py b/services/api/api/workflows/slack_thread_turn.py index cddf4f41..a7341ec4 100644 --- a/services/api/api/workflows/slack_thread_turn.py +++ b/services/api/api/workflows/slack_thread_turn.py @@ -1,4 +1,12 @@ -"""Workflow: single agent turn in a Slack thread.""" +"""Workflow: single agent turn in a messaging-platform thread. 
+ +This handler is platform-agnostic — the delivery dict it receives carries +``platform``/``channel``/``thread_ts`` for any registered messaging +platform (Slack today, others later). The canonical workflow name is +``messaging_thread_turn``; the legacy name ``slack_thread_turn`` is kept +as an alias so existing clients (the slackbot) keep working until they +migrate. +""" from __future__ import annotations @@ -10,7 +18,11 @@ from api.runtime_control import ControlPlaneError from api.workflow_engine import Delivery, WorkflowContext -WORKFLOW_NAME = "slack_thread_turn" +WORKFLOW_NAME = "messaging_thread_turn" +WORKFLOW_ALIASES = ("slack_thread_turn",) +# Drop history_messages from the request hash so re-fires of the same +# user turn with different backfill history still hit the idempotency key. +HASH_IGNORED_INPUT_KEYS = ("history_messages",) _EXECUTION_HARNESSES = frozenset({"amp", "claude-code", "codex", "pi-mono"}) _PROMPT_FLAG_ALIASES = { @@ -469,7 +481,7 @@ async def handler(inp: Input, ctx: WorkflowContext) -> dict[str, Any]: if not thread_key: raise ControlPlaneError( "INVALID_WORKFLOW_INPUT", - "slack_thread_turn requires thread_key", + "messaging_thread_turn requires thread_key", 422, ) diff --git a/services/api/tests/test_agent_control_plane.py b/services/api/tests/test_agent_control_plane.py index 1c8d74b3..317104d5 100644 --- a/services/api/tests/test_agent_control_plane.py +++ b/services/api/tests/test_agent_control_plane.py @@ -1363,6 +1363,8 @@ async def _live_delivery_ack_without_answer_text(*_args, **_kwargs): session_text_mock = AsyncMock() session_done_mock = AsyncMock() backend = SimpleNamespace(attach=AsyncMock(), close_streams=AsyncMock()) + from api.platforms.slack import SLACK_PLATFORM + with ( patch("api.runtime_control.get_or_spawn", new=AsyncMock(return_value=session)), patch( @@ -1377,16 +1379,19 @@ async def _live_delivery_ack_without_answer_text(*_args, **_kwargs): return_value=SimpleNamespace(turn_counter=1), ), 
patch("api.runtime_control._stream_stdout", _blank_placeholder_stream), - patch( - "api.runtime_control.slackbot_client.harness_event", + patch.object( + SLACK_PLATFORM, + "harness_event", new=AsyncMock(side_effect=_live_delivery_ack_without_answer_text), ), - patch( - "api.runtime_control.slackbot_client.session_text", + patch.object( + SLACK_PLATFORM, + "session_text", new=session_text_mock, ), - patch( - "api.runtime_control.slackbot_client.session_done", + patch.object( + SLACK_PLATFORM, + "session_done", new=session_done_mock, ), ): diff --git a/services/api/tests/test_integration.py b/services/api/tests/test_integration.py index 5fdb9283..ec0a67d3 100644 --- a/services/api/tests/test_integration.py +++ b/services/api/tests/test_integration.py @@ -562,18 +562,19 @@ def test_contains_timestamp(self): def test_requester_identity_with_github_handle(self): from api.agent import _build_session_context + from api.platforms import RequesterIdentity ctx = _build_session_context( "test:1", platform="slack", user_id="U123", - requester_identity={ - "slack_user_id": "U123", - "slack_mention": "<@U123>", - "github_handle": "@alice", - "github_handle_source": 'Slack profile custom field "GitHub"', - "github_handle_verified": True, - }, + requester_identity=RequesterIdentity( + user_id="U123", + mention="<@U123>", + github_handle="@alice", + github_handle_source='Slack profile custom field "GitHub"', + github_handle_verified=True, + ), ) assert "Requester Identity" in ctx @@ -582,19 +583,20 @@ def test_requester_identity_with_github_handle(self): def test_requester_identity_without_github_handle(self): from api.agent import _build_session_context + from api.platforms import RequesterIdentity ctx = _build_session_context( "test:1", platform="slack", user_id="U123", - requester_identity={ - "slack_user_id": "U123", - "slack_mention": "<@U123>", - "github_handle_verified": False, - "github_handle_unavailable_reason": ( + requester_identity=RequesterIdentity( + user_id="U123", 
+ mention="<@U123>", + github_handle_verified=False, + github_handle_unavailable_reason=( "no GitHub custom field found on Slack profile" ), - }, + ), ) assert "GitHub handle from Slack profile: unavailable" in ctx @@ -652,16 +654,18 @@ async def test_insert_system_message_uses_thread_user_id_fallback( thread_key, ) + from api.platforms import RequesterIdentity + async def fake_resolve_requester_identity(*, platform, user_id): assert platform == "slack" assert user_id == "U123" - return { - "slack_user_id": user_id, - "slack_mention": f"<@{user_id}>", - "github_handle": "@alice", - "github_handle_source": 'Slack profile custom field "GitHub"', - "github_handle_verified": True, - } + return RequesterIdentity( + user_id=user_id, + mention=f"<@{user_id}>", + github_handle="@alice", + github_handle_source='Slack profile custom field "GitHub"', + github_handle_verified=True, + ) monkeypatch.setattr( agent, diff --git a/services/api/tests/test_platforms.py b/services/api/tests/test_platforms.py new file mode 100644 index 00000000..f5733325 --- /dev/null +++ b/services/api/tests/test_platforms.py @@ -0,0 +1,231 @@ +"""Tests for the messaging-platform abstraction in api.platforms.""" + +from __future__ import annotations + +import pytest + +from api.platforms import ( + MessagingPlatform, + PLATFORMS, + RequesterIdentity, + resolve_for_delivery, + resolve_for_thread_key, + resolve_platform, +) +from api.platforms.dev import DevPlatform +from api.platforms.slack import SLACK_PLATFORM, SlackPlatform + + +def test_registry_includes_slack_and_dev(): + assert isinstance(PLATFORMS["slack"], SlackPlatform) + assert isinstance(PLATFORMS["dev"], DevPlatform) + + +def test_resolve_platform_known_name(): + assert resolve_platform("slack") is SLACK_PLATFORM + + +def test_resolve_platform_unknown_falls_back_to_dev(): + fallback = resolve_platform("not-a-real-platform") + assert fallback.name == "dev" + + +def test_resolve_platform_none_falls_back_to_dev(): + fallback = 
resolve_platform(None) + assert fallback.name == "dev" + + +def test_resolve_for_delivery_reads_platform_key(): + assert resolve_for_delivery({"platform": "slack"}) is SLACK_PLATFORM + assert resolve_for_delivery({"platform": "dev"}).name == "dev" + assert resolve_for_delivery({}).name == "dev" + assert resolve_for_delivery(None).name == "dev" + + +def test_resolve_for_thread_key_matches_prefix(): + assert resolve_for_thread_key("slack:C123:1700.0001") is SLACK_PLATFORM + assert resolve_for_thread_key("dev:foo").name == "dev" + assert resolve_for_thread_key("nonexistent:foo") is None + assert resolve_for_thread_key("") is None + assert resolve_for_thread_key(None) is None + + +def test_slack_platform_owns_slack_delivery_identification(): + assert SLACK_PLATFORM.is_delivery_for_me({"platform": "slack"}) is True + assert SLACK_PLATFORM.is_delivery_for_me({"platform": "dev"}) is False + assert SLACK_PLATFORM.is_delivery_for_me({}) is False + assert SLACK_PLATFORM.is_delivery_for_me(None) is False + + +def test_slack_sanitize_strips_known_plumbing_leaks(): + out = SLACK_PLATFORM.sanitize_text( + 'Done. 
{"kind":"Status","status":"Failure","reason":"AlreadyExists"} ' + "Codex thread `019e3c91-4030-7910`" + ) + assert "k8s status omitted" in out + assert "Codex thread" not in out + + +def test_slack_thread_key_destination_pulls_channel(): + assert SLACK_PLATFORM.thread_key_destination("slack:C123:1700.0001") == "C123" + assert SLACK_PLATFORM.thread_key_destination("slack:T1:C123:1700.0001") == "C123" + assert SLACK_PLATFORM.thread_key_destination("dev:nothing") is None + + +def test_dev_platform_is_noop(): + dev = PLATFORMS["dev"] + assert dev.sanitize_text("hello {with} json") == "hello {with} json" + assert dev.system_prompt_rules(user_id="U1") == [] + assert dev.system_prompt_identity_lines({"x": "y"}) == [] + assert dev.thread_key_destination("slack:C1:1700.0001") is None + + +def test_slack_system_prompt_rules_mentions_requester_when_user_id_present(): + rules_with_user = SLACK_PLATFORM.system_prompt_rules(user_id="U999") + rules_without_user = SLACK_PLATFORM.system_prompt_rules() + assert any("U999" in line for line in rules_with_user) + assert not any("U999" in line for line in rules_without_user) + + +def test_slack_identity_lines_branch_on_verified(): + verified = SLACK_PLATFORM.system_prompt_identity_lines( + RequesterIdentity( + user_id="U1", + mention="<@U1>", + github_handle="@octocat", + github_handle_source="Slack profile custom field", + github_handle_verified=True, + ) + ) + assert any("@octocat" in line for line in verified) + assert any("verified: yes" in line for line in verified) + + unverified = SLACK_PLATFORM.system_prompt_identity_lines( + RequesterIdentity( + user_id="U2", + mention="<@U2>", + github_handle_verified=False, + github_handle_unavailable_reason="no GitHub field", + ) + ) + assert any("unavailable" in line for line in unverified) + assert any("verified: no" in line for line in unverified) + + +@pytest.mark.asyncio +async def test_base_send_channel_message_raises_for_unimplemented_platforms(): + base = MessagingPlatform() + 
base.name = "stub" + with pytest.raises(NotImplementedError): + await base.send_channel_message("c", "t") + + +def test_slack_match_captures_send_message_to_active_thread(): + match = SLACK_PLATFORM.match_active_thread_capture( + thread_key="slack:T1:C123:1700.0001", + tool_name="slack", + method_name="send_message", + args={"channel": "C123", "thread_ts": "1700.0001", "text": "hi"}, + ) + assert match is not None + assert match.text == "hi" + assert match.envelope["captured"] is True + assert match.envelope["channel"] == "C123" + + +def test_slack_match_skips_non_send_message_tool_calls(): + assert ( + SLACK_PLATFORM.match_active_thread_capture( + thread_key="slack:T1:C123:1700.0001", + tool_name="slack", + method_name="get_channel_history", + args={"channel": "C123"}, + ) + is None + ) + assert ( + SLACK_PLATFORM.match_active_thread_capture( + thread_key="slack:T1:C123:1700.0001", + tool_name="websearch", + method_name="search", + args={"query": "hi"}, + ) + is None + ) + + +def test_slack_match_skips_writes_to_other_channels(): + # Channel ID different from the active thread's channel. + assert ( + SLACK_PLATFORM.match_active_thread_capture( + thread_key="slack:T1:C123:1700.0001", + tool_name="slack", + method_name="send_message", + args={"channel": "C999", "text": "hi"}, + ) + is None + ) + # Same channel but explicitly targeting a different thread. 
+ assert ( + SLACK_PLATFORM.match_active_thread_capture( + thread_key="slack:T1:C123:1700.0001", + tool_name="slack", + method_name="send_message", + args={"channel": "C123", "thread_ts": "1700.9999", "text": "hi"}, + ) + is None + ) + + +def test_slack_match_skips_empty_text(): + assert ( + SLACK_PLATFORM.match_active_thread_capture( + thread_key="slack:T1:C123:1700.0001", + tool_name="slack", + method_name="send_message", + args={"channel": "C123", "text": " "}, + ) + is None + ) + + +def test_slack_match_skips_non_slack_thread_key(): + assert ( + SLACK_PLATFORM.match_active_thread_capture( + thread_key="dev:nothing:here", + tool_name="slack", + method_name="send_message", + args={"channel": "C123", "text": "hi"}, + ) + is None + ) + + +def test_base_platform_match_returns_none(): + base = MessagingPlatform() + base.name = "stub" + assert ( + base.match_active_thread_capture( + thread_key="anything:1:2", + tool_name="slack", + method_name="send_message", + args={"channel": "C123", "text": "hi"}, + ) + is None + ) + + +def test_requester_identity_defaults_unverified_with_no_handle(): + identity = RequesterIdentity(user_id="U1", mention="<@U1>") + assert identity.github_handle is None + assert identity.github_handle_verified is False + assert identity.github_handle_unavailable_reason is None + + +def test_register_builtin_platforms_repopulates_cleared_registry(): + from api.platforms import register_builtin_platforms + + PLATFORMS.clear() + register_builtin_platforms() + assert "slack" in PLATFORMS + assert "dev" in PLATFORMS diff --git a/services/api/tests/test_slackbot_client.py b/services/api/tests/test_slackbot_client.py index aea3d175..9706ddd2 100644 --- a/services/api/tests/test_slackbot_client.py +++ b/services/api/tests/test_slackbot_client.py @@ -21,7 +21,7 @@ def _no_sleep(monkeypatch): async def _instant(_s: float) -> None: return None - monkeypatch.setattr("api.slackbot_client.asyncio.sleep", _instant) + 
monkeypatch.setattr("api.platforms.slack.asyncio.sleep", _instant) def _response(status: int, body: dict[str, Any] | None = None) -> httpx.Response: @@ -50,7 +50,7 @@ async def post(self, url: str, **kwargs: Any) -> httpx.Response: @pytest.mark.asyncio async def test_post_retries_on_5xx_then_returns_payload(): fake = _FakeClient([_response(502), _response(503), _response(200, {"ok": True})]) - with patch("api.slackbot_client.httpx.AsyncClient", return_value=fake): + with patch("api.platforms.slack.httpx.AsyncClient", return_value=fake): from api import slackbot_client result = await slackbot_client.post( @@ -65,7 +65,7 @@ async def test_post_retries_on_5xx_then_returns_payload(): @pytest.mark.parametrize("status", [408, 429]) async def test_post_retries_on_retryable_4xx(status: int): fake = _FakeClient([_response(status), _response(200, {"ok": True})]) - with patch("api.slackbot_client.httpx.AsyncClient", return_value=fake): + with patch("api.platforms.slack.httpx.AsyncClient", return_value=fake): from api import slackbot_client result = await slackbot_client.post("/api/slack/agent-sessions/sess/done", {}) @@ -78,7 +78,7 @@ async def test_post_retries_on_retryable_4xx(status: int): @pytest.mark.parametrize("status", [400, 403, 404]) async def test_post_does_not_retry_on_permanent_4xx(status: int): fake = _FakeClient([_response(status, {"error": "bad"})]) - with patch("api.slackbot_client.httpx.AsyncClient", return_value=fake): + with patch("api.platforms.slack.httpx.AsyncClient", return_value=fake): from api import slackbot_client result = await slackbot_client.post("/api/slack/agent-sessions/sess/done", {}) @@ -90,7 +90,7 @@ async def test_post_does_not_retry_on_permanent_4xx(status: int): @pytest.mark.asyncio async def test_post_returns_none_after_exhausting_retries(): fake = _FakeClient([_response(502), _response(502), _response(502)]) - with patch("api.slackbot_client.httpx.AsyncClient", return_value=fake): + with patch("api.platforms.slack.httpx.AsyncClient", 
return_value=fake): from api import slackbot_client result = await slackbot_client.post( diff --git a/services/api/tests/test_slackbot_client_sanitize.py b/services/api/tests/test_slackbot_client_sanitize.py index ffb1afbd..28cf8189 100644 --- a/services/api/tests/test_slackbot_client_sanitize.py +++ b/services/api/tests/test_slackbot_client_sanitize.py @@ -1,10 +1,11 @@ -"""Tests for Slack egress sanitization in slackbot_client.""" +"""Tests for Slack egress sanitization.""" from __future__ import annotations import pytest from api import slackbot_client +from api.platforms import slack as _platform_slack @pytest.fixture @@ -15,7 +16,9 @@ async def fake_post(path: str, body: dict, **_kwargs): calls.append((path, body)) return {"ok": True} - monkeypatch.setattr(slackbot_client, "post", fake_post) + # SLACK_PLATFORM methods call platforms.slack.slackbot_post directly; + # patch the platform-side reference, not the shim alias. + monkeypatch.setattr(_platform_slack, "slackbot_post", fake_post) return calls diff --git a/services/api/tests/test_workflow_idempotency_unit.py b/services/api/tests/test_workflow_idempotency_unit.py index 3d3c1909..3218cde5 100644 --- a/services/api/tests/test_workflow_idempotency_unit.py +++ b/services/api/tests/test_workflow_idempotency_unit.py @@ -96,10 +96,12 @@ async def test_agent_turn_skips_existing_history_message_before_append(): "status": "queued", }) + from api.platforms.slack import SLACK_PLATFORM + with ( patch("api.workflow_engine._compute_agent_session_title", new=AsyncMock(return_value=None)), patch("api.workflow_engine._compute_agent_session_header", new=AsyncMock(return_value=None)), - patch("api.workflow_engine.slackbot_client.open_agent_session", new=AsyncMock(return_value=None)), + patch.object(SLACK_PLATFORM, "open_live_session", new=AsyncMock(return_value=None)), patch("api.workflow_engine.spawn_assignment", new=AsyncMock(return_value={"assignment_generation": 1})), patch("api.workflow_engine._message_request_exists", 
new=AsyncMock(return_value=True)) as exists, patch("api.workflow_engine.append_message", new=append_message), diff --git a/services/api/tests/test_workflows.py b/services/api/tests/test_workflows.py index 9538ef8b..5c715ce5 100644 --- a/services/api/tests/test_workflows.py +++ b/services/api/tests/test_workflows.py @@ -1392,13 +1392,16 @@ async def open_after_spawn(**kwargs): return_value={"ok": True, "execution_id": "exe-resolved", "status": "queued"}, ) + from api.platforms.slack import SLACK_PLATFORM + with ( patch( "api.workflow_engine.spawn_assignment", new=AsyncMock(side_effect=spawn_after_recording), ), - patch( - "api.workflow_engine.slackbot_client.open_agent_session", + patch.object( + SLACK_PLATFORM, + "open_live_session", new=AsyncMock(side_effect=open_after_spawn), ) as open_session_mock, patch("api.workflow_engine.append_message", new=append_message_mock), @@ -1450,21 +1453,26 @@ async def test_agent_turn_spawn_failure_opens_unresolved_failure_session(db_pool append_message_mock = AsyncMock() enqueue_execution_mock = AsyncMock() + from api.platforms.slack import SLACK_PLATFORM + with ( patch( "api.workflow_engine.spawn_assignment", new=AsyncMock(side_effect=RuntimeError("spawn unavailable")), ), - patch( - "api.workflow_engine.slackbot_client.open_agent_session", + patch.object( + SLACK_PLATFORM, + "open_live_session", new=AsyncMock(return_value="sess-failed"), ) as open_session_mock, - patch( - "api.workflow_engine.slackbot_client.session_text", + patch.object( + SLACK_PLATFORM, + "session_text", new=AsyncMock(), ) as session_text_mock, - patch( - "api.workflow_engine.slackbot_client.session_done", + patch.object( + SLACK_PLATFORM, + "session_done", new=AsyncMock(), ) as session_done_mock, patch("api.workflow_engine.append_message", new=append_message_mock),