diff --git a/docs/2026-03-30-channel-gateway-provider-status-updates.md b/docs/2026-03-30-channel-gateway-provider-status-updates.md new file mode 100644 index 0000000..1b2b141 --- /dev/null +++ b/docs/2026-03-30-channel-gateway-provider-status-updates.md @@ -0,0 +1,325 @@ +--- +date: 2026-03-30 +author: Onur Solmaz +title: Channel Gateway Provider Status Updates +tags: [spritz, channel-gateway, slack, discord, teams, architecture] +--- + +## Overview + +This document defines a shared channel gateway feature that lets the gateway +send one provider-native status message back into the originating conversation +while the target runtime is waking up, recovering, provisioning, or otherwise +delayed. + +Slack should ship this first because the shared Slack gateway already exists. +The feature itself must remain provider-agnostic so the same model can later +support Discord, Teams, and similar providers. + +These messages are gateway-authored. They are not emitted by the underlying +runtime. + +Related docs: + +- `docs/2026-03-23-shared-app-tenant-routing-architecture.md` +- `docs/2026-03-24-slack-channel-gateway-implementation-plan.md` + +## Context + +The shared channel gateway model already supports: + +1. provider event ingestion +2. route resolution to the target runtime +3. runtime execution +4. outbound provider replies through the same shared gateway + +That leaves a UX gap when the runtime is not immediately available. Common +examples: + +- the runtime was expired and is being recreated +- the runtime is still provisioning +- the gateway is retrying route or session resolution +- the first meaningful reply is delayed long enough that the conversation would + otherwise stay silent + +In those cases the gateway should be able to post one visible status message +before the real reply is ready. + +## Core Decision + +The shared channel gateway should be allowed to create one provider-visible +status message without waiting for a runtime-originated outbound action. 
+ +The feature should follow these rules: + +- the gateway authors the status message with the shared provider app identity +- the status message targets the same conversation context as the source + message +- the fast path stays silent if recovery finishes before a short threshold +- at most one active status message exists per inbound message and purpose +- terminal success does not require cleanup, replacement, or a second + "I am back" message when a real reply is about to follow + +## Goals + +- provide immediate in-channel feedback when runtime recovery or delay becomes + user-visible +- keep provider credentials in the shared gateway instead of passing them into + runtimes +- define one provider-agnostic model that works for Slack first and can later + extend to Discord and Teams +- make status delivery idempotent across provider retries, gateway retries, and + gateway restarts + +## Non-Goals + +- streaming partial model output through the gateway +- exposing raw runtime phases directly to end users +- sending multiple progress updates for the same delayed request in v1 +- replacing the normal outbound reply path for final responses + +## UX Contract + +### When to show a status message + +The gateway should not send a visible status message immediately. + +Recommended default: + +- wait 3 to 5 seconds after inbound processing starts +- if the real reply or runtime recovery completes before that threshold, send + nothing extra +- if the threshold is crossed, ensure one visible status message exists + +Recommended initial trigger categories: + +- runtime recovery in progress +- runtime provisioning in progress +- retrying a failed route or session lookup +- first-reply latency crossed the visible-delay threshold + +### What the user should see + +The text should be short, neutral, and action-oriented. + +Examples: + +- "Still waking up. I will continue here shortly." +- "Still working on that. I will reply here shortly." 
+- "I could not recover the channel runtime. Please try again." + +The gateway should describe the effect on the user and avoid cluster jargon. + +### How success should look + +Phase 1 should stay simple: + +- once the gateway posts the status message, leave it in place +- when the runtime is ready, send the normal final reply once +- do not delete, edit, or replace the status message in v1 +- do not send a separate terminal "ready" message + +### How failure should look + +If recovery fails or times out, the gateway may leave the status message in +place and send one clear terminal error reply into the same conversation +context. + +## Provider Targeting Model + +The feature must use normalized conversation targeting so providers can map the +same behavior onto different APIs. + +At minimum the gateway should derive: + +- `conversationRef` +- `threadRef` when the provider has a thread or reply primitive +- `sourceMessageRef` +- `senderRef` + +Provider-specific examples: + +| Provider | Conversation target | Thread or reply target | Preferred status form | +| --- | --- | --- | --- | +| Slack | channel id | `thread_ts` when present, otherwise provider default for that conversation type | bot message in the same Slack conversation | +| Discord | channel id or thread id | source message reference when replying | reply or follow-up bot message | +| Teams | chat or channel conversation id | reply-to message id when supported | reply in conversation or fallback follow-up | + +The provider adapter decides the exact API call shape, but the user should +experience the same behavior across providers. + +## Status Message Model + +The gateway should track one durable status record per inbound message that +crosses the visible-delay threshold. 
+ +Recommended fields: + +- `provider` +- `principalId` +- `externalScopeType` +- `externalTenantId` +- `conversationRef` +- `threadRef` +- `sourceMessageRef` +- `purpose` +- `state` +- `providerMessageRef` +- `expiresAt` +- timestamps + +Recommended purposes: + +- `runtime-recovery` +- `provisioning-delay` +- `first-reply-delay` +- `terminal-failure` + +Recommended states: + +- `pending` +- `visible` +- `completed` +- `failed` + +The storage implementation belongs to the shared gateway deployment or another +external integration store. Spritz core should not need to persist provider +message ids inside runtime objects. + +## Gateway Behavior + +### Inbound processing + +1. provider event reaches the gateway +2. the gateway acknowledges the provider webhook within the provider timeout +3. the gateway starts route resolution, runtime reconciliation, and normal + delivery +4. if the request crosses the visible-delay threshold, the gateway ensures one + status message exists for that source message + +### Recovery loop + +While the runtime is unavailable, the gateway may continue recovery work such +as: + +- session exchange retries +- runtime recreation polling +- installation reconciliation +- provider retry coordination + +Once the visible-delay threshold has produced one status message, the gateway +should keep using that same status record for deduplication and bookkeeping. +It should not post repeated progress messages for the same source message in +v1. + +### Finalization + +When the request reaches a terminal state: + +- on success, mark the status record completed and continue with the normal + reply path +- on failure, mark the status record failed and send one clear terminal error + reply if needed +- on duplicate inbound delivery, converge on the same status record instead of + creating a second one + +## Provider Adapter Contract + +The provider gateway layer should expose a narrow status-message interface +alongside the normal outbound reply interface. 
+ +Example shape: + +```text +ensure_status_message(target, purpose, idempotency_key, body) -> provider_message_ref +``` + +Phase 1 requires only creation. Editing and deletion are optional future +capabilities, not part of the base contract. + +Provider capabilities still differ, so the adapter should declare whether it +supports: + +- replying in a thread or reply chain +- fallback follow-up messages when native reply targeting is unavailable + +The gateway should use the same provider authorization boundary as normal +outbound replies. The runtime should not receive raw provider tokens for this +feature. + +## Relationship To Runtime Outbound Actions + +This feature is separate from runtime-authored outbound actions. + +There are two outbound initiation paths: + +1. runtime-authored actions, such as normal replies, edits, and reactions +2. gateway-authored status messages, created by gateway control logic + +Both paths should reuse the same provider adapter primitives where possible, +but they must remain distinguishable in logging, metrics, and idempotency. 
+ +## Idempotency And Reliability Rules + +- one inbound provider message may create at most one active status message for + a given purpose +- provider webhook retries must resolve to the same status record +- gateway restarts must be able to recover and continue from the same status + record +- finalizing an already-completed or failed status record must be safe +- the gateway must not emit a second final reply because a status message was + retried + +Recommended idempotency key input: + +- provider +- principal id +- external tenant identity +- source message identity +- status purpose + +## Implementation Sequence + +### Phase 1: Foundation (Critical Priority) + +- define the gateway-owned status record abstraction and storage contract +- add visible-delay threshold handling to the inbound processing path +- implement the generic provider adapter method `ensure_status_message` +- implement Slack first because the shared Slack gateway already exists +- add metrics for creation, completion, failure, and time-to-first-status + +### Phase 2: Additional Providers (High Priority) + +- implement Discord targeting and status behavior +- implement Teams targeting and fallback behavior where reply capabilities are + weaker +- normalize provider capability flags so behavior stays consistent across + gateways + +### Phase 3: Enhancements (Medium Priority) + +- add richer status purposes such as queue delay or downstream retry delay +- add localization or provider-specific copy templates +- add operational controls for delay thresholds and timeout budgets + +## Validation + +Before calling the feature production-ready, validate: + +1. a fast request produces no visible status message +2. a delayed request produces exactly one status message in the correct + conversation target +3. duplicate provider deliveries do not create duplicate status messages +4. runtime recovery does not post additional status messages for the same + source message +5. 
successful completion leaves the status message in place and sends the final + reply once +6. terminal failure leaves the status message in place and sends a clear error + reply once +7. gateway restart during recovery still converges on the same provider message +8. the runtime never receives raw provider credentials for status delivery + +## References + +- `docs/2026-03-23-shared-app-tenant-routing-architecture.md` +- `docs/2026-03-24-slack-channel-gateway-implementation-plan.md` diff --git a/helm/spritz/templates/operator-deployment.yaml b/helm/spritz/templates/operator-deployment.yaml index 8ede96e..4f4ac32 100644 --- a/helm/spritz/templates/operator-deployment.yaml +++ b/helm/spritz/templates/operator-deployment.yaml @@ -53,6 +53,18 @@ spec: - name: SPRITZ_HOME_SIZE_LIMIT value: {{ .Values.operator.homeSizeLimit | quote }} {{- end }} + {{- if .Values.operator.lifecycleNotifications.url }} + - name: SPRITZ_LIFECYCLE_NOTIFY_URL + value: {{ .Values.operator.lifecycleNotifications.url | quote }} + {{- end }} + {{- if .Values.operator.lifecycleNotifications.authToken }} + - name: SPRITZ_LIFECYCLE_NOTIFY_AUTH_TOKEN + value: {{ .Values.operator.lifecycleNotifications.authToken | quote }} + {{- end }} + {{- if .Values.operator.lifecycleNotifications.timeout }} + - name: SPRITZ_LIFECYCLE_NOTIFY_TIMEOUT + value: {{ .Values.operator.lifecycleNotifications.timeout | quote }} + {{- end }} - name: SPRITZ_ROUTE_MODEL_TYPE value: {{ include "spritz.routeModel.type" . 
| quote }} - name: SPRITZ_ROUTE_HOST diff --git a/helm/spritz/values.yaml b/helm/spritz/values.yaml index e95ab8c..7cb0d2c 100644 --- a/helm/spritz/values.yaml +++ b/helm/spritz/values.yaml @@ -105,6 +105,10 @@ operator: workspaceSizeLimit: 10Gi homeSizeLimit: 5Gi podNodeSelector: "" + lifecycleNotifications: + url: "" + authToken: "" + timeout: 3s sharedMounts: enabled: false mounts: [] diff --git a/integrations/slack-gateway/.env.example b/integrations/slack-gateway/.env.example index 0c13209..5b0e6ee 100644 --- a/integrations/slack-gateway/.env.example +++ b/integrations/slack-gateway/.env.example @@ -26,3 +26,5 @@ SPRITZ_SLACK_PRINCIPAL_ID=slack-shared-app SPRITZ_SLACK_HTTP_TIMEOUT=15s SPRITZ_SLACK_DEDUPE_TTL=10m SPRITZ_SLACK_PROCESSING_TIMEOUT=60s +SPRITZ_SLACK_SESSION_RETRY_INTERVAL=1s +SPRITZ_SLACK_STATUS_MESSAGE_DELAY=3s diff --git a/integrations/slack-gateway/backend_client.go b/integrations/slack-gateway/backend_client.go index ab271db..1cd1ecb 100644 --- a/integrations/slack-gateway/backend_client.go +++ b/integrations/slack-gateway/backend_client.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -41,6 +42,11 @@ type backendChannelSessionResponse struct { } `json:"session"` } +type backendChannelSessionUnavailableResponse struct { + Status string `json:"status"` + ProviderAuth slackInstallation `json:"providerAuth"` +} + type backendInstallationUpsertResponse struct { Status string `json:"status"` Installation struct { @@ -87,6 +93,50 @@ type channelSession struct { ProviderAuth slackInstallation } +type channelSessionUnavailableError struct { + providerAuth slackInstallation + cause *httpStatusError +} + +func (err *channelSessionUnavailableError) Error() string { + if err == nil { + return "channel session unavailable" + } + if err.cause == nil { + return "channel session unavailable" + } + return err.cause.Error() +} + +func (err *channelSessionUnavailableError) Unwrap() error { + if err == nil { + 
return nil + } + return err.cause +} + +func channelSessionUnavailableProviderAuth(err error) (slackInstallation, bool) { + var unavailableErr *channelSessionUnavailableError + if !errors.As(err, &unavailableErr) { + return slackInstallation{}, false + } + if strings.TrimSpace(unavailableErr.providerAuth.BotAccessToken) == "" { + return slackInstallation{}, false + } + return unavailableErr.providerAuth, true +} + +func isSpritzRuntimeMissingError(err error) bool { + var statusErr *httpStatusError + if !errors.As(err, &statusErr) { + return false + } + if statusErr.statusCode != http.StatusNotFound { + return false + } + return strings.Contains(strings.ToLower(statusErr.body), "spritz not found") +} + func (g *slackGateway) exchangeChannelSession(ctx context.Context, teamID string) (channelSession, error) { body := map[string]any{ "principalId": g.cfg.PrincipalID, @@ -96,6 +146,17 @@ func (g *slackGateway) exchangeChannelSession(ctx context.Context, teamID string } var payload backendChannelSessionResponse if err := g.postBackendJSON(ctx, "/internal/v1/spritz/channel-sessions/exchange", body, &payload); err != nil { + var statusErr *httpStatusError + if errors.As(err, &statusErr) && statusErr.statusCode == http.StatusServiceUnavailable { + var unavailablePayload backendChannelSessionUnavailableResponse + if json.Unmarshal([]byte(statusErr.body), &unavailablePayload) == nil && strings.TrimSpace(unavailablePayload.Status) == "unavailable" { + return channelSession{}, &channelSessionUnavailableError{ + providerAuth: unavailablePayload.ProviderAuth, + cause: statusErr, + } + } + return channelSession{}, &channelSessionUnavailableError{cause: statusErr} + } return channelSession{}, err } if payload.Status != "resolved" { diff --git a/integrations/slack-gateway/config.go b/integrations/slack-gateway/config.go index 079d18c..a922a4a 100644 --- a/integrations/slack-gateway/config.go +++ b/integrations/slack-gateway/config.go @@ -26,6 +26,8 @@ type config struct { 
HTTPTimeout time.Duration DedupeTTL time.Duration ProcessingTimeout time.Duration + SessionRetryInterval time.Duration + StatusMessageDelay time.Duration } func loadConfig() (config, error) { @@ -47,6 +49,8 @@ func loadConfig() (config, error) { HTTPTimeout: parseDurationEnv("SPRITZ_SLACK_HTTP_TIMEOUT", 15*time.Second), DedupeTTL: parseDurationEnv("SPRITZ_SLACK_DEDUPE_TTL", 10*time.Minute), ProcessingTimeout: parseDurationEnv("SPRITZ_SLACK_PROCESSING_TIMEOUT", 60*time.Second), + SessionRetryInterval: parseDurationEnv("SPRITZ_SLACK_SESSION_RETRY_INTERVAL", time.Second), + StatusMessageDelay: parseDurationEnv("SPRITZ_SLACK_STATUS_MESSAGE_DELAY", 3*time.Second), } if cfg.PublicURL == "" { diff --git a/integrations/slack-gateway/gateway.go b/integrations/slack-gateway/gateway.go index 56e7db4..509384b 100644 --- a/integrations/slack-gateway/gateway.go +++ b/integrations/slack-gateway/gateway.go @@ -40,6 +40,12 @@ func newSlackGateway(cfg config, logger *slog.Logger) *slackGateway { if cfg.ProcessingTimeout <= 0 { cfg.ProcessingTimeout = 60 * time.Second } + if cfg.SessionRetryInterval <= 0 { + cfg.SessionRetryInterval = time.Second + } + if cfg.StatusMessageDelay <= 0 { + cfg.StatusMessageDelay = 3 * time.Second + } return &slackGateway{ cfg: cfg, httpClient: &http.Client{Timeout: cfg.HTTPTimeout}, diff --git a/integrations/slack-gateway/gateway_test.go b/integrations/slack-gateway/gateway_test.go index fc95454..05cbf4e 100644 --- a/integrations/slack-gateway/gateway_test.go +++ b/integrations/slack-gateway/gateway_test.go @@ -1984,6 +1984,1512 @@ func TestProcessMessageEventPersistsReplyAliasAfterPromptTimeout(t *testing.T) { } } +func TestProcessMessageEventPostsStatusMessageWhileSessionRecoveryIsInFlight(t *testing.T) { + var slackPayloads struct { + sync.Mutex + items []map[string]any + } + slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/chat.postMessage" { + t.Fatalf("unexpected slack path %s", 
r.URL.Path) + } + var payload map[string]any + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Fatalf("decode slack post body: %v", err) + } + slackPayloads.Lock() + slackPayloads.items = append(slackPayloads.items, payload) + slackPayloads.Unlock() + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "ts": fmt.Sprintf("1711387375.00010%d", len(slackPayloads.items))}) + })) + defer slackAPI.Close() + + var sessionExchangeCalls atomic.Int32 + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/internal/v1/spritz/channel-sessions/exchange" { + t.Fatalf("unexpected backend path %s", r.URL.Path) + } + call := sessionExchangeCalls.Add(1) + if call < 3 { + writeJSON(w, http.StatusServiceUnavailable, map[string]any{ + "status": "unavailable", + "providerAuth": map[string]any{ + "providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": "xoxb-installed", + }, + }) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "status": "resolved", + "session": map[string]any{ + "accessToken": "owner-token", + "ownerAuthId": "owner-123", + "namespace": "spritz-staging", + "instanceId": "zeno-acme", + "providerAuth": map[string]any{ + "providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": "xoxb-installed", + }, + }, + }) + })) + defer backend.Close() + + upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }} + spritz := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/channel-conversations/upsert": + writeJSON(w, http.StatusCreated, map[string]any{ + "status": "success", + "data": map[string]any{ + "created": true, + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": 
map[string]any{"cwd": "/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/bootstrap": + writeJSON(w, http.StatusOK, map[string]any{ + "status": "success", + "data": map[string]any{ + "effectiveSessionId": "session-1", + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"sessionId": "session-1", "cwd": "/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/connect": + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("upgrade failed: %v", err) + } + defer conn.Close() + for { + _, payload, err := conn.ReadMessage() + if err != nil { + return + } + var message map[string]any + if err := json.Unmarshal(payload, &message); err != nil { + t.Fatalf("decode ws payload: %v", err) + } + switch message["method"] { + case "initialize": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{"protocolVersion": 1}}) + case "session/load": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}}) + case "session/prompt": + _ = conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "method": "session/update", + "params": map[string]any{ + "update": map[string]any{ + "sessionUpdate": "agent_message_chunk", + "content": []map[string]any{{ + "type": "text", + "text": "Hello from concierge", + }}, + }, + }, + }) + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}}) + return + default: + t.Fatalf("unexpected ACP method %#v", message["method"]) + } + } + default: + t.Fatalf("unexpected spritz path %s", r.URL.Path) + } + })) + defer spritz.Close() + + cfg := config{ + SlackAPIBaseURL: slackAPI.URL, + BackendBaseURL: backend.URL, + BackendInternalToken: "backend-internal-token", + SpritzBaseURL: spritz.URL, + SpritzServiceToken: "spritz-service-token", + PrincipalID: "shared-slack-gateway", + HTTPTimeout: 200 * time.Millisecond, + DedupeTTL: 
time.Minute, + StatusMessageDelay: 5 * time.Millisecond, + SessionRetryInterval: 10 * time.Millisecond, + ProcessingTimeout: 200 * time.Millisecond, + } + gateway := newSlackGateway(cfg, slog.New(slog.NewTextHandler(io.Discard, nil))) + + envelope := slackEnvelope{ + APIAppID: "A_app_1", + TeamID: "T_workspace_1", + Event: slackEventInner{ + Type: "app_mention", + User: "U_user", + Text: "<@U_bot> hello", + Channel: "C_channel_1", + ChannelType: "channel", + TS: "1711387375.000100", + }, + } + delivery, process, err := gateway.beginMessageEventDelivery(envelope) + if err != nil { + t.Fatalf("beginMessageEventDelivery returned error: %v", err) + } + if !process { + t.Fatal("expected app mention to be processed") + } + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + if err := gateway.processMessageEventWithDelivery(ctx, envelope, delivery); err != nil { + t.Fatalf("expected recovery flow to succeed, got %v", err) + } + + slackPayloads.Lock() + defer slackPayloads.Unlock() + if len(slackPayloads.items) != 2 { + t.Fatalf("expected wake-up message and final reply, got %#v", slackPayloads.items) + } + if got := slackPayloads.items[0]["text"]; got != "Still waking up. I will continue here shortly." 
{ + t.Fatalf("expected wake-up status text, got %#v", got) + } + if got := slackPayloads.items[1]["text"]; got != "Hello from concierge" { + t.Fatalf("expected final reply text, got %#v", got) + } + if _, ok := slackPayloads.items[0]["thread_ts"]; ok { + t.Fatalf("expected top-level wake-up message, got %#v", slackPayloads.items[0]["thread_ts"]) + } + if _, ok := slackPayloads.items[1]["thread_ts"]; ok { + t.Fatalf("expected top-level final reply, got %#v", slackPayloads.items[1]["thread_ts"]) + } + if sessionExchangeCalls.Load() != 3 { + t.Fatalf("expected 3 session exchange attempts, got %d", sessionExchangeCalls.Load()) + } +} + +func TestProcessMessageEventRecoversAfterRuntimeDisappearsMidFlight(t *testing.T) { + var slackPayloads struct { + sync.Mutex + items []map[string]any + } + slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/chat.postMessage" { + t.Fatalf("unexpected slack path %s", r.URL.Path) + } + var payload map[string]any + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Fatalf("decode slack post body: %v", err) + } + slackPayloads.Lock() + slackPayloads.items = append(slackPayloads.items, payload) + slackPayloads.Unlock() + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "ts": fmt.Sprintf("1711387375.00010%d", len(slackPayloads.items))}) + })) + defer slackAPI.Close() + + var sessionExchangeCalls atomic.Int32 + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/internal/v1/spritz/channel-sessions/exchange" { + t.Fatalf("unexpected backend path %s", r.URL.Path) + } + call := sessionExchangeCalls.Add(1) + switch call { + case 1, 2: + writeJSON(w, http.StatusOK, map[string]any{ + "status": "resolved", + "session": map[string]any{ + "accessToken": "owner-token-old", + "ownerAuthId": "owner-123", + "namespace": "spritz-staging", + "instanceId": "zeno-old", + "providerAuth": map[string]any{ + 
"providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": "xoxb-installed", + }, + }, + }) + case 3: + writeJSON(w, http.StatusServiceUnavailable, map[string]any{ + "status": "unavailable", + "providerAuth": map[string]any{ + "providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": "xoxb-installed", + }, + }) + default: + writeJSON(w, http.StatusOK, map[string]any{ + "status": "resolved", + "session": map[string]any{ + "accessToken": "owner-token-new", + "ownerAuthId": "owner-123", + "namespace": "spritz-staging", + "instanceId": "zeno-new", + "providerAuth": map[string]any{ + "providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": "xoxb-installed", + }, + }, + }) + } + })) + defer backend.Close() + + var upsertCalls atomic.Int32 + upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }} + spritz := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/channel-conversations/upsert": + call := upsertCalls.Add(1) + if call <= 2 { + http.Error(w, `{"status":"error","message":"spritz not found"}`, http.StatusNotFound) + return + } + writeJSON(w, http.StatusCreated, map[string]any{ + "status": "success", + "data": map[string]any{ + "created": true, + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"cwd": "/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/bootstrap": + writeJSON(w, http.StatusOK, map[string]any{ + "status": "success", + "data": map[string]any{ + "effectiveSessionId": "session-1", + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"sessionId": "session-1", "cwd": 
"/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/connect": + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("upgrade failed: %v", err) + } + defer conn.Close() + for { + _, payload, err := conn.ReadMessage() + if err != nil { + return + } + var message map[string]any + if err := json.Unmarshal(payload, &message); err != nil { + t.Fatalf("decode ws payload: %v", err) + } + switch message["method"] { + case "initialize": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{"protocolVersion": 1}}) + case "session/load": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}}) + case "session/prompt": + _ = conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "method": "session/update", + "params": map[string]any{ + "update": map[string]any{ + "sessionUpdate": "agent_message_chunk", + "content": []map[string]any{{ + "type": "text", + "text": "Hello from recovered concierge", + }}, + }, + }, + }) + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}}) + return + default: + t.Fatalf("unexpected ACP method %#v", message["method"]) + } + } + default: + t.Fatalf("unexpected spritz path %s", r.URL.Path) + } + })) + defer spritz.Close() + + cfg := config{ + SlackAPIBaseURL: slackAPI.URL, + BackendBaseURL: backend.URL, + BackendInternalToken: "backend-internal-token", + SpritzBaseURL: spritz.URL, + SpritzServiceToken: "spritz-service-token", + PrincipalID: "shared-slack-gateway", + HTTPTimeout: 200 * time.Millisecond, + DedupeTTL: time.Minute, + StatusMessageDelay: 5 * time.Millisecond, + SessionRetryInterval: 10 * time.Millisecond, + ProcessingTimeout: 200 * time.Millisecond, + } + gateway := newSlackGateway(cfg, slog.New(slog.NewTextHandler(io.Discard, nil))) + + envelope := slackEnvelope{ + APIAppID: "A_app_1", + TeamID: "T_workspace_1", + Event: slackEventInner{ + Type: "app_mention", + User: 
"U_user", + Text: "<@U_bot> hello", + Channel: "C_channel_1", + ChannelType: "channel", + TS: "1711387375.000100", + }, + } + delivery, process, err := gateway.beginMessageEventDelivery(envelope) + if err != nil { + t.Fatalf("beginMessageEventDelivery returned error: %v", err) + } + if !process { + t.Fatal("expected app mention to be processed") + } + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + if err := gateway.processMessageEventWithDelivery(ctx, envelope, delivery); err != nil { + t.Fatalf("expected missing-runtime recovery flow to succeed, got %v", err) + } + + slackPayloads.Lock() + defer slackPayloads.Unlock() + if len(slackPayloads.items) != 2 { + t.Fatalf("expected wake-up message and final reply, got %#v", slackPayloads.items) + } + if got := slackPayloads.items[0]["text"]; got != "Still waking up. I will continue here shortly." { + t.Fatalf("expected wake-up status text, got %#v", got) + } + if got := slackPayloads.items[1]["text"]; got != "Hello from recovered concierge" { + t.Fatalf("expected final reply text, got %#v", got) + } + if sessionExchangeCalls.Load() != 4 { + t.Fatalf("expected 4 session exchange attempts, got %d", sessionExchangeCalls.Load()) + } + if upsertCalls.Load() != 4 { + t.Fatalf("expected two prompt retries plus alias persistence, got %d", upsertCalls.Load()) + } +} + +func TestProcessMessageEventRecoversAfterRuntimeReusesSameInstanceID(t *testing.T) { + var slackPayloads struct { + sync.Mutex + items []map[string]any + } + slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/chat.postMessage" { + t.Fatalf("unexpected slack path %s", r.URL.Path) + } + var payload map[string]any + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Fatalf("decode slack post body: %v", err) + } + slackPayloads.Lock() + slackPayloads.items = append(slackPayloads.items, payload) + slackPayloads.Unlock() + writeJSON(w, 
http.StatusOK, map[string]any{"ok": true, "ts": fmt.Sprintf("1711387375.00010%d", len(slackPayloads.items))}) + })) + defer slackAPI.Close() + + var sessionExchangeCalls atomic.Int32 + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/internal/v1/spritz/channel-sessions/exchange" { + t.Fatalf("unexpected backend path %s", r.URL.Path) + } + sessionExchangeCalls.Add(1) + writeJSON(w, http.StatusOK, map[string]any{ + "status": "resolved", + "session": map[string]any{ + "accessToken": "owner-token-stable", + "ownerAuthId": "owner-123", + "namespace": "spritz-staging", + "instanceId": "zeno-stable", + "providerAuth": map[string]any{ + "providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": "xoxb-installed", + }, + }, + }) + })) + defer backend.Close() + + var upsertCalls atomic.Int32 + upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }} + spritz := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/channel-conversations/upsert": + call := upsertCalls.Add(1) + if call == 1 { + http.Error(w, `{"status":"error","message":"spritz not found"}`, http.StatusNotFound) + return + } + writeJSON(w, http.StatusCreated, map[string]any{ + "status": "success", + "data": map[string]any{ + "created": true, + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"cwd": "/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/bootstrap": + writeJSON(w, http.StatusOK, map[string]any{ + "status": "success", + "data": map[string]any{ + "effectiveSessionId": "session-1", + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"sessionId": "session-1", "cwd": "/home/dev"}, + }, + }, + }) + case 
"/api/acp/conversations/conv-1/connect": + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("upgrade failed: %v", err) + } + defer conn.Close() + for { + _, payload, err := conn.ReadMessage() + if err != nil { + return + } + var message map[string]any + if err := json.Unmarshal(payload, &message); err != nil { + t.Fatalf("decode ws payload: %v", err) + } + switch message["method"] { + case "initialize": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{"protocolVersion": 1}}) + case "session/load": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}}) + case "session/prompt": + _ = conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "method": "session/update", + "params": map[string]any{ + "update": map[string]any{ + "sessionUpdate": "agent_message_chunk", + "content": []map[string]any{{ + "type": "text", + "text": "Hello from stable concierge", + }}, + }, + }, + }) + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}}) + return + default: + t.Fatalf("unexpected ACP method %#v", message["method"]) + } + } + default: + t.Fatalf("unexpected spritz path %s", r.URL.Path) + } + })) + defer spritz.Close() + + cfg := config{ + SlackAPIBaseURL: slackAPI.URL, + BackendBaseURL: backend.URL, + BackendInternalToken: "backend-internal-token", + SpritzBaseURL: spritz.URL, + SpritzServiceToken: "spritz-service-token", + PrincipalID: "shared-slack-gateway", + HTTPTimeout: 200 * time.Millisecond, + DedupeTTL: time.Minute, + StatusMessageDelay: time.Hour, + SessionRetryInterval: 10 * time.Millisecond, + ProcessingTimeout: 200 * time.Millisecond, + } + gateway := newSlackGateway(cfg, slog.New(slog.NewTextHandler(io.Discard, nil))) + + envelope := slackEnvelope{ + APIAppID: "A_app_1", + TeamID: "T_workspace_1", + Event: slackEventInner{ + Type: "app_mention", + User: "U_user", + Text: "<@U_bot> hello", + Channel: 
"C_channel_1", + ChannelType: "channel", + TS: "1711387375.000100", + }, + } + delivery, process, err := gateway.beginMessageEventDelivery(envelope) + if err != nil { + t.Fatalf("beginMessageEventDelivery returned error: %v", err) + } + if !process { + t.Fatal("expected app mention to be processed") + } + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + if err := gateway.processMessageEventWithDelivery(ctx, envelope, delivery); err != nil { + t.Fatalf("expected stable-instance recovery flow to succeed, got %v", err) + } + + slackPayloads.Lock() + defer slackPayloads.Unlock() + if len(slackPayloads.items) != 1 { + t.Fatalf("expected only the final reply, got %#v", slackPayloads.items) + } + if got := slackPayloads.items[0]["text"]; got != "Hello from stable concierge" { + t.Fatalf("expected final reply text, got %#v", got) + } + if sessionExchangeCalls.Load() != 2 { + t.Fatalf("expected initial exchange plus one recovery exchange, got %d", sessionExchangeCalls.Load()) + } + if upsertCalls.Load() != 3 { + t.Fatalf("expected recovery retry plus alias persistence, got %d", upsertCalls.Load()) + } +} + +func TestProcessMessageEventPostsSingleWakeUpAcrossSessionAndRuntimeRecovery(t *testing.T) { + var slackPayloads struct { + sync.Mutex + items []map[string]any + } + slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/chat.postMessage" { + t.Fatalf("unexpected slack path %s", r.URL.Path) + } + var payload map[string]any + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Fatalf("decode slack post body: %v", err) + } + slackPayloads.Lock() + slackPayloads.items = append(slackPayloads.items, payload) + slackPayloads.Unlock() + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "ts": fmt.Sprintf("1711387375.00010%d", len(slackPayloads.items))}) + })) + defer slackAPI.Close() + + var sessionExchangeCalls atomic.Int32 + backend := 
httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/internal/v1/spritz/channel-sessions/exchange" { + t.Fatalf("unexpected backend path %s", r.URL.Path) + } + call := sessionExchangeCalls.Add(1) + switch call { + case 1, 2, 4: + writeJSON(w, http.StatusServiceUnavailable, map[string]any{ + "status": "unavailable", + "providerAuth": map[string]any{ + "providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": "xoxb-installed", + }, + }) + case 3: + writeJSON(w, http.StatusOK, map[string]any{ + "status": "resolved", + "session": map[string]any{ + "accessToken": "owner-token-old", + "ownerAuthId": "owner-123", + "namespace": "spritz-staging", + "instanceId": "zeno-old", + "providerAuth": map[string]any{ + "providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": "xoxb-installed", + }, + }, + }) + default: + writeJSON(w, http.StatusOK, map[string]any{ + "status": "resolved", + "session": map[string]any{ + "accessToken": "owner-token-new", + "ownerAuthId": "owner-123", + "namespace": "spritz-staging", + "instanceId": "zeno-new", + "providerAuth": map[string]any{ + "providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": "xoxb-installed", + }, + }, + }) + } + })) + defer backend.Close() + + var upsertCalls atomic.Int32 + upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }} + spritz := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/channel-conversations/upsert": + call := upsertCalls.Add(1) + if call == 1 { + http.Error(w, `{"status":"error","message":"spritz not found"}`, http.StatusNotFound) + return + } + writeJSON(w, http.StatusCreated, map[string]any{ + 
"status": "success", + "data": map[string]any{ + "created": true, + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"cwd": "/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/bootstrap": + writeJSON(w, http.StatusOK, map[string]any{ + "status": "success", + "data": map[string]any{ + "effectiveSessionId": "session-1", + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"sessionId": "session-1", "cwd": "/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/connect": + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("upgrade failed: %v", err) + } + defer conn.Close() + for { + _, payload, err := conn.ReadMessage() + if err != nil { + return + } + var message map[string]any + if err := json.Unmarshal(payload, &message); err != nil { + t.Fatalf("decode ws payload: %v", err) + } + switch message["method"] { + case "initialize": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{"protocolVersion": 1}}) + case "session/load": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}}) + case "session/prompt": + _ = conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "method": "session/update", + "params": map[string]any{ + "update": map[string]any{ + "sessionUpdate": "agent_message_chunk", + "content": []map[string]any{{ + "type": "text", + "text": "Hello after both recoveries", + }}, + }, + }, + }) + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}}) + return + default: + t.Fatalf("unexpected ACP method %#v", message["method"]) + } + } + default: + t.Fatalf("unexpected spritz path %s", r.URL.Path) + } + })) + defer spritz.Close() + + cfg := config{ + SlackAPIBaseURL: slackAPI.URL, + BackendBaseURL: backend.URL, + BackendInternalToken: "backend-internal-token", + 
SpritzBaseURL: spritz.URL, + SpritzServiceToken: "spritz-service-token", + PrincipalID: "shared-slack-gateway", + HTTPTimeout: 200 * time.Millisecond, + DedupeTTL: time.Minute, + StatusMessageDelay: 5 * time.Millisecond, + SessionRetryInterval: 10 * time.Millisecond, + ProcessingTimeout: 250 * time.Millisecond, + } + gateway := newSlackGateway(cfg, slog.New(slog.NewTextHandler(io.Discard, nil))) + + envelope := slackEnvelope{ + APIAppID: "A_app_1", + TeamID: "T_workspace_1", + Event: slackEventInner{ + Type: "app_mention", + User: "U_user", + Text: "<@U_bot> hello", + Channel: "C_channel_1", + ChannelType: "channel", + TS: "1711387375.000100", + }, + } + delivery, process, err := gateway.beginMessageEventDelivery(envelope) + if err != nil { + t.Fatalf("beginMessageEventDelivery returned error: %v", err) + } + if !process { + t.Fatal("expected app mention to be processed") + } + + ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond) + defer cancel() + if err := gateway.processMessageEventWithDelivery(ctx, envelope, delivery); err != nil { + t.Fatalf("expected combined recovery flow to succeed, got %v", err) + } + + slackPayloads.Lock() + defer slackPayloads.Unlock() + if len(slackPayloads.items) != 2 { + t.Fatalf("expected one wake-up message and one final reply, got %#v", slackPayloads.items) + } + if got := slackPayloads.items[0]["text"]; got != "Still waking up. I will continue here shortly." 
{ + t.Fatalf("expected wake-up status text, got %#v", got) + } + if got := slackPayloads.items[1]["text"]; got != "Hello after both recoveries" { + t.Fatalf("expected final reply text, got %#v", got) + } + if sessionExchangeCalls.Load() != 5 { + t.Fatalf("expected 5 session exchange attempts, got %d", sessionExchangeCalls.Load()) + } + if upsertCalls.Load() != 3 { + t.Fatalf("expected recovery retry plus alias persistence, got %d", upsertCalls.Load()) + } +} + +func TestProcessMessageEventRetriesWakeUpAfterSlackPostFailure(t *testing.T) { + var slackPayloads struct { + sync.Mutex + items []map[string]any + } + var wakeUpAttempts atomic.Int32 + slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/chat.postMessage" { + t.Fatalf("unexpected slack path %s", r.URL.Path) + } + var payload map[string]any + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Fatalf("decode slack post body: %v", err) + } + slackPayloads.Lock() + slackPayloads.items = append(slackPayloads.items, payload) + slackPayloads.Unlock() + if payload["text"] == slackRecoveryStatusText && wakeUpAttempts.Add(1) == 1 { + http.Error(w, "slack unavailable", http.StatusBadGateway) + return + } + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "ts": fmt.Sprintf("1711387375.00010%d", len(slackPayloads.items))}) + })) + defer slackAPI.Close() + + var sessionExchangeCalls atomic.Int32 + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/internal/v1/spritz/channel-sessions/exchange" { + t.Fatalf("unexpected backend path %s", r.URL.Path) + } + call := sessionExchangeCalls.Add(1) + if call <= 3 { + writeJSON(w, http.StatusServiceUnavailable, map[string]any{ + "status": "unavailable", + "providerAuth": map[string]any{ + "providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": 
"xoxb-installed", + }, + }) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "status": "resolved", + "session": map[string]any{ + "accessToken": "owner-token", + "ownerAuthId": "owner-123", + "namespace": "spritz-staging", + "instanceId": "zeno-acme", + "providerAuth": map[string]any{ + "providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": "xoxb-installed", + }, + }, + }) + })) + defer backend.Close() + + upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }} + spritz := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/channel-conversations/upsert": + writeJSON(w, http.StatusCreated, map[string]any{ + "status": "success", + "data": map[string]any{ + "created": true, + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"cwd": "/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/bootstrap": + writeJSON(w, http.StatusOK, map[string]any{ + "status": "success", + "data": map[string]any{ + "effectiveSessionId": "session-1", + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"sessionId": "session-1", "cwd": "/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/connect": + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("upgrade failed: %v", err) + } + defer conn.Close() + for { + _, payload, err := conn.ReadMessage() + if err != nil { + return + } + var message map[string]any + if err := json.Unmarshal(payload, &message); err != nil { + t.Fatalf("decode ws payload: %v", err) + } + switch message["method"] { + case "initialize": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{"protocolVersion": 1}}) + case "session/load": + _ = 
conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}}) + case "session/prompt": + _ = conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "method": "session/update", + "params": map[string]any{ + "update": map[string]any{ + "sessionUpdate": "agent_message_chunk", + "content": []map[string]any{{ + "type": "text", + "text": "Hello from concierge", + }}, + }, + }, + }) + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}}) + return + default: + t.Fatalf("unexpected ACP method %#v", message["method"]) + } + } + default: + t.Fatalf("unexpected spritz path %s", r.URL.Path) + } + })) + defer spritz.Close() + + cfg := config{ + SlackAPIBaseURL: slackAPI.URL, + BackendBaseURL: backend.URL, + BackendInternalToken: "backend-internal-token", + SpritzBaseURL: spritz.URL, + SpritzServiceToken: "spritz-service-token", + PrincipalID: "shared-slack-gateway", + HTTPTimeout: 200 * time.Millisecond, + DedupeTTL: time.Minute, + StatusMessageDelay: 5 * time.Millisecond, + SessionRetryInterval: 10 * time.Millisecond, + ProcessingTimeout: 250 * time.Millisecond, + } + gateway := newSlackGateway(cfg, slog.New(slog.NewTextHandler(io.Discard, nil))) + + envelope := slackEnvelope{ + APIAppID: "A_app_1", + TeamID: "T_workspace_1", + Event: slackEventInner{ + Type: "app_mention", + User: "U_user", + Text: "<@U_bot> hello", + Channel: "C_channel_1", + ChannelType: "channel", + TS: "1711387375.000100", + }, + } + delivery, process, err := gateway.beginMessageEventDelivery(envelope) + if err != nil { + t.Fatalf("beginMessageEventDelivery returned error: %v", err) + } + if !process { + t.Fatal("expected app mention to be processed") + } + + ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond) + defer cancel() + if err := gateway.processMessageEventWithDelivery(ctx, envelope, delivery); err != nil { + t.Fatalf("expected wake-up retry flow to succeed, got %v", err) + } + + 
slackPayloads.Lock() + defer slackPayloads.Unlock() + if wakeUpAttempts.Load() != 2 { + t.Fatalf("expected two wake-up attempts, got %d", wakeUpAttempts.Load()) + } + if len(slackPayloads.items) != 3 { + t.Fatalf("expected failed wake-up, retried wake-up, and final reply, got %#v", slackPayloads.items) + } + if got := slackPayloads.items[0]["text"]; got != slackRecoveryStatusText { + t.Fatalf("expected first payload to be the wake-up status, got %#v", got) + } + if got := slackPayloads.items[1]["text"]; got != slackRecoveryStatusText { + t.Fatalf("expected second payload to retry the wake-up status, got %#v", got) + } + if got := slackPayloads.items[2]["text"]; got != "Hello from concierge" { + t.Fatalf("expected final reply text, got %#v", got) + } + if sessionExchangeCalls.Load() != 4 { + t.Fatalf("expected 4 session exchange attempts, got %d", sessionExchangeCalls.Load()) + } +} + +func TestProcessMessageEventPostsWakeUpDuringSlowPromptExecution(t *testing.T) { + var slackPayloads struct { + sync.Mutex + items []map[string]any + } + slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/chat.postMessage" { + t.Fatalf("unexpected slack path %s", r.URL.Path) + } + var payload map[string]any + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Fatalf("decode slack post body: %v", err) + } + slackPayloads.Lock() + slackPayloads.items = append(slackPayloads.items, payload) + slackPayloads.Unlock() + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "ts": fmt.Sprintf("1711387375.00010%d", len(slackPayloads.items))}) + })) + defer slackAPI.Close() + + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/internal/v1/spritz/channel-sessions/exchange" { + t.Fatalf("unexpected backend path %s", r.URL.Path) + } + writeJSON(w, http.StatusOK, map[string]any{ + "status": "resolved", + "session": map[string]any{ + "accessToken": 
"owner-token", + "ownerAuthId": "owner-123", + "namespace": "spritz-staging", + "instanceId": "zeno-acme", + "providerAuth": map[string]any{ + "providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": "xoxb-installed", + }, + }, + }) + })) + defer backend.Close() + + upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }} + spritz := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/channel-conversations/upsert": + writeJSON(w, http.StatusCreated, map[string]any{ + "status": "success", + "data": map[string]any{ + "created": true, + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"cwd": "/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/bootstrap": + writeJSON(w, http.StatusOK, map[string]any{ + "status": "success", + "data": map[string]any{ + "effectiveSessionId": "session-1", + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"sessionId": "session-1", "cwd": "/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/connect": + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("upgrade failed: %v", err) + } + defer conn.Close() + for { + _, payload, err := conn.ReadMessage() + if err != nil { + return + } + var message map[string]any + if err := json.Unmarshal(payload, &message); err != nil { + t.Fatalf("decode ws payload: %v", err) + } + switch message["method"] { + case "initialize": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{"protocolVersion": 1}}) + case "session/load": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}}) + case "session/prompt": + time.Sleep(20 * time.Millisecond) + _ = 
conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "method": "session/update", + "params": map[string]any{ + "update": map[string]any{ + "sessionUpdate": "agent_message_chunk", + "content": []map[string]any{{ + "type": "text", + "text": "Hello from slow concierge", + }}, + }, + }, + }) + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}}) + return + default: + t.Fatalf("unexpected ACP method %#v", message["method"]) + } + } + default: + t.Fatalf("unexpected spritz path %s", r.URL.Path) + } + })) + defer spritz.Close() + + cfg := config{ + SlackAPIBaseURL: slackAPI.URL, + BackendBaseURL: backend.URL, + BackendInternalToken: "backend-internal-token", + SpritzBaseURL: spritz.URL, + SpritzServiceToken: "spritz-service-token", + PrincipalID: "shared-slack-gateway", + HTTPTimeout: 200 * time.Millisecond, + DedupeTTL: time.Minute, + StatusMessageDelay: 5 * time.Millisecond, + SessionRetryInterval: 10 * time.Millisecond, + ProcessingTimeout: 250 * time.Millisecond, + } + gateway := newSlackGateway(cfg, slog.New(slog.NewTextHandler(io.Discard, nil))) + + envelope := slackEnvelope{ + APIAppID: "A_app_1", + TeamID: "T_workspace_1", + Event: slackEventInner{ + Type: "app_mention", + User: "U_user", + Text: "<@U_bot> hello", + Channel: "C_channel_1", + ChannelType: "channel", + TS: "1711387375.000100", + }, + } + delivery, process, err := gateway.beginMessageEventDelivery(envelope) + if err != nil { + t.Fatalf("beginMessageEventDelivery returned error: %v", err) + } + if !process { + t.Fatal("expected app mention to be processed") + } + + ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond) + defer cancel() + if err := gateway.processMessageEventWithDelivery(ctx, envelope, delivery); err != nil { + t.Fatalf("expected slow prompt flow to succeed, got %v", err) + } + + slackPayloads.Lock() + defer slackPayloads.Unlock() + if len(slackPayloads.items) != 2 { + t.Fatalf("expected wake-up status and final 
reply, got %#v", slackPayloads.items) + } + if got := slackPayloads.items[0]["text"]; got != slackRecoveryStatusText { + t.Fatalf("expected wake-up status text, got %#v", got) + } + if got := slackPayloads.items[1]["text"]; got != "Hello from slow concierge" { + t.Fatalf("expected final reply text, got %#v", got) + } +} + +func TestProcessMessageEventKeepsRecoveringAfterTransientExchangeError(t *testing.T) { + var slackPayloads struct { + sync.Mutex + items []map[string]any + } + slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/chat.postMessage" { + t.Fatalf("unexpected slack path %s", r.URL.Path) + } + var payload map[string]any + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Fatalf("decode slack post body: %v", err) + } + slackPayloads.Lock() + slackPayloads.items = append(slackPayloads.items, payload) + slackPayloads.Unlock() + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "ts": fmt.Sprintf("1711387375.00010%d", len(slackPayloads.items))}) + })) + defer slackAPI.Close() + + var sessionExchangeCalls atomic.Int32 + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/internal/v1/spritz/channel-sessions/exchange" { + t.Fatalf("unexpected backend path %s", r.URL.Path) + } + call := sessionExchangeCalls.Add(1) + switch call { + case 1: + writeJSON(w, http.StatusServiceUnavailable, map[string]any{ + "status": "unavailable", + "providerAuth": map[string]any{ + "providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": "xoxb-installed", + }, + }) + case 2: + http.Error(w, "backend unavailable", http.StatusInternalServerError) + default: + writeJSON(w, http.StatusOK, map[string]any{ + "status": "resolved", + "session": map[string]any{ + "accessToken": "owner-token", + "ownerAuthId": "owner-123", + "namespace": "spritz-staging", 
+ "instanceId": "zeno-acme", + "providerAuth": map[string]any{ + "providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": "xoxb-installed", + }, + }, + }) + } + })) + defer backend.Close() + + upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }} + spritz := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/channel-conversations/upsert": + writeJSON(w, http.StatusCreated, map[string]any{ + "status": "success", + "data": map[string]any{ + "created": true, + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"cwd": "/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/bootstrap": + writeJSON(w, http.StatusOK, map[string]any{ + "status": "success", + "data": map[string]any{ + "effectiveSessionId": "session-1", + "conversation": map[string]any{ + "metadata": map[string]any{"name": "conv-1"}, + "spec": map[string]any{"sessionId": "session-1", "cwd": "/home/dev"}, + }, + }, + }) + case "/api/acp/conversations/conv-1/connect": + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("upgrade failed: %v", err) + } + defer conn.Close() + for { + _, payload, err := conn.ReadMessage() + if err != nil { + return + } + var message map[string]any + if err := json.Unmarshal(payload, &message); err != nil { + t.Fatalf("decode ws payload: %v", err) + } + switch message["method"] { + case "initialize": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{"protocolVersion": 1}}) + case "session/load": + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}}) + case "session/prompt": + _ = conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "method": "session/update", + "params": map[string]any{ + "update": map[string]any{ + 
"sessionUpdate": "agent_message_chunk", + "content": []map[string]any{{ + "type": "text", + "text": "Hello after transient exchange failure", + }}, + }, + }, + }) + _ = conn.WriteJSON(map[string]any{"jsonrpc": "2.0", "id": message["id"], "result": map[string]any{}}) + return + default: + t.Fatalf("unexpected ACP method %#v", message["method"]) + } + } + default: + t.Fatalf("unexpected spritz path %s", r.URL.Path) + } + })) + defer spritz.Close() + + cfg := config{ + SlackAPIBaseURL: slackAPI.URL, + BackendBaseURL: backend.URL, + BackendInternalToken: "backend-internal-token", + SpritzBaseURL: spritz.URL, + SpritzServiceToken: "spritz-service-token", + PrincipalID: "shared-slack-gateway", + HTTPTimeout: 200 * time.Millisecond, + DedupeTTL: time.Minute, + StatusMessageDelay: 5 * time.Millisecond, + SessionRetryInterval: 10 * time.Millisecond, + ProcessingTimeout: 250 * time.Millisecond, + } + gateway := newSlackGateway(cfg, slog.New(slog.NewTextHandler(io.Discard, nil))) + + envelope := slackEnvelope{ + APIAppID: "A_app_1", + TeamID: "T_workspace_1", + Event: slackEventInner{ + Type: "app_mention", + User: "U_user", + Text: "<@U_bot> hello", + Channel: "C_channel_1", + ChannelType: "channel", + TS: "1711387375.000100", + }, + } + delivery, process, err := gateway.beginMessageEventDelivery(envelope) + if err != nil { + t.Fatalf("beginMessageEventDelivery returned error: %v", err) + } + if !process { + t.Fatal("expected app mention to be processed") + } + + ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond) + defer cancel() + if err := gateway.processMessageEventWithDelivery(ctx, envelope, delivery); err != nil { + t.Fatalf("expected recovery after transient exchange error to succeed, got %v", err) + } + + slackPayloads.Lock() + defer slackPayloads.Unlock() + if len(slackPayloads.items) != 2 { + t.Fatalf("expected wake-up status and final reply, got %#v", slackPayloads.items) + } + if got := slackPayloads.items[0]["text"]; got != 
slackRecoveryStatusText { + t.Fatalf("expected wake-up status text, got %#v", got) + } + if got := slackPayloads.items[1]["text"]; got != "Hello after transient exchange failure" { + t.Fatalf("expected final reply text, got %#v", got) + } + if sessionExchangeCalls.Load() != 3 { + t.Fatalf("expected recovery polling to continue through transient errors, got %d exchange attempts", sessionExchangeCalls.Load()) + } +} + +func TestProcessMessageEventIgnoresMentionOnlyBeforeRecoveryStarts(t *testing.T) { + var slackPostCalls atomic.Int32 + slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + slackPostCalls.Add(1) + t.Fatalf("did not expect slack post for mention-only event") + })) + defer slackAPI.Close() + + var sessionExchangeCalls atomic.Int32 + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + sessionExchangeCalls.Add(1) + t.Fatalf("did not expect session exchange for mention-only event") + })) + defer backend.Close() + + cfg := config{ + SlackAPIBaseURL: slackAPI.URL, + BackendBaseURL: backend.URL, + BackendInternalToken: "backend-internal-token", + SpritzBaseURL: "https://spritz.example.test", + SpritzServiceToken: "spritz-service-token", + PrincipalID: "shared-slack-gateway", + HTTPTimeout: 200 * time.Millisecond, + DedupeTTL: time.Minute, + StatusMessageDelay: 5 * time.Millisecond, + SessionRetryInterval: 10 * time.Millisecond, + ProcessingTimeout: 200 * time.Millisecond, + } + gateway := newSlackGateway(cfg, slog.New(slog.NewTextHandler(io.Discard, nil))) + + envelope := slackEnvelope{ + APIAppID: "A_app_1", + TeamID: "T_workspace_1", + Event: slackEventInner{ + Type: "app_mention", + User: "U_user", + Text: "<@U_bot>", + Channel: "C_channel_1", + ChannelType: "channel", + TS: "1711387375.000100", + }, + } + delivery, process, err := gateway.beginMessageEventDelivery(envelope) + if err != nil { + t.Fatalf("beginMessageEventDelivery returned error: %v", err) + } + if 
!process { + t.Fatal("expected app mention to be processed") + } + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + if err := gateway.processMessageEventWithDelivery(ctx, envelope, delivery); err != nil { + t.Fatalf("expected mention-only app mention to be ignored cleanly, got %v", err) + } + if sessionExchangeCalls.Load() != 0 { + t.Fatalf("expected no session exchange attempts, got %d", sessionExchangeCalls.Load()) + } + if slackPostCalls.Load() != 0 { + t.Fatalf("expected no slack posts, got %d", slackPostCalls.Load()) + } +} + +func TestProcessMessageEventPostsTerminalErrorAfterRecoveryTimeout(t *testing.T) { + var slackPayloads struct { + sync.Mutex + items []map[string]any + } + slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/chat.postMessage" { + t.Fatalf("unexpected slack path %s", r.URL.Path) + } + var payload map[string]any + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + t.Fatalf("decode slack post body: %v", err) + } + slackPayloads.Lock() + slackPayloads.items = append(slackPayloads.items, payload) + slackPayloads.Unlock() + writeJSON(w, http.StatusOK, map[string]any{"ok": true, "ts": fmt.Sprintf("1711387375.00010%d", len(slackPayloads.items))}) + })) + defer slackAPI.Close() + + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/internal/v1/spritz/channel-sessions/exchange" { + t.Fatalf("unexpected backend path %s", r.URL.Path) + } + writeJSON(w, http.StatusServiceUnavailable, map[string]any{ + "status": "unavailable", + "providerAuth": map[string]any{ + "providerInstallRef": "cred_slack_workspace_1", + "apiAppId": "A_app_1", + "teamId": "T_workspace_1", + "botUserId": "U_bot", + "botAccessToken": "xoxb-installed", + }, + }) + })) + defer backend.Close() + + cfg := config{ + SlackAPIBaseURL: slackAPI.URL, + BackendBaseURL: backend.URL, + 
BackendInternalToken: "backend-internal-token", + SpritzBaseURL: "https://spritz.example.test", + SpritzServiceToken: "spritz-service-token", + PrincipalID: "shared-slack-gateway", + HTTPTimeout: 200 * time.Millisecond, + DedupeTTL: time.Minute, + StatusMessageDelay: 5 * time.Millisecond, + SessionRetryInterval: 10 * time.Millisecond, + ProcessingTimeout: 200 * time.Millisecond, + } + gateway := newSlackGateway(cfg, slog.New(slog.NewTextHandler(io.Discard, nil))) + + envelope := slackEnvelope{ + APIAppID: "A_app_1", + TeamID: "T_workspace_1", + Event: slackEventInner{ + Type: "app_mention", + User: "U_user", + Text: "<@U_bot> hello", + Channel: "C_channel_1", + ChannelType: "channel", + TS: "1711387375.000100", + }, + } + delivery, process, err := gateway.beginMessageEventDelivery(envelope) + if err != nil { + t.Fatalf("beginMessageEventDelivery returned error: %v", err) + } + if !process { + t.Fatal("expected app mention to be processed") + } + + ctx, cancel := context.WithTimeout(context.Background(), 25*time.Millisecond) + defer cancel() + if err := gateway.processMessageEventWithDelivery(ctx, envelope, delivery); err != nil { + t.Fatalf("expected recovery timeout to be handled with terminal Slack reply, got %v", err) + } + + slackPayloads.Lock() + defer slackPayloads.Unlock() + if len(slackPayloads.items) != 2 { + t.Fatalf("expected wake-up message and terminal error reply, got %#v", slackPayloads.items) + } + if got := slackPayloads.items[0]["text"]; got != "Still waking up. I will continue here shortly." { + t.Fatalf("expected wake-up status text, got %#v", got) + } + if got := slackPayloads.items[1]["text"]; got != "I could not recover the channel runtime. Please try again." 
{ + t.Fatalf("expected terminal recovery error text, got %#v", got) + } +} + func TestProcessMessageEventSuppressesRetryAfterSlackReplyFailure(t *testing.T) { var postCalls int slackAPI := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/integrations/slack-gateway/slack_events.go b/integrations/slack-gateway/slack_events.go index 918403e..80d5dcc 100644 --- a/integrations/slack-gateway/slack_events.go +++ b/integrations/slack-gateway/slack_events.go @@ -10,11 +10,20 @@ import ( "fmt" "io" "net/http" + "regexp" "strconv" "strings" + "sync" "time" ) +const ( + slackRecoveryStatusText = "Still waking up. I will continue here shortly." + slackRecoveryFailureText = "I could not recover the channel runtime. Please try again." +) + +var slackMentionTokenPattern = regexp.MustCompile(`<@[^>]+>`) + type slackEnvelope struct { Type string `json:"type"` Challenge string `json:"challenge,omitempty"` @@ -36,6 +45,165 @@ type slackEventInner struct { ThreadTS string `json:"thread_ts,omitempty"` } +type channelSessionRecoveryState struct { + mu sync.Mutex + startedAt time.Time + statusAuth slackInstallation + statusAuthReady bool + statusPosting bool + statusVisible bool +} + +func newChannelSessionRecoveryState() *channelSessionRecoveryState { + return &channelSessionRecoveryState{startedAt: time.Now()} +} + +func (state *channelSessionRecoveryState) rememberProviderAuth(providerAuth slackInstallation) { + if state == nil || strings.TrimSpace(providerAuth.BotAccessToken) == "" { + return + } + state.mu.Lock() + defer state.mu.Unlock() + state.statusAuth = providerAuth + state.statusAuthReady = true +} + +// remainingStatusDelay reports how much longer the gateway should wait before +// posting the provider-authored wake-up status message. 
+func (state *channelSessionRecoveryState) remainingStatusDelay(delay time.Duration) time.Duration { + if state == nil { + return delay + } + state.mu.Lock() + defer state.mu.Unlock() + remaining := delay - time.Since(state.startedAt) + if remaining < 0 { + return 0 + } + return remaining +} + +func (state *channelSessionRecoveryState) hasProviderAuth() bool { + if state == nil { + return false + } + state.mu.Lock() + defer state.mu.Unlock() + return state.statusAuthReady +} + +func (state *channelSessionRecoveryState) maybePostStatus( + ctx context.Context, + g *slackGateway, + event slackEventInner, +) error { + if state == nil { + return nil + } + state.mu.Lock() + if !state.statusAuthReady || state.statusVisible || state.statusPosting { + state.mu.Unlock() + return nil + } + if time.Since(state.startedAt) < g.cfg.StatusMessageDelay { + state.mu.Unlock() + return nil + } + token := state.statusAuth.BotAccessToken + state.statusPosting = true + state.mu.Unlock() + + if err := g.postGatewaySlackMessage( + ctx, + token, + event, + slackRecoveryStatusText, + ); err != nil { + state.mu.Lock() + state.statusPosting = false + state.mu.Unlock() + return err + } + + state.mu.Lock() + state.statusPosting = false + state.statusVisible = true + state.mu.Unlock() + return nil +} + +func (state *channelSessionRecoveryState) maybePostFailure( + ctx context.Context, + g *slackGateway, + event slackEventInner, +) (bool, error) { + if state == nil { + return false, nil + } + state.mu.Lock() + if !state.statusAuthReady { + state.mu.Unlock() + return false, nil + } + if !state.statusVisible && time.Since(state.startedAt) < g.cfg.StatusMessageDelay { + state.mu.Unlock() + return false, nil + } + token := state.statusAuth.BotAccessToken + state.mu.Unlock() + if err := g.postGatewaySlackMessage( + ctx, + token, + event, + slackRecoveryFailureText, + ); err != nil { + return false, err + } + return true, nil +} + +// startPromptStatusTimer posts the provider-authored wake-up status 
once the +// visible-delay threshold is crossed while a prompt is still executing. +func (g *slackGateway) startPromptStatusTimer( + ctx context.Context, + event slackEventInner, + recoveryState *channelSessionRecoveryState, +) func() { + done := make(chan struct{}) + go func() { + remaining := recoveryState.remainingStatusDelay(g.cfg.StatusMessageDelay) + if remaining > 0 { + timer := time.NewTimer(remaining) + defer timer.Stop() + select { + case <-ctx.Done(): + return + case <-done: + return + case <-timer.C: + } + } + select { + case <-ctx.Done(): + return + case <-done: + return + default: + } + if err := recoveryState.maybePostStatus(ctx, g, event); err != nil { + g.logger.Error( + "slack recovery status post failed", + "error", err, + "channel_id", strings.TrimSpace(event.Channel), + "message_ts", strings.TrimSpace(event.TS), + ) + } + }() + return func() { + close(done) + } +} + func (g *slackGateway) handleSlackEvents(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) if err != nil { @@ -197,11 +365,20 @@ func (g *slackGateway) processMessageEventWithDelivery( }() event := envelope.Event + if normalizeSlackPromptText(event.Type, event.Text, "") == "" { + success = true + return nil + } - session, err := g.exchangeChannelSession(ctx, envelope.TeamID) + recoveryState := newChannelSessionRecoveryState() + session, terminalHandled, err := g.awaitChannelSession(ctx, envelope, event, recoveryState) if err != nil { return err } + if terminalHandled { + success = true + return nil + } if session.ProviderAuth.APIAppID != "" && strings.TrimSpace(envelope.APIAppID) != "" && session.ProviderAuth.APIAppID != strings.TrimSpace(envelope.APIAppID) { return fmt.Errorf("slack api_app_id mismatch for team %s", envelope.TeamID) } @@ -212,34 +389,84 @@ func (g *slackGateway) processMessageEventWithDelivery( session.ProviderAuth.BotUserID, ) if promptText == "" { + success = true return nil } - externalConversationID := 
slackExternalConversationID(event) - conversationID, err := g.upsertChannelConversation(ctx, session, event, envelope.TeamID, "", externalConversationID) - if err != nil { - return err - } - sessionID, cwd, err := g.bootstrapConversation(ctx, g.cfg.SpritzServiceToken, session.Namespace, conversationID) - if err != nil { - return err + stopPromptStatusTimer := g.startPromptStatusTimer(ctx, event, recoveryState) + result, err := g.executeConversationPrompt(ctx, envelope, event, session, promptText) + stopPromptStatusTimer() + for err != nil && !result.promptSent && isSpritzRuntimeMissingError(err) { + recoveryState.rememberProviderAuth(session.ProviderAuth) + if postErr := recoveryState.maybePostStatus(ctx, g, event); postErr != nil { + g.logger.Error( + "slack recovery status post failed", + "error", postErr, + "team_id", strings.TrimSpace(envelope.TeamID), + "channel_id", strings.TrimSpace(event.Channel), + "message_ts", strings.TrimSpace(event.TS), + ) + } + if sleepErr := sleepWithContext(ctx, g.cfg.SessionRetryInterval); sleepErr != nil { + terminalHandled, postErr := recoveryState.maybePostFailure(ctx, g, event) + if postErr != nil { + g.logger.Error( + "slack recovery failure reply failed", + "error", postErr, + "team_id", strings.TrimSpace(envelope.TeamID), + "channel_id", strings.TrimSpace(event.Channel), + "message_ts", strings.TrimSpace(event.TS), + ) + return postErr + } + if terminalHandled { + success = true + return nil + } + return sleepErr + } + recoveredSession, recoveredTerminalHandled, recoveryErr := g.awaitChannelSession( + ctx, + envelope, + event, + recoveryState, + ) + if recoveryErr != nil { + return recoveryErr + } + if recoveredTerminalHandled { + success = true + return nil + } + session = recoveredSession + promptText = buildSlackPromptText( + envelope.TeamID, + event, + session.ProviderAuth.BotUserID, + ) + if promptText == "" { + success = true + return nil + } + stopPromptStatusTimer = g.startPromptStatusTimer(ctx, event, 
recoveryState) + result, err = g.executeConversationPrompt(ctx, envelope, event, session, promptText) + stopPromptStatusTimer() } - reply, promptSent, err := g.promptConversation(ctx, g.cfg.SpritzServiceToken, session.Namespace, conversationID, sessionID, cwd, promptText) if err != nil { - if !promptSent { + if !result.promptSent { return err } - reply = "I hit an internal error while processing that request." - g.logger.Error("acp prompt failed", "error", err, "conversation_id", conversationID) + result.reply = "I hit an internal error while processing that request." + g.logger.Error("acp prompt failed", "error", err, "conversation_id", result.conversationID) } replyThreadTS := slackReplyThreadTS(event) replyCtx, cancelReply := context.WithTimeout(context.WithoutCancel(ctx), g.cfg.HTTPTimeout) defer cancelReply() - replyMessageTS, err := g.postSlackMessage(replyCtx, session.ProviderAuth.BotAccessToken, event.Channel, reply, replyThreadTS) + replyMessageTS, err := g.postSlackMessage(replyCtx, session.ProviderAuth.BotAccessToken, event.Channel, result.reply, replyThreadTS) if err != nil { // Once the ACP prompt has already been delivered, suppress duplicate // Slack retries from re-running the same agent side effects. 
- success = promptSent + success = result.promptSent return err } if replyThreadTS == "" && !isSlackDirectMessageEvent(event) && strings.TrimSpace(replyMessageTS) != "" { @@ -249,14 +476,14 @@ func (g *slackGateway) processMessageEventWithDelivery( session, event, envelope.TeamID, - conversationID, + result.conversationID, replyMessageTS, ); err != nil { cancelAlias() g.logger.Error( "slack reply alias persistence failed", "error", err, - "conversation_id", conversationID, + "conversation_id", result.conversationID, "reply_message_ts", replyMessageTS, ) } else { @@ -267,6 +494,130 @@ func (g *slackGateway) processMessageEventWithDelivery( return nil } +type conversationPromptResult struct { + conversationID string + reply string + promptSent bool +} + +func (g *slackGateway) executeConversationPrompt( + ctx context.Context, + envelope slackEnvelope, + event slackEventInner, + session channelSession, + promptText string, +) (conversationPromptResult, error) { + externalConversationID := slackExternalConversationID(event) + conversationID, err := g.upsertChannelConversation( + ctx, + session, + event, + envelope.TeamID, + "", + externalConversationID, + ) + if err != nil { + return conversationPromptResult{}, err + } + sessionID, cwd, err := g.bootstrapConversation( + ctx, + g.cfg.SpritzServiceToken, + session.Namespace, + conversationID, + ) + if err != nil { + return conversationPromptResult{conversationID: conversationID}, err + } + reply, promptSent, err := g.promptConversation( + ctx, + g.cfg.SpritzServiceToken, + session.Namespace, + conversationID, + sessionID, + cwd, + promptText, + ) + return conversationPromptResult{ + conversationID: conversationID, + reply: reply, + promptSent: promptSent, + }, err +} + +func (g *slackGateway) awaitChannelSession( + ctx context.Context, + envelope slackEnvelope, + event slackEventInner, + recoveryState *channelSessionRecoveryState, +) (channelSession, bool, error) { + if recoveryState == nil { + recoveryState = 
newChannelSessionRecoveryState() + } + + for { + session, err := g.exchangeChannelSession(ctx, envelope.TeamID) + if err == nil { + recoveryState.rememberProviderAuth(session.ProviderAuth) + return session, false, nil + } + + providerAuth, recoverable := channelSessionUnavailableProviderAuth(err) + if !recoverable { + if recoveryState.hasProviderAuth() { + g.logger.Error( + "slack session recovery poll failed", + "error", err, + "team_id", strings.TrimSpace(envelope.TeamID), + "channel_id", strings.TrimSpace(event.Channel), + "message_ts", strings.TrimSpace(event.TS), + ) + } else { + if terminalHandled, postErr := recoveryState.maybePostFailure(ctx, g, event); postErr != nil { + g.logger.Error( + "slack recovery failure reply failed", + "error", postErr, + "team_id", strings.TrimSpace(envelope.TeamID), + "channel_id", strings.TrimSpace(event.Channel), + "message_ts", strings.TrimSpace(event.TS), + ) + return channelSession{}, false, postErr + } else if terminalHandled { + return channelSession{}, true, nil + } + return channelSession{}, false, err + } + } else { + recoveryState.rememberProviderAuth(providerAuth) + } + + if postErr := recoveryState.maybePostStatus(ctx, g, event); postErr != nil { + g.logger.Error( + "slack recovery status post failed", + "error", postErr, + "team_id", strings.TrimSpace(envelope.TeamID), + "channel_id", strings.TrimSpace(event.Channel), + "message_ts", strings.TrimSpace(event.TS), + ) + } + + if err := sleepWithContext(ctx, g.cfg.SessionRetryInterval); err != nil { + if terminalHandled, postErr := recoveryState.maybePostFailure(ctx, g, event); postErr != nil { + g.logger.Error( + "slack recovery failure reply failed", + "error", postErr, + "team_id", strings.TrimSpace(envelope.TeamID), + "channel_id", strings.TrimSpace(event.Channel), + "message_ts", strings.TrimSpace(event.TS), + ) + return channelSession{}, false, postErr + } else if terminalHandled { + return channelSession{}, true, nil + } + return channelSession{}, false, err + 
} + } +} + func shouldIgnoreSlackMessageEvent(event slackEventInner) bool { subtype := strings.TrimSpace(event.Subtype) return subtype != "" && subtype != "file_share" @@ -299,9 +650,11 @@ func normalizeSlackPromptText(eventType, text, botUserID string) string { normalized[:index] + normalized[index+len(mentionToken):], ) } + } else { + normalized = slackMentionTokenPattern.ReplaceAllString(normalized, " ") } } - return normalized + return strings.TrimSpace(normalized) } type slackPromptContext struct { @@ -360,6 +713,35 @@ func slackExternalConversationID(event slackEventInner) string { return strings.TrimSpace(event.TS) } +func (g *slackGateway) postGatewaySlackMessage( + ctx context.Context, + token string, + event slackEventInner, + text string, +) error { + replyCtx, cancelReply := context.WithTimeout(context.WithoutCancel(ctx), g.cfg.HTTPTimeout) + defer cancelReply() + _, err := g.postSlackMessage( + replyCtx, + token, + event.Channel, + text, + slackReplyThreadTS(event), + ) + return err +} + +func sleepWithContext(ctx context.Context, duration time.Duration) error { + timer := time.NewTimer(duration) + defer timer.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} + func (g *slackGateway) verifySlackSignature(header http.Header, body []byte) error { timestampRaw := strings.TrimSpace(header.Get("X-Slack-Request-Timestamp")) signature := strings.TrimSpace(header.Get("X-Slack-Signature")) diff --git a/operator/controllers/lifecycle_notifications.go b/operator/controllers/lifecycle_notifications.go new file mode 100644 index 0000000..676276b --- /dev/null +++ b/operator/controllers/lifecycle_notifications.go @@ -0,0 +1,103 @@ +package controllers + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "strings" + "time" +) + +const defaultLifecycleNotificationTimeout = 3 * time.Second + +// LifecycleNotificationConfig controls the optional runtime lifecycle webhook. 
+type LifecycleNotificationConfig struct { + URL string + AuthToken string + Timeout time.Duration + Client *http.Client +} + +type lifecycleNotificationPayload struct { + Namespace string `json:"namespace"` + InstanceID string `json:"instanceId"` + Phase string `json:"phase"` +} + +// NewLifecycleNotificationConfigFromEnv loads lifecycle webhook settings. +func NewLifecycleNotificationConfigFromEnv() LifecycleNotificationConfig { + return LifecycleNotificationConfig{ + URL: strings.TrimSpace(os.Getenv("SPRITZ_LIFECYCLE_NOTIFY_URL")), + AuthToken: strings.TrimSpace(os.Getenv("SPRITZ_LIFECYCLE_NOTIFY_AUTH_TOKEN")), + Timeout: parseDurationEnv( + "SPRITZ_LIFECYCLE_NOTIFY_TIMEOUT", + defaultLifecycleNotificationTimeout, + ), + } +} + +func (c LifecycleNotificationConfig) enabled() bool { + return strings.TrimSpace(c.URL) != "" +} + +func (c LifecycleNotificationConfig) httpClient() *http.Client { + if c.Client != nil { + return c.Client + } + return &http.Client{Timeout: c.Timeout} +} + +func (c LifecycleNotificationConfig) notifyPhase( + ctx context.Context, + namespace, instanceID, phase string, +) error { + if !c.enabled() { + return nil + } + + payload, err := json.Marshal( + lifecycleNotificationPayload{ + Namespace: strings.TrimSpace(namespace), + InstanceID: strings.TrimSpace(instanceID), + Phase: strings.TrimSpace(phase), + }, + ) + if err != nil { + return err + } + + request, err := http.NewRequestWithContext( + ctx, + http.MethodPost, + c.URL, + bytes.NewReader(payload), + ) + if err != nil { + return err + } + request.Header.Set("Content-Type", "application/json") + if token := strings.TrimSpace(c.AuthToken); token != "" { + request.Header.Set("Authorization", "Bearer "+token) + } + + response, err := c.httpClient().Do(request) + if err != nil { + return err + } + defer response.Body.Close() + + if response.StatusCode >= http.StatusOK && response.StatusCode < http.StatusMultipleChoices { + return nil + } + + body, _ := 
io.ReadAll(io.LimitReader(response.Body, 512)) + return fmt.Errorf( + "lifecycle notification failed: %s %s", + response.Status, + strings.TrimSpace(string(body)), + ) +} diff --git a/operator/controllers/lifecycle_notifications_test.go b/operator/controllers/lifecycle_notifications_test.go new file mode 100644 index 0000000..b8d973d --- /dev/null +++ b/operator/controllers/lifecycle_notifications_test.go @@ -0,0 +1,67 @@ +package controllers + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestNewLifecycleNotificationConfigFromEnv(t *testing.T) { + t.Setenv("SPRITZ_LIFECYCLE_NOTIFY_URL", "https://notify.example.test/runtime") + t.Setenv("SPRITZ_LIFECYCLE_NOTIFY_AUTH_TOKEN", "notify-token") + t.Setenv("SPRITZ_LIFECYCLE_NOTIFY_TIMEOUT", "7s") + + cfg := NewLifecycleNotificationConfigFromEnv() + + if cfg.URL != "https://notify.example.test/runtime" { + t.Fatalf("expected lifecycle notify url to be loaded, got %q", cfg.URL) + } + if cfg.AuthToken != "notify-token" { + t.Fatalf("expected lifecycle notify auth token, got %q", cfg.AuthToken) + } + if cfg.Timeout != 7*time.Second { + t.Fatalf("expected lifecycle notify timeout to be 7s, got %s", cfg.Timeout) + } +} + +func TestLifecycleNotificationConfigNotifyPhasePostsExpectedPayload(t *testing.T) { + var received struct { + Authorization string + Payload map[string]any + } + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + received.Authorization = r.Header.Get("Authorization") + if err := json.NewDecoder(r.Body).Decode(&received.Payload); err != nil { + t.Fatalf("decode notification payload: %v", err) + } + w.WriteHeader(http.StatusAccepted) + })) + defer server.Close() + + cfg := LifecycleNotificationConfig{ + URL: server.URL, + AuthToken: "notify-token", + Timeout: time.Second, + Client: server.Client(), + } + + if err := cfg.notifyPhase(context.Background(), "spritz-system", "zeno-acme", "Expired"); err != nil { 
+ t.Fatalf("notifyPhase returned error: %v", err) + } + + if received.Authorization != "Bearer notify-token" { + t.Fatalf("expected bearer auth header, got %q", received.Authorization) + } + if received.Payload["namespace"] != "spritz-system" { + t.Fatalf("expected namespace payload, got %#v", received.Payload["namespace"]) + } + if received.Payload["instanceId"] != "zeno-acme" { + t.Fatalf("expected instanceId payload, got %#v", received.Payload["instanceId"]) + } + if received.Payload["phase"] != "Expired" { + t.Fatalf("expected phase payload, got %#v", received.Payload["phase"]) + } +} diff --git a/operator/controllers/spritz_controller.go b/operator/controllers/spritz_controller.go index 2cee0d9..0579f28 100644 --- a/operator/controllers/spritz_controller.go +++ b/operator/controllers/spritz_controller.go @@ -29,19 +29,20 @@ import ( ) const ( - defaultRepoDir = "/workspace/repo" - defaultWebPort = int32(8080) - defaultSSHPort = int32(22) - defaultSSHUser = "spritz" - defaultSSHMode = "service" - spritzContainerName = "spritz" - spritzFinalizer = "spritz.sh/finalizer" - ownerLabelKey = "spritz.sh/owner" - defaultTTLGrace = 5 * time.Minute - defaultRepoInitImage = "alpine/git:2.45.2" - repoAuthMountPath = "/var/run/spritz/repo-auth" - repoInitHomeDir = "/home/dev" - repoInitGroupID int64 = 65532 + defaultRepoDir = "/workspace/repo" + defaultWebPort = int32(8080) + defaultSSHPort = int32(22) + defaultSSHUser = "spritz" + defaultSSHMode = "service" + spritzContainerName = "spritz" + spritzFinalizer = "spritz.sh/finalizer" + ownerLabelKey = "spritz.sh/owner" + defaultTTLGrace = 5 * time.Minute + defaultRepoInitImage = "alpine/git:2.45.2" + repoAuthMountPath = "/var/run/spritz/repo-auth" + repoInitHomeDir = "/home/dev" + repoInitGroupID int64 = 65532 + lifecycleNotifiedPhaseAnnotationKey = "spritz.sh/lifecycle-notified-phase" ) var ( @@ -51,8 +52,9 @@ var ( type SpritzReconciler struct { client.Client - Scheme *runtime.Scheme - ACP ACPProbeConfig + Scheme 
*runtime.Scheme + ACP ACPProbeConfig + LifecycleNotifications LifecycleNotificationConfig } type repoEntry struct { @@ -703,6 +705,8 @@ func (r *SpritzReconciler) reconcileStatus(ctx context.Context, spritz *spritzv1 } func (r *SpritzReconciler) setStatus(ctx context.Context, spritz *spritzv1.Spritz, phase, url string, sshInfo *spritzv1.SpritzSSHInfo, reason, message string, acpStatus *spritzv1.SpritzACPStatus) error { + phase = strings.TrimSpace(phase) + notificationPending := phase != "" && lastLifecycleNotifiedPhase(spritz) != phase conditionStatus := metav1.ConditionFalse if phase == "Ready" { conditionStatus = metav1.ConditionTrue @@ -729,7 +733,57 @@ func (r *SpritzReconciler) setStatus(ctx context.Context, spritz *spritzv1.Sprit spritz.Status.ReadyAt = &now } - return r.Status().Update(ctx, spritz) + if err := r.Status().Update(ctx, spritz); err != nil { + return err + } + if !notificationPending || !r.LifecycleNotifications.enabled() { + return nil + } + if err := r.LifecycleNotifications.notifyPhase(ctx, spritz.Namespace, spritz.Name, phase); err != nil { + log.FromContext(ctx).Error( + err, + "lifecycle notification failed", + "name", spritz.Name, + "namespace", spritz.Namespace, + "phase", phase, + ) + return nil + } + if err := r.recordLifecycleNotifiedPhase(ctx, spritz, phase); err != nil { + log.FromContext(ctx).Error( + err, + "failed to persist lifecycle notification marker", + "name", spritz.Name, + "namespace", spritz.Namespace, + "phase", phase, + ) + } + return nil +} + +func lastLifecycleNotifiedPhase(spritz *spritzv1.Spritz) string { + if spritz == nil || spritz.Annotations == nil { + return "" + } + return strings.TrimSpace(spritz.Annotations[lifecycleNotifiedPhaseAnnotationKey]) +} + +// recordLifecycleNotifiedPhase stores the latest phase that was delivered to +// the optional lifecycle webhook so later reconciles can retry only when needed. 
+func (r *SpritzReconciler) recordLifecycleNotifiedPhase( + ctx context.Context, + spritz *spritzv1.Spritz, + phase string, +) error { + if spritz == nil || strings.TrimSpace(phase) == "" { + return nil + } + base := spritz.DeepCopy() + if spritz.Annotations == nil { + spritz.Annotations = map[string]string{} + } + spritz.Annotations[lifecycleNotifiedPhaseAnnotationKey] = phase + return r.Patch(ctx, spritz, client.MergeFrom(base)) } func setACPReadyCondition(conditions *[]metav1.Condition, generation int64, status *spritzv1.SpritzACPStatus) { diff --git a/operator/controllers/status_notifications_test.go b/operator/controllers/status_notifications_test.go new file mode 100644 index 0000000..f7fac1c --- /dev/null +++ b/operator/controllers/status_notifications_test.go @@ -0,0 +1,208 @@ +package controllers + +import ( + "context" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + spritzv1 "spritz.sh/operator/api/v1" +) + +func TestSetStatusDoesNotBlockOnLifecycleNotificationFailure(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "notify unavailable", http.StatusBadGateway) + })) + defer server.Close() + + scheme := newControllerTestScheme(t) + spritz := &spritzv1.Spritz{ + ObjectMeta: metav1.ObjectMeta{Name: "steady-otter", Namespace: "spritz-test"}, + Spec: spritzv1.SpritzSpec{ + Image: "example.com/openclaw:latest", + Owner: spritzv1.SpritzOwner{ID: "user-1"}, + }, + Status: spritzv1.SpritzStatus{Phase: "Provisioning"}, + } + k8sClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&spritzv1.Spritz{}). + WithObjects(spritz). 
+ Build() + reconciler := &SpritzReconciler{ + Client: k8sClient, + Scheme: scheme, + LifecycleNotifications: LifecycleNotificationConfig{ + URL: server.URL, + Timeout: time.Second, + Client: server.Client(), + }, + } + + if err := reconciler.setStatus( + context.Background(), + spritz, + "Ready", + "https://spritz.example.test", + nil, + "Ready", + "spritz ready", + nil, + ); err != nil { + t.Fatalf("setStatus returned error: %v", err) + } + + stored := &spritzv1.Spritz{} + if err := k8sClient.Get(context.Background(), client.ObjectKeyFromObject(spritz), stored); err != nil { + t.Fatalf("failed to load updated spritz: %v", err) + } + if stored.Status.Phase != "Ready" { + t.Fatalf("expected phase to be updated despite notification failure, got %q", stored.Status.Phase) + } +} + +func TestSetStatusNotifiesAfterPersistedPhaseUpdate(t *testing.T) { + scheme := newControllerTestScheme(t) + spritz := &spritzv1.Spritz{ + ObjectMeta: metav1.ObjectMeta{Name: "tidy-otter", Namespace: "spritz-test"}, + Spec: spritzv1.SpritzSpec{ + Image: "example.com/openclaw:latest", + Owner: spritzv1.SpritzOwner{ID: "user-1"}, + }, + Status: spritzv1.SpritzStatus{Phase: "Provisioning"}, + } + k8sClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&spritzv1.Spritz{}). + WithObjects(spritz). 
+ Build() + + var observedStoredPhase atomic.Value + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + stored := &spritzv1.Spritz{} + if err := k8sClient.Get(context.Background(), client.ObjectKeyFromObject(spritz), stored); err != nil { + t.Fatalf("failed to load persisted spritz during notification: %v", err) + } + observedStoredPhase.Store(stored.Status.Phase) + w.WriteHeader(http.StatusAccepted) + })) + defer server.Close() + + reconciler := &SpritzReconciler{ + Client: k8sClient, + Scheme: scheme, + LifecycleNotifications: LifecycleNotificationConfig{ + URL: server.URL, + Timeout: time.Second, + Client: server.Client(), + }, + } + + if err := reconciler.setStatus( + context.Background(), + spritz, + "Ready", + "https://spritz.example.test", + nil, + "Ready", + "spritz ready", + nil, + ); err != nil { + t.Fatalf("setStatus returned error: %v", err) + } + + if got, _ := observedStoredPhase.Load().(string); got != "Ready" { + t.Fatalf("expected notification to observe persisted Ready phase, got %q", got) + } +} + +func TestSetStatusRetriesLifecycleNotificationUntilRecorded(t *testing.T) { + scheme := newControllerTestScheme(t) + spritz := &spritzv1.Spritz{ + ObjectMeta: metav1.ObjectMeta{Name: "retry-otter", Namespace: "spritz-test"}, + Spec: spritzv1.SpritzSpec{ + Image: "example.com/openclaw:latest", + Owner: spritzv1.SpritzOwner{ID: "user-1"}, + }, + Status: spritzv1.SpritzStatus{Phase: "Provisioning"}, + } + k8sClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&spritzv1.Spritz{}). + WithObjects(spritz). 
+ Build() + + var notificationCalls atomic.Int32 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if notificationCalls.Add(1) == 1 { + http.Error(w, "notify unavailable", http.StatusBadGateway) + return + } + w.WriteHeader(http.StatusAccepted) + })) + defer server.Close() + + reconciler := &SpritzReconciler{ + Client: k8sClient, + Scheme: scheme, + LifecycleNotifications: LifecycleNotificationConfig{ + URL: server.URL, + Timeout: time.Second, + Client: server.Client(), + }, + } + + if err := reconciler.setStatus( + context.Background(), + spritz, + "Ready", + "https://spritz.example.test", + nil, + "Ready", + "spritz ready", + nil, + ); err != nil { + t.Fatalf("initial setStatus returned error: %v", err) + } + + stored := &spritzv1.Spritz{} + if err := k8sClient.Get(context.Background(), client.ObjectKeyFromObject(spritz), stored); err != nil { + t.Fatalf("failed to load stored spritz after initial update: %v", err) + } + if stored.Status.Phase != "Ready" { + t.Fatalf("expected phase to persist as Ready after initial update, got %q", stored.Status.Phase) + } + if got := stored.Annotations[lifecycleNotifiedPhaseAnnotationKey]; got != "" { + t.Fatalf("expected failed notification not to record annotation, got %q", got) + } + + if err := reconciler.setStatus( + context.Background(), + stored, + "Ready", + "https://spritz.example.test", + nil, + "Ready", + "spritz ready", + nil, + ); err != nil { + t.Fatalf("retry setStatus returned error: %v", err) + } + + reloaded := &spritzv1.Spritz{} + if err := k8sClient.Get(context.Background(), client.ObjectKeyFromObject(spritz), reloaded); err != nil { + t.Fatalf("failed to reload spritz after retry update: %v", err) + } + if notificationCalls.Load() != 2 { + t.Fatalf("expected notification retry on second reconcile, got %d calls", notificationCalls.Load()) + } + if got := reloaded.Annotations[lifecycleNotifiedPhaseAnnotationKey]; got != "Ready" { + t.Fatalf("expected successful 
retry to record notified phase, got %q", got) + } +} diff --git a/operator/main.go b/operator/main.go index 24097be..a981886 100644 --- a/operator/main.go +++ b/operator/main.go @@ -63,9 +63,10 @@ func main() { } if err := (&controllers.SpritzReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - ACP: controllers.NewACPProbeConfigFromEnv(), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + ACP: controllers.NewACPProbeConfigFromEnv(), + LifecycleNotifications: controllers.NewLifecycleNotificationConfigFromEnv(), }).SetupWithManager(mgr); err != nil { logger.Error(err, "unable to create controller") os.Exit(1)