diff --git a/README.md b/README.md index da94ec0..8f6e2d9 100644 --- a/README.md +++ b/README.md @@ -262,6 +262,8 @@ Spritz is intended to remain portable and standalone: - [ACP Port and Agent Chat Architecture](docs/2026-03-09-acp-port-and-agent-chat-architecture.md) - [External Provisioner and Service Principal Architecture](docs/2026-03-11-external-provisioner-and-service-principal-architecture.md) - [External Identity Resolution API Architecture](docs/2026-03-12-external-identity-resolution-api-architecture.md) +- [Privileged Conversation Debug and Test Architecture](docs/2026-03-24-privileged-conversation-debug-and-test-architecture.md) +- [Spritz-Native Conversation Broker Architecture](docs/2026-03-24-spritz-native-conversation-broker-architecture.md) - [Spawn Language for Agent Instances](docs/2026-03-13-spawn-language-for-agent-instances.md) - [OpenClaw Integration](docs/2026-03-13-openclaw-integration.md) - [Local kind Development Guide](docs/2026-03-14-local-kind-development-guide.md) diff --git a/api/acp_bootstrap.go b/api/acp_bootstrap.go index b2033ac..1914cd1 100644 --- a/api/acp_bootstrap.go +++ b/api/acp_bootstrap.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" "strings" + "sync" "github.com/gorilla/websocket" "github.com/labstack/echo/v4" @@ -103,8 +104,12 @@ func (e *acpBootstrapRPCError) missingSession() bool { } type acpBootstrapInstanceClient struct { - conn *websocket.Conn - nextID int64 + conn *websocket.Conn + nextID int64 + writeMu sync.Mutex + readerOnce sync.Once + readCh chan *acpBootstrapJSONRPCMessage + readErrCh chan error } func (c *acpBootstrapInstanceClient) close() error { @@ -190,6 +195,8 @@ func (c *acpBootstrapInstanceClient) request(ctx context.Context, method string, } func (c *acpBootstrapInstanceClient) writeJSON(ctx context.Context, payload any) error { + c.writeMu.Lock() + defer c.writeMu.Unlock() if deadline, ok := ctx.Deadline(); ok { if err := c.conn.SetWriteDeadline(deadline); err != nil { return err @@ -198,21 +205,90 @@ func (c *acpBootstrapInstanceClient) writeJSON(ctx context.Context, payload any) return c.conn.WriteJSON(payload) } +func (c *acpBootstrapInstanceClient) notify(ctx context.Context, method string, params any) error { + return c.writeJSON(ctx, map[string]any{ + "jsonrpc": "2.0", + "method": method, + "params": params, + }) +} + +func (c *acpBootstrapInstanceClient) cancelPrompt(ctx context.Context, sessionID string) error { + if strings.TrimSpace(sessionID) == "" { + return nil + } + return c.notify(ctx, "session/cancel", map[string]any{ + "sessionId": sessionID, + }) +} + +func (c *acpBootstrapInstanceClient) respondError(ctx context.Context, id any, code int, message string) error { + if id == nil { + return nil + } + return c.writeJSON(ctx, map[string]any{ + "jsonrpc": "2.0", + "id": id, + "error": map[string]any{ + "code": code, + "message": message, + }, + }) +} + +func (c *acpBootstrapInstanceClient) handleServerRequest(ctx context.Context, message *acpBootstrapJSONRPCMessage, unsupportedMessage string) (bool, error) { + if message == nil || strings.TrimSpace(message.Method) == "" || message.ID == nil { + return false, nil + } + if message.Method == "session/request_permission" { + return true, c.respondError(ctx, message.ID, -32000, "Permission requests are not supported by internal debug chat.") + } + return true, c.respondError(ctx, message.ID, -32601, unsupportedMessage) +} + func (c *acpBootstrapInstanceClient) readMessage(ctx context.Context) (*acpBootstrapJSONRPCMessage, error) { - if deadline, ok := ctx.Deadline(); ok { - if err := c.conn.SetReadDeadline(deadline); err != nil { + c.startReader() + select { + case <-ctx.Done(): + return nil, ctx.Err() + case message, ok := <-c.readCh: + if !ok { + if err, ok := <-c.readErrCh; ok && err != nil { + return nil, err + } + return nil, websocket.ErrCloseSent + } + return message, nil + case err, ok := <-c.readErrCh: + if ok && err != nil { return nil, err } + return nil, websocket.ErrCloseSent } - _, payload, err := c.conn.ReadMessage() - if err != nil { - return nil, err - } - message := &acpBootstrapJSONRPCMessage{} - if err := json.Unmarshal(payload, message); err != nil { - return nil, err - } - return message, nil +} + +func (c *acpBootstrapInstanceClient) startReader() { + c.readerOnce.Do(func() { + c.readCh = make(chan *acpBootstrapJSONRPCMessage, 32) + c.readErrCh = make(chan error, 1) + go func() { + defer close(c.readCh) + defer close(c.readErrCh) + for { + _, payload, err := c.conn.ReadMessage() + if err != nil { + c.readErrCh <- err + return + } + message := &acpBootstrapJSONRPCMessage{} + if err := json.Unmarshal(payload, message); err != nil { + c.readErrCh <- err + return + } + c.readCh <- message + } + }() + }) } func newACPBootstrapRPCError(payload *acpBootstrapJSONRPCError) *acpBootstrapRPCError { @@ -344,8 +420,12 @@ func (s *server) bootstrapACPConversationBinding(ctx context.Context, conversati return nil, err } + return s.bootstrapACPConversationBindingWithClient(ctx, conversation, client, initResult) +} + +func (s *server) bootstrapACPConversationBindingWithClient(ctx context.Context, conversation *spritzv1.SpritzConversation, client *acpBootstrapInstanceClient, initResult *acpBootstrapInitializeResult) (*acpBootstrapResponse, error) { if !initResult.AgentCapabilities.LoadSession { - err = errors.New("agent does not support session/load") + err := errors.New("agent does not support session/load") s.recordConversationBindingError(ctx, conversation.Namespace, conversation.Name, "", err) return nil, err } @@ -358,6 +438,7 @@ func (s *server) bootstrapACPConversationBinding(ctx context.Context, conversati replaced := false loaded := false var replayMessageCount int32 + var err error if effectiveSessionID != "" { replayMessageCount, err = client.loadSession(ctx, effectiveSessionID, normalizeConversationCWD(conversation.Spec.CWD)) diff --git a/api/acp_config.go b/api/acp_config.go index d7e5c1e..d43f54a 100644 --- a/api/acp_config.go +++ b/api/acp_config.go @@ -28,6 +28,8 @@ type acpConfig struct { clientInfo acpBootstrapClientInfo clientCapabilities map[string]any bootstrapDialTimeout time.Duration + promptTimeout time.Duration + promptSettleTimeout time.Duration } func defaultACPClientCapabilities() map[string]any { @@ -65,6 +67,8 @@ func newACPConfig() acpConfig { }, clientCapabilities: defaultACPClientCapabilities(), bootstrapDialTimeout: parseDurationEnv("SPRITZ_ACP_BOOTSTRAP_DIAL_TIMEOUT", 5*time.Second), + promptTimeout: parseDurationEnv("SPRITZ_ACP_PROMPT_TIMEOUT", 90*time.Second), + promptSettleTimeout: parseDurationEnv("SPRITZ_ACP_PROMPT_SETTLE_TIMEOUT", 750*time.Millisecond), } } diff --git a/api/acp_prompt.go b/api/acp_prompt.go new file mode 100644 index 0000000..78c4529 --- /dev/null +++ b/api/acp_prompt.go @@ -0,0 +1,205 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" +) + +type acpPromptResult struct { + StopReason string `json:"stopReason,omitempty"` + AssistantText string `json:"assistantText,omitempty"` + Updates []map[string]any `json:"updates,omitempty"` +} + +func (c *acpBootstrapInstanceClient) prompt(ctx context.Context, sessionID, text string, settleTimeout time.Duration) (*acpPromptResult, error) { + c.nextID++ + requestID := fmt.Sprintf("prompt-%d", c.nextID) + if err := c.writeJSON(ctx, map[string]any{ + "jsonrpc": "2.0", + "id": requestID, + "method": "session/prompt", + "params": map[string]any{ + "sessionId": sessionID, + "prompt": []map[string]any{{ + "type": "text", + "text": text, + }}, + }, + }); err != nil { + return nil, err + } + + var result struct { + StopReason string `json:"stopReason"` + } + updates := make([]map[string]any, 0, 8) + + for { + message, err := c.readMessage(ctx) + if err != nil { + return nil, err + } + if handled, err := c.handleServerRequest(ctx, message, "Method not supported by internal debug chat."); handled { + if err != nil { + return nil, err + } + continue + } + if update, ok := sessionUpdateFromMessage(message); ok { + updates = append(updates, update) + continue + } + if fmt.Sprint(message.ID) != requestID { + continue + } + if message.Error != nil { + return nil, newACPBootstrapRPCError(message.Error) + } + if len(message.Result) > 0 { + if err := json.Unmarshal(message.Result, &result); err != nil { + return nil, err + } + } + break + } + + if err := c.drainSessionUpdates(ctx, settleTimeout, &updates); err != nil { + return nil, err + } + + return &acpPromptResult{ + StopReason: strings.TrimSpace(result.StopReason), + AssistantText: assistantTextFromACPUpdates(updates), + Updates: updates, + }, nil +} + +func (c *acpBootstrapInstanceClient) drainSessionUpdates(ctx context.Context, settleTimeout time.Duration, updates *[]map[string]any) error { + if c == nil || c.conn == nil || settleTimeout <= 0 { + return nil + } + c.startReader() + timer := time.NewTimer(settleTimeout) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + case err, ok := <-c.readErrCh: + if !ok || err == nil { + return nil + } + return err + case message, ok := <-c.readCh: + if !ok { + return nil + } + if handled, err := c.handleServerRequest(ctx, message, "Method not supported by internal debug chat."); handled { + if err != nil { + return err + } + continue + } + if update, ok := sessionUpdateFromMessage(message); ok { + *updates = append(*updates, update) + if !shouldExtendSessionSettleWindow(update) { + continue + } + } + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(settleTimeout) + } + } +} + +func shouldExtendSessionSettleWindow(update map[string]any) bool { + switch strings.TrimSpace(fmt.Sprint(update["sessionUpdate"])) { + case "", "heartbeat", "ping", "pong", "ack", "available_commands_update", "current_mode_update", "usage_update", "session_info_update": + return false + default: + return true + } +} + +func sessionUpdateFromMessage(message *acpBootstrapJSONRPCMessage) (map[string]any, bool) { + if message == nil || message.Method != "session/update" || len(message.Params) == 0 { + return nil, false + } + var params struct { + Update map[string]any `json:"update"` + } + if err := json.Unmarshal(message.Params, ¶ms); err != nil { + return nil, false + } + if len(params.Update) == 0 { + return nil, false + } + return params.Update, true +} + +func assistantTextFromACPUpdates(updates []map[string]any) string { + chunks := make([]any, 0, len(updates)) + for _, update := range updates { + if strings.TrimSpace(fmt.Sprint(update["sessionUpdate"])) != "agent_message_chunk" { + continue + } + chunks = append(chunks, update["content"]) + } + return joinACPTextChunks(chunks) +} + +func joinACPTextChunks(values []any) string { + var builder strings.Builder + for _, value := range values { + builder.WriteString(extractACPText(value)) + } + return builder.String() +} + +func extractACPText(value any) string { + switch typed := value.(type) { + case nil: + return "" + case string: + return typed + case []any: + parts := make([]string, 0, len(typed)) + for _, item := range typed { + text := extractACPText(item) + if text == "" { + continue + } + parts = append(parts, text) + } + return strings.Join(parts, "\n") + case map[string]any: + if text, ok := typed["text"].(string); ok { + return text + } + if content, ok := typed["content"]; ok { + return extractACPText(content) + } + if resource, ok := typed["resource"].(map[string]any); ok { + if text, ok := resource["text"].(string); ok { + return text + } + if uri, ok := resource["uri"].(string); ok { + return uri + } + } + return "" + default: + return fmt.Sprint(typed) + } +} diff --git a/api/acp_test.go b/api/acp_test.go index 0e675dd..ba0eaaf 100644 --- a/api/acp_test.go +++ b/api/acp_test.go @@ -56,6 +56,8 @@ func newACPTestServer(t *testing.T, objects ...client.Object) *server { path: spritzv1.DefaultACPPath, clientCapabilities: defaultACPClientCapabilities(), bootstrapDialTimeout: 5 * time.Second, + promptTimeout: 30 * time.Second, + promptSettleTimeout: 10 * time.Millisecond, }, } } diff --git a/api/authorization.go b/api/authorization.go index f7d5648..0e48b0e 100644 --- a/api/authorization.go +++ b/api/authorization.go @@ -30,6 +30,35 @@ func authorizeHumanOwnedAccess(principal principal, ownerID string, enabled bool return nil } +func authorizeExactOwnerAccess(principal principal, ownerID string, enabled bool) error { + if !enabled { + return nil + } + if principal.isAdminPrincipal() { + return nil + } + if stringsTrim(principal.ID) == "" { + return errUnauthenticated + } + if stringsTrim(ownerID) == "" || stringsTrim(principal.ID) != stringsTrim(ownerID) { + return errForbidden + } + return nil +} + +func authorizeCallerOwnerAccess(principal principal, ownerID string, enabled bool) error { + if !enabled { + return nil + } + if stringsTrim(principal.ID) == "" { + return errUnauthenticated + } + if stringsTrim(ownerID) == "" || stringsTrim(principal.ID) != stringsTrim(ownerID) { + return errForbidden + } + return nil +} + func authorizeHumanOnly(principal principal, enabled bool) error { if !enabled { return nil diff --git a/api/internal_auth.go b/api/internal_auth.go index c6b8172..f00089a 100644 --- a/api/internal_auth.go +++ b/api/internal_auth.go @@ -13,12 +13,22 @@ type internalAuthConfig struct { token string } +const internalTokenHeader = "X-Spritz-Internal-Token" + func newInternalAuthConfig() internalAuthConfig { token := strings.TrimSpace(os.Getenv("SPRITZ_INTERNAL_TOKEN")) return internalAuthConfig{enabled: token != "", token: token} } func (s *server) internalAuthMiddleware() echo.MiddlewareFunc { + return s.internalAuthMiddlewareWithBearerFallback(true) +} + +func (s *server) internalAuthHeaderMiddleware() echo.MiddlewareFunc { + return s.internalAuthMiddlewareWithBearerFallback(false) +} + +func (s *server) internalAuthMiddlewareWithBearerFallback(allowBearerFallback bool) echo.MiddlewareFunc { if !s.internalAuth.enabled { return func(next echo.HandlerFunc) echo.HandlerFunc { return func(c echo.Context) error { @@ -28,11 +38,13 @@ func (s *server) internalAuthMiddleware() echo.MiddlewareFunc { } return func(next echo.HandlerFunc) echo.HandlerFunc { return func(c echo.Context) error { - value := c.Request().Header.Get("Authorization") - if !strings.HasPrefix(value, "Bearer ") { - return writeError(c, http.StatusUnauthorized, "unauthorized") + token := strings.TrimSpace(c.Request().Header.Get(internalTokenHeader)) + if token == "" && allowBearerFallback { + value := c.Request().Header.Get("Authorization") + if strings.HasPrefix(value, "Bearer ") { + token = strings.TrimSpace(strings.TrimPrefix(value, "Bearer ")) + } } - token := strings.TrimSpace(strings.TrimPrefix(value, "Bearer ")) if token == "" || token != s.internalAuth.token { return writeError(c, http.StatusUnauthorized, "unauthorized") } diff --git a/api/internal_debug_chat.go b/api/internal_debug_chat.go new file mode 100644 index 0000000..2419c4e --- /dev/null +++ b/api/internal_debug_chat.go @@ -0,0 +1,352 @@ +package main + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "log" + "net/http" + "strings" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/labstack/echo/v4" + apierrors "k8s.io/apimachinery/pkg/api/errors" + + spritzv1 "spritz.sh/operator/api/v1" +) + +type internalDebugChatSendRequest struct { + Target internalDebugChatTarget `json:"target"` + Reason string `json:"reason,omitempty"` + Message string `json:"message"` +} + +type internalDebugChatTarget struct { + Namespace string `json:"namespace,omitempty"` + SpritzName string `json:"spritzName,omitempty"` + ConversationID string `json:"conversationId,omitempty"` + Title string `json:"title,omitempty"` + CWD string `json:"cwd,omitempty"` +} + +type internalDebugChatSendResponse struct { + Conversation *spritzv1.SpritzConversation `json:"conversation"` + EffectiveSessionID string `json:"effectiveSessionId,omitempty"` + BindingState string `json:"bindingState,omitempty"` + Loaded bool `json:"loaded,omitempty"` + Replaced bool `json:"replaced,omitempty"` + ReplayMessageCount int32 `json:"replayMessageCount,omitempty"` + StopReason string `json:"stopReason,omitempty"` + AssistantText string `json:"assistantText,omitempty"` + Updates []map[string]any `json:"updates,omitempty"` + CreatedConversation bool `json:"createdConversation,omitempty"` +} + +func (t internalDebugChatTarget) validate() error { + hasSpritz := strings.TrimSpace(t.SpritzName) != "" + hasConversation := strings.TrimSpace(t.ConversationID) != "" + switch { + case hasSpritz == hasConversation: + return errors.New("target must include exactly one of spritzName or conversationId") + default: + return nil + } +} + +func (s *server) sendInternalDebugChat(c echo.Context) error { + if !s.acp.enabled { + return writeError(c, http.StatusNotFound, "acp disabled") + } + + var body internalDebugChatSendRequest + if err := c.Bind(&body); err != nil { + return writeError(c, http.StatusBadRequest, "invalid json") + } + principal, ok := principalFromContext(c) + if s.auth.enabled() && (!ok || principal.ID == "") { + return writeError(c, http.StatusUnauthorized, "unauthenticated") + } + if err := body.Target.validate(); err != nil { + s.auditInternalDebugChatFailure(principal.ID, body.Target, strings.TrimSpace(body.Reason), body.Message, "invalid_target", err) + return writeError(c, http.StatusBadRequest, err.Error()) + } + if strings.TrimSpace(body.Message) == "" { + s.auditInternalDebugChatFailure(principal.ID, body.Target, strings.TrimSpace(body.Reason), body.Message, "invalid_message", errors.New("message is required")) + return writeError(c, http.StatusBadRequest, "message is required") + } + message := body.Message + reason := strings.TrimSpace(body.Reason) + if reason == "" { + reason = "spz chat send" + } + + conversation, spritz, createdConversation, err := s.resolveInternalDebugChatTarget(c.Request().Context(), principal, body.Target) + if err != nil { + s.auditInternalDebugChatFailure(principal.ID, body.Target, reason, message, "target_error", err) + return s.writeInternalDebugChatTargetError(c, err) + } + + bootstrap, promptResult, err := s.runInternalDebugChat(c.Request().Context(), conversation, spritz, message) + if err != nil { + s.cleanupInternalDebugConversation(c.Request().Context(), conversation, createdConversation) + outcome := "prompt_error" + if promptResult == nil { + outcome = "bootstrap_error" + } + s.auditInternalDebugChatFailure(principal.ID, body.Target, reason, message, outcome, err) + return s.writeInternalDebugChatRuntimeError(c, err) + } + + s.auditInternalDebugChat(principal.ID, conversation, reason, message, promptResult) + + return writeJSON(c, http.StatusOK, internalDebugChatSendResponse{ + Conversation: bootstrap.Conversation, + EffectiveSessionID: bootstrap.EffectiveSessionID, + BindingState: bootstrap.BindingState, + Loaded: bootstrap.Loaded, + Replaced: bootstrap.Replaced, + ReplayMessageCount: bootstrap.ReplayMessageCount, + StopReason: promptResult.StopReason, + AssistantText: promptResult.AssistantText, + Updates: promptResult.Updates, + CreatedConversation: createdConversation, + }) +} + +func (s *server) resolveInternalDebugChatTarget(ctx context.Context, principal principal, target internalDebugChatTarget) (*spritzv1.SpritzConversation, *spritzv1.Spritz, bool, error) { + namespace, err := s.resolveSpritzNamespace(strings.TrimSpace(target.Namespace)) + if err != nil { + return nil, nil, false, errForbidden + } + if namespace == "" { + namespace = "default" + } + + if conversationID := strings.TrimSpace(target.ConversationID); conversationID != "" { + conversation, err := s.getInternalDebugConversation(ctx, principal, namespace, conversationID) + if err != nil { + return nil, nil, false, err + } + spritz, err := s.getInternalDebugACPReadySpritz(ctx, principal, namespace, conversation.Spec.SpritzName) + if err != nil { + return nil, nil, false, err + } + return conversation, spritz, false, nil + } + + spritz, err := s.getInternalDebugACPReadySpritz(ctx, principal, namespace, strings.TrimSpace(target.SpritzName)) + if err != nil { + return nil, nil, false, err + } + conversation, err := buildACPConversationResource(spritz, target.Title, target.CWD) + if err != nil { + return nil, nil, false, err + } + for attempt := 0; attempt < 3; attempt++ { + if err := s.client.Create(ctx, conversation); err == nil { + return conversation, spritz, true, nil + } else if !apierrors.IsAlreadyExists(err) { + return nil, nil, false, err + } + conversation.Name, err = newConversationName(spritz.Name) + if err != nil { + return nil, nil, false, err + } + } + return nil, nil, false, errors.New("failed to allocate conversation id") +} + +func (s *server) getInternalDebugConversation(ctx context.Context, principal principal, namespace, conversationID string) (*spritzv1.SpritzConversation, error) { + conversation := &spritzv1.SpritzConversation{} + if err := s.client.Get(ctx, clientKey(namespace, conversationID), conversation); err != nil { + return nil, err + } + if err := authorizeCallerOwnerAccess(principal, conversation.Spec.Owner.ID, s.auth.enabled()); err != nil { + return nil, err + } + return conversation, nil +} + +func (s *server) getInternalDebugACPReadySpritz(ctx context.Context, principal principal, namespace, name string) (*spritzv1.Spritz, error) { + spritz := &spritzv1.Spritz{} + if err := s.client.Get(ctx, clientKey(namespace, name), spritz); err != nil { + return nil, err + } + if err := authorizeCallerOwnerAccess(principal, spritz.Spec.Owner.ID, s.auth.enabled()); err != nil { + return nil, err + } + if !spritzSupportsACPConversations(spritz) { + return nil, errACPUnavailable + } + return spritz, nil +} + +func (s *server) cleanupInternalDebugConversation(ctx context.Context, conversation *spritzv1.SpritzConversation, created bool) { + if !created || conversation == nil { + return + } + cleanupCtx := ctx + cleanupCancel := func() {} + if cleanupCtx == nil || cleanupCtx.Err() != nil { + cleanupCtx, cleanupCancel = context.WithTimeout(context.Background(), 5*time.Second) + } + defer cleanupCancel() + if err := s.client.Delete(cleanupCtx, conversation); err != nil && !apierrors.IsNotFound(err) { + log.Printf( + "spritz internal-debug-chat cleanup_failed namespace=%s conversation_id=%s err=%v", + conversation.Namespace, + conversation.Name, + err, + ) + } +} + +func (s *server) runInternalDebugChat(ctx context.Context, conversation *spritzv1.SpritzConversation, spritz *spritzv1.Spritz, message string) (*acpBootstrapResponse, *acpPromptResult, error) { + bootstrapCtx, bootstrapCancel := context.WithTimeout(ctx, s.acp.promptTimeout) + defer bootstrapCancel() + + dialCtx, dialCancel := context.WithTimeout(bootstrapCtx, s.acp.bootstrapDialTimeout) + defer dialCancel() + + instanceConn, _, err := websocket.DefaultDialer.DialContext(dialCtx, s.acpInstanceURL(spritz.Namespace, spritz.Name), nil) + if err != nil { + s.recordConversationBindingError(bootstrapCtx, conversation.Namespace, conversation.Name, "", err) + return nil, nil, err + } + client := &acpBootstrapInstanceClient{conn: instanceConn} + defer func() { + _ = client.close() + }() + + initResult, err := client.initialize(bootstrapCtx, s.acp.clientInfo, s.acp.clientCapabilities) + if err != nil { + s.recordConversationBindingError(bootstrapCtx, conversation.Namespace, conversation.Name, "", err) + return nil, nil, err + } + + bootstrap, err := s.bootstrapACPConversationBindingWithClient(bootstrapCtx, conversation, client, initResult) + if err != nil { + return nil, nil, err + } + + if bootstrap.Loaded { + ignoredReplayUpdates := make([]map[string]any, 0, 8) + if err := client.drainSessionUpdates(bootstrapCtx, s.acp.promptSettleTimeout, &ignoredReplayUpdates); err != nil { + return bootstrap, nil, err + } + } + + promptCtx, cancel := context.WithTimeout(ctx, s.acp.promptTimeout) + defer cancel() + cancelWatcherDone := make(chan struct{}) + defer close(cancelWatcherDone) + var cancelOnce sync.Once + sendCancel := func() { + cancelOnce.Do(func() { + cancelCtx, cancelPrompt := context.WithTimeout(context.Background(), 5*time.Second) + _ = s.cancelInternalDebugChatPrompt(cancelCtx, spritz, bootstrap.EffectiveSessionID) + cancelPrompt() + }) + } + go func() { + select { + case <-promptCtx.Done(): + select { + case <-cancelWatcherDone: + return + default: + } + sendCancel() + case <-cancelWatcherDone: + } + }() + + result, err := client.prompt(promptCtx, bootstrap.EffectiveSessionID, message, s.acp.promptSettleTimeout) + if err != nil && promptCtx.Err() != nil { + sendCancel() + } + return bootstrap, result, err +} + +func (s *server) cancelInternalDebugChatPrompt(ctx context.Context, spritz *spritzv1.Spritz, sessionID string) error { + if spritz == nil || strings.TrimSpace(sessionID) == "" { + return nil + } + dialCtx, cancel := context.WithTimeout(ctx, s.acp.bootstrapDialTimeout) + defer cancel() + + instanceConn, _, err := websocket.DefaultDialer.DialContext(dialCtx, s.acpInstanceURL(spritz.Namespace, spritz.Name), nil) + if err != nil { + return err + } + client := &acpBootstrapInstanceClient{conn: instanceConn} + defer func() { + _ = client.close() + }() + if _, err := client.initialize(ctx, s.acp.clientInfo, s.acp.clientCapabilities); err != nil { + return err + } + return client.cancelPrompt(ctx, sessionID) +} + +func (s *server) auditInternalDebugChat(actorID string, conversation *spritzv1.SpritzConversation, reason, message string, result *acpPromptResult) { + if conversation == nil || result == nil { + return + } + promptHash := sha256.Sum256([]byte(message)) + assistantHash := sha256.Sum256([]byte(result.AssistantText)) + log.Printf( + "spritz internal-debug-chat actor_id=%s owner_id=%s namespace=%s conversation_id=%s spritz_name=%s reason=%q stop_reason=%s updates=%d prompt_sha256=%s response_sha256=%s", + actorID, + conversation.Spec.Owner.ID, + conversation.Namespace, + conversation.Name, + conversation.Spec.SpritzName, + reason, + result.StopReason, + len(result.Updates), + hex.EncodeToString(promptHash[:]), + hex.EncodeToString(assistantHash[:]), + ) +} + +func (s *server) writeInternalDebugChatTargetError(c echo.Context, err error) error { + switch { + case apierrors.IsNotFound(err): + return writeError(c, http.StatusNotFound, "target not found") + case errors.Is(err, errForbidden): + return writeError(c, http.StatusForbidden, "forbidden") + case errors.Is(err, errACPUnavailable): + return writeError(c, http.StatusConflict, "acp unavailable") + default: + return writeError(c, http.StatusInternalServerError, err.Error()) + } +} + +func (s *server) auditInternalDebugChatFailure(actorID string, target internalDebugChatTarget, reason, message, outcome string, cause error) { + promptHash := sha256.Sum256([]byte(message)) + log.Printf( + "spritz internal-debug-chat actor_id=%s namespace=%s spritz_name=%s conversation_id=%s reason=%q outcome=%s prompt_sha256=%s err=%v", + strings.TrimSpace(actorID), + strings.TrimSpace(target.Namespace), + strings.TrimSpace(target.SpritzName), + strings.TrimSpace(target.ConversationID), + strings.TrimSpace(reason), + strings.TrimSpace(outcome), + hex.EncodeToString(promptHash[:]), + cause, + ) +} + +func (s *server) writeInternalDebugChatRuntimeError(c echo.Context, err error) error { + var rpcErr *acpBootstrapRPCError + switch { + case errors.As(err, &rpcErr): + return writeError(c, http.StatusBadGateway, rpcErr.Error()) + default: + return writeError(c, http.StatusBadGateway, err.Error()) + } +} diff --git a/api/internal_debug_chat_test.go b/api/internal_debug_chat_test.go new file mode 100644 index 0000000..f4686ae --- /dev/null +++ b/api/internal_debug_chat_test.go @@ -0,0 +1,852 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/labstack/echo/v4" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + spritzv1 "spritz.sh/operator/api/v1" +) + +type fakeACPDebugChatServerOptions struct { + SessionID string + LoadReplay []map[string]any + LoadReplayAfter []map[string]any + LoadError *acpBootstrapJSONRPCError + HoldPrompt bool + PermissionRequestID any + PromptChunks []string + PromptChunksAfter []string + PromptUpdatesAfter []map[string]any + PromptChunkDelay time.Duration + PromptError *acpBootstrapJSONRPCError + StopReason string +} + +type fakeACPDebugChatServer struct { + url string + + server *httptest.Server + mu sync.Mutex + + initCalls int + newCalls int + cancelSessionIDs []string + loadSessionIDs []string + permissionResponses []map[string]any + promptSessionIDs []string + promptTexts []string +} + +func newFakeACPDebugChatServer(t *testing.T, options fakeACPDebugChatServerOptions) *fakeACPDebugChatServer { + t.Helper() + fakeServer := &fakeACPDebugChatServer{} + upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }} + httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("failed to upgrade websocket: %v", err) + } + defer func() { + _ = conn.Close() + }() + + for { + _, payload, err := conn.ReadMessage() + if err != nil { + return + } + var message struct { + ID any `json:"id,omitempty"` + Method string `json:"method,omitempty"` + Params json.RawMessage `json:"params,omitempty"` + } + if err := json.Unmarshal(payload, &message); err != nil { + t.Fatalf("failed to decode ACP message: %v", err) + } + + switch message.Method { + case "initialize": + fakeServer.mu.Lock() + fakeServer.initCalls++ + fakeServer.mu.Unlock() + if err := conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "id": message.ID, + "result": map[string]any{ + "protocolVersion": 1, + "agentCapabilities": map[string]any{ + "loadSession": true, + }, + "agentInfo": map[string]any{ + "name": "debug-agent", + "title": "Debug Agent", + "version": "1.0.0", + }, + }, + }); err != nil { + t.Fatalf("failed to write initialize result: %v", err) + } + case "session/new": + fakeServer.mu.Lock() + fakeServer.newCalls++ + fakeServer.mu.Unlock() + sessionID := strings.TrimSpace(options.SessionID) + if sessionID == "" { + sessionID = "session-fresh" + } + if err := conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "id": message.ID, + "result": map[string]any{ + "sessionId": sessionID, + }, + }); err != nil { + t.Fatalf("failed to write new session result: %v", err) + } + case "session/load": + var params struct { + SessionID string `json:"sessionId"` + } + if err := json.Unmarshal(message.Params, ¶ms); err != nil { + t.Fatalf("failed to decode load params: %v", err) + } + fakeServer.mu.Lock() + fakeServer.loadSessionIDs = append(fakeServer.loadSessionIDs, params.SessionID) + fakeServer.mu.Unlock() + if options.LoadError != nil { + if err := conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "id": message.ID, + "error": options.LoadError, + }); err != nil { + t.Fatalf("failed to write load error: %v", err) + } + continue + } + for _, update := range options.LoadReplay { + if err := conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "method": "session/update", + "params": map[string]any{ + "update": update, + }, + }); err != nil { + t.Fatalf("failed to write replay update: %v", err) + } + } + if err := conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "id": message.ID, + "result": map[string]any{}, + }); err != nil { + t.Fatalf("failed to write load result: %v", err) + } + for _, update := range options.LoadReplayAfter { + if err := conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "method": "session/update", + "params": map[string]any{ + "update": update, + }, + }); err != nil { + t.Fatalf("failed to write late replay update: %v", err) + } + } + case "session/prompt": + var params struct { + SessionID string `json:"sessionId"` + Prompt []struct { + Text string `json:"text"` + } `json:"prompt"` + } + if err := json.Unmarshal(message.Params, ¶ms); err != nil { + t.Fatalf("failed to decode prompt params: %v", err) + } + text := "" + if len(params.Prompt) > 0 { + text = params.Prompt[0].Text + } + fakeServer.mu.Lock() + fakeServer.promptSessionIDs = append(fakeServer.promptSessionIDs, params.SessionID) + fakeServer.promptTexts = append(fakeServer.promptTexts, text) + fakeServer.mu.Unlock() + if options.PermissionRequestID != nil { + if err := conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "id": options.PermissionRequestID, + "method": "session/request_permission", + "params": map[string]any{ + "tool": "dangerous-tool", + }, + }); err != nil { + t.Fatalf("failed to write permission request: %v", err) + } + _, payload, err := conn.ReadMessage() + if err != nil { + t.Fatalf("failed to read permission response: %v", err) + } + var response map[string]any + if err := json.Unmarshal(payload, &response); err != nil { + t.Fatalf("failed to decode permission response: %v", err) + } + fakeServer.mu.Lock() + fakeServer.permissionResponses = append(fakeServer.permissionResponses, response) + fakeServer.mu.Unlock() + } + if options.HoldPrompt { + continue + } + if options.PromptError != nil { + if err := conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "id": message.ID, + "error": options.PromptError, + }); err != nil { + t.Fatalf("failed to write prompt error: %v", err) + } + continue + } + for _, chunk := range options.PromptChunks { + if err := conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "method": "session/update", + "params": map[string]any{ + "update": map[string]any{ + "sessionUpdate": "agent_message_chunk", + "content": map[string]any{ + "type": "text", + "text": chunk, + }, + }, + }, + }); err != nil { + t.Fatalf("failed to write prompt update: %v", err) + } + } + stopReason := strings.TrimSpace(options.StopReason) + if stopReason == "" { + stopReason = "end_turn" + } + if err := conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "id": message.ID, + "result": map[string]any{ + "stopReason": stopReason, + }, + }); err != nil { + t.Fatalf("failed to write prompt result: %v", err) + } + if options.PromptChunkDelay > 0 { + time.Sleep(options.PromptChunkDelay) + } + for _, chunk := range options.PromptChunksAfter { + if options.PromptChunkDelay > 0 { + time.Sleep(options.PromptChunkDelay) + } + if err := conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "method": "session/update", + "params": map[string]any{ + "update": map[string]any{ + "sessionUpdate": "agent_message_chunk", + "content": map[string]any{ + "type": "text", + "text": chunk, + }, + }, + }, + }); err != nil { + return + } + } + for _, update := range options.PromptUpdatesAfter { + if options.PromptChunkDelay > 0 { + time.Sleep(options.PromptChunkDelay) + } + if err := conn.WriteJSON(map[string]any{ + "jsonrpc": "2.0", + "method": "session/update", + "params": map[string]any{ + "update": update, + }, + }); err != nil { + return + } + } + case "session/cancel": + var params struct { + SessionID string `json:"sessionId"` + } + if err := json.Unmarshal(message.Params, ¶ms); err != nil { + t.Fatalf("failed to decode cancel params: %v", err) + } + fakeServer.mu.Lock() + fakeServer.cancelSessionIDs = append(fakeServer.cancelSessionIDs, params.SessionID) + fakeServer.mu.Unlock() + default: + t.Fatalf("unexpected ACP method %q", message.Method) + } + } + })) + t.Cleanup(httpServer.Close) + fakeServer.server = httpServer + fakeServer.url = "ws" + strings.TrimPrefix(httpServer.URL, "http") + return fakeServer +} + +func TestInternalDebugChatSendCreatesConversationAndReturnsAssistantText(t *testing.T) { + spritz := readyACPSpritz("tidy-otter", "user-1") + fakeACP := newFakeACPDebugChatServer(t, fakeACPDebugChatServerOptions{ + SessionID: "session-fresh", + PromptChunks: []string{"spritz ", "debug"}, + StopReason: "end_turn", + }) + + s := newACPTestServer(t, spritz) + s.internalAuth = internalAuthConfig{enabled: true, token: "internal-token"} + s.acp.instanceURL = func(namespace, name string) string { return fakeACP.url } + + e := echo.New() + internal := e.Group("", s.internalAuthHeaderMiddleware(), s.authMiddleware()) + internal.POST("/api/internal/v1/debug/chat/send", s.sendInternalDebugChat) + + req := httptest.NewRequest(http.MethodPost, "/api/internal/v1/debug/chat/send", strings.NewReader(`{ + "target":{"spritzName":"tidy-otter","cwd":"/workspace/app","title":"Debug Run"}, + "reason":"local smoke", + "message":" hello from cli " + }`)) + req.Header.Set(internalTokenHeader, "internal-token") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Spritz-User-Id", "user-1") + rec := httptest.NewRecorder() + e.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + var payload struct { + Status string `json:"status"` + Data internalDebugChatSendResponse `json:"data"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if !payload.Data.CreatedConversation { + t.Fatalf("expected createdConversation=true") + } + if payload.Data.AssistantText != "spritz debug" { + t.Fatalf("expected assistant text %q, got %q", "spritz debug", payload.Data.AssistantText) + } + if payload.Data.StopReason != "end_turn" { + t.Fatalf("expected stopReason end_turn, got %q", payload.Data.StopReason) + } + if payload.Data.EffectiveSessionID != "session-fresh" { + t.Fatalf("expected effective session id session-fresh, got %q", payload.Data.EffectiveSessionID) + } + if payload.Data.Conversation == nil || payload.Data.Conversation.Spec.CWD != "/workspace/app" { + t.Fatalf("expected conversation cwd /workspace/app, got %#v", payload.Data.Conversation) + } + + stored := &spritzv1.SpritzConversation{} + if err := s.client.Get(context.Background(), clientKey("spritz-test", payload.Data.Conversation.Name), stored); err != nil { + t.Fatalf("failed to reload stored conversation: %v", err) + } + if stored.Spec.Title != "Debug Run" { + t.Fatalf("expected stored title Debug Run, got %q", stored.Spec.Title) + } + if stored.Spec.SessionID != "session-fresh" { + t.Fatalf("expected stored session id session-fresh, got %q", stored.Spec.SessionID) + } + + fakeACP.mu.Lock() + defer fakeACP.mu.Unlock() + if fakeACP.newCalls != 1 { + t.Fatalf("expected one session/new call, got %d", fakeACP.newCalls) + } + if len(fakeACP.loadSessionIDs) != 0 { + t.Fatalf("expected no session/load for a fresh conversation, got %#v", fakeACP.loadSessionIDs) + } + if len(fakeACP.promptTexts) != 1 || fakeACP.promptTexts[0] != " hello from cli " { + t.Fatalf("expected one prompt with original message, got %#v", fakeACP.promptTexts) + } +} + +func TestInternalDebugChatSendTargetsExistingConversation(t *testing.T) { + spritz := readyACPSpritz("tidy-otter", "user-1") + conversation := conversationFor("tidy-otter-conv", "tidy-otter", "user-1", "Existing", metav1.Now()) + conversation.Spec.SessionID = "session-existing" + + fakeACP := newFakeACPDebugChatServer(t, fakeACPDebugChatServerOptions{ + LoadReplayAfter: []map[string]any{ + { + "sessionUpdate": "agent_message_chunk", + "content": map[string]any{ + "type": "text", + "text": "stale-", + }, + }, + }, + PromptChunks: []string{"ok"}, + }) + + s := newACPTestServer(t, spritz, conversation) + s.internalAuth = internalAuthConfig{enabled: true, token: "internal-token"} + s.acp.instanceURL = func(namespace, name string) string { return fakeACP.url } + s.acp.promptSettleTimeout = 100 * time.Millisecond + + e := echo.New() + internal := e.Group("", s.internalAuthHeaderMiddleware(), s.authMiddleware()) + internal.POST("/api/internal/v1/debug/chat/send", s.sendInternalDebugChat) + + req := httptest.NewRequest(http.MethodPost, "/api/internal/v1/debug/chat/send", strings.NewReader(`{ + "target":{"conversationId":"tidy-otter-conv"}, + "message":"follow up" + }`)) + req.Header.Set(internalTokenHeader, "internal-token") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Spritz-User-Id", "user-1") + rec := httptest.NewRecorder() + e.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + fakeACP.mu.Lock() + loadSessionIDs := append([]string(nil), fakeACP.loadSessionIDs...) + promptSessionIDs := append([]string(nil), fakeACP.promptSessionIDs...) + promptTexts := append([]string(nil), fakeACP.promptTexts...) + fakeACP.mu.Unlock() + t.Fatalf( + "expected status 200, got %d: %s (loads=%#v prompts=%#v promptTexts=%#v)", + rec.Code, + rec.Body.String(), + loadSessionIDs, + promptSessionIDs, + promptTexts, + ) + } + + var payload struct { + Status string `json:"status"` + Data internalDebugChatSendResponse `json:"data"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if payload.Data.CreatedConversation { + t.Fatalf("expected createdConversation=false") + } + if payload.Data.AssistantText != "ok" { + t.Fatalf("expected assistant text ok, got %q", payload.Data.AssistantText) + } + if payload.Data.Conversation == nil || payload.Data.Conversation.Name != "tidy-otter-conv" { + t.Fatalf("expected original conversation id, got %#v", payload.Data.Conversation) + } + + fakeACP.mu.Lock() + defer fakeACP.mu.Unlock() + if fakeACP.newCalls != 0 { + t.Fatalf("expected no session/new call, got %d", fakeACP.newCalls) + } + if len(fakeACP.loadSessionIDs) != 1 || fakeACP.loadSessionIDs[0] != "session-existing" { + t.Fatalf("expected one session/load for session-existing, got %#v", fakeACP.loadSessionIDs) + } +} + +func TestInternalDebugChatSendRejectsOwnerMismatch(t *testing.T) { + spritz := readyACPSpritz("tidy-otter", "user-2") + + s := newACPTestServer(t, spritz) + s.internalAuth = internalAuthConfig{enabled: true, token: "internal-token"} + + e := echo.New() + internal := e.Group("", s.internalAuthHeaderMiddleware(), s.authMiddleware()) + internal.POST("/api/internal/v1/debug/chat/send", s.sendInternalDebugChat) + + req := httptest.NewRequest(http.MethodPost, "/api/internal/v1/debug/chat/send", strings.NewReader(`{ + "target":{"spritzName":"tidy-otter"}, + "message":"hello" + }`)) + req.Header.Set(internalTokenHeader, "internal-token") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Spritz-User-Id", "user-1") + rec := httptest.NewRecorder() + e.ServeHTTP(rec, req) + + if rec.Code != http.StatusForbidden { + t.Fatalf("expected status 403, got %d: %s", rec.Code, rec.Body.String()) + } +} + +func TestInternalDebugChatSendDeletesCreatedConversationOnPromptFailure(t *testing.T) { + spritz := readyACPSpritz("tidy-otter", "user-1") + fakeACP := newFakeACPDebugChatServer(t, fakeACPDebugChatServerOptions{ + SessionID: "session-fresh", + PromptError: &acpBootstrapJSONRPCError{ + Code: -32000, + Message: "prompt failed", + }, + }) + + s := newACPTestServer(t, spritz) + s.internalAuth = internalAuthConfig{enabled: true, token: "internal-token"} + s.acp.instanceURL = func(namespace, name string) string { return fakeACP.url } + + e := echo.New() + internal := e.Group("", s.internalAuthHeaderMiddleware(), s.authMiddleware()) + internal.POST("/api/internal/v1/debug/chat/send", s.sendInternalDebugChat) + + req := httptest.NewRequest(http.MethodPost, "/api/internal/v1/debug/chat/send", strings.NewReader(`{ + "target":{"spritzName":"tidy-otter","cwd":"/workspace/app","title":"Debug Run"}, + "message":"hello from cli" + }`)) + req.Header.Set(internalTokenHeader, "internal-token") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Spritz-User-Id", "user-1") + rec := httptest.NewRecorder() + e.ServeHTTP(rec, req) + + if rec.Code != http.StatusBadGateway { + t.Fatalf("expected status 502, got %d: %s", rec.Code, rec.Body.String()) + } + + stored := &spritzv1.SpritzConversationList{} + if err := s.client.List(context.Background(), stored); err != nil { + t.Fatalf("failed to list conversations: %v", err) + } + if len(stored.Items) != 0 { + t.Fatalf("expected created conversation to be deleted on prompt failure, got %#v", stored.Items) + } +} + +func TestCleanupInternalDebugConversationUsesFallbackContextWhenCanceled(t *testing.T) { + conversation := conversationFor("tidy-otter-conv", "tidy-otter", "user-1", "Existing", metav1.Now()) + s := newACPTestServer(t, conversation) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + s.cleanupInternalDebugConversation(ctx, conversation, true) + + stored := &spritzv1.SpritzConversationList{} + if err := s.client.List(context.Background(), stored); err != nil { + t.Fatalf("failed to list conversations: %v", err) + } + if len(stored.Items) != 0 { + t.Fatalf("expected cleanup to delete the conversation even when the request context is canceled, got %#v", stored.Items) + } +} + +func TestInternalDebugChatSendCancelsPromptWhenPromptTimesOut(t *testing.T) { + spritz := readyACPSpritz("tidy-otter", "user-1") + fakeACP := newFakeACPDebugChatServer(t, fakeACPDebugChatServerOptions{ + SessionID: "session-fresh", + HoldPrompt: true, + }) + + s := newACPTestServer(t, spritz) + s.internalAuth = internalAuthConfig{enabled: true, token: "internal-token"} + s.acp.instanceURL = func(namespace, name string) string { return fakeACP.url } + s.acp.promptTimeout = 20 * time.Millisecond + + e := echo.New() + internal := e.Group("", s.internalAuthHeaderMiddleware(), s.authMiddleware()) + internal.POST("/api/internal/v1/debug/chat/send", s.sendInternalDebugChat) + + req := httptest.NewRequest(http.MethodPost, "/api/internal/v1/debug/chat/send", strings.NewReader(`{ + "target":{"spritzName":"tidy-otter","cwd":"/workspace/app","title":"Debug Run"}, + "message":"hello from cli" + }`)) + req.Header.Set(internalTokenHeader, "internal-token") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Spritz-User-Id", "user-1") + rec := httptest.NewRecorder() + e.ServeHTTP(rec, req) + + if rec.Code != http.StatusBadGateway { + t.Fatalf("expected status 502, got %d: %s", rec.Code, rec.Body.String()) + } + time.Sleep(20 * time.Millisecond) + + fakeACP.mu.Lock() + defer fakeACP.mu.Unlock() + if len(fakeACP.cancelSessionIDs) != 1 || fakeACP.cancelSessionIDs[0] != "session-fresh" { + t.Fatalf("expected one session/cancel for session-fresh, got %#v", fakeACP.cancelSessionIDs) + } +} + +func TestInternalDebugChatSendRejectsPermissionRequestsExplicitly(t *testing.T) { + spritz := readyACPSpritz("tidy-otter", "user-1") + fakeACP := newFakeACPDebugChatServer(t, fakeACPDebugChatServerOptions{ + SessionID: "session-fresh", + PermissionRequestID: "perm-1", + PromptChunks: []string{"ok"}, + }) + + s := newACPTestServer(t, spritz) + s.internalAuth = internalAuthConfig{enabled: true, token: "internal-token"} + s.acp.instanceURL = func(namespace, name string) string { return fakeACP.url } + + e := echo.New() + internal := e.Group("", s.internalAuthHeaderMiddleware(), s.authMiddleware()) + internal.POST("/api/internal/v1/debug/chat/send", s.sendInternalDebugChat) + + req := httptest.NewRequest(http.MethodPost, "/api/internal/v1/debug/chat/send", strings.NewReader(`{ + "target":{"spritzName":"tidy-otter","cwd":"/workspace/app","title":"Debug Run"}, + "message":"hello from cli" + }`)) + req.Header.Set(internalTokenHeader, "internal-token") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Spritz-User-Id", "user-1") + rec := httptest.NewRecorder() + e.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + fakeACP.mu.Lock() + defer fakeACP.mu.Unlock() + if len(fakeACP.permissionResponses) != 1 { + t.Fatalf("expected one permission response, got %#v", fakeACP.permissionResponses) + } + errorPayload, _ := fakeACP.permissionResponses[0]["error"].(map[string]any) + if errorPayload == nil { + t.Fatalf("expected permission response to be an error, got %#v", fakeACP.permissionResponses[0]) + } + if fmt.Sprint(errorPayload["message"]) != "Permission requests are not supported by internal debug chat." { + t.Fatalf("unexpected permission response message: %#v", fakeACP.permissionResponses[0]) + } +} + +func TestInternalDebugChatSendRequiresDedicatedInternalHeader(t *testing.T) { + spritz := readyACPSpritz("tidy-otter", "user-1") + + s := newACPTestServer(t, spritz) + s.internalAuth = internalAuthConfig{enabled: true, token: "internal-token"} + + e := echo.New() + internal := e.Group("", s.internalAuthHeaderMiddleware(), s.authMiddleware()) + internal.POST("/api/internal/v1/debug/chat/send", s.sendInternalDebugChat) + + req := httptest.NewRequest(http.MethodPost, "/api/internal/v1/debug/chat/send", strings.NewReader(`{ + "target":{"spritzName":"tidy-otter"}, + "message":"hello" + }`)) + req.Header.Set("Authorization", "Bearer internal-token") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Spritz-User-Id", "user-1") + rec := httptest.NewRecorder() + e.ServeHTTP(rec, req) + + if rec.Code != http.StatusUnauthorized { + t.Fatalf("expected status 401, got %d: %s", rec.Code, rec.Body.String()) + } +} + +func TestInternalDebugChatSendRejectsAdminOwnerMismatch(t *testing.T) { + spritz := readyACPSpritz("tidy-otter", "user-2") + + s := newACPTestServer(t, spritz) + s.internalAuth = internalAuthConfig{enabled: true, token: "internal-token"} + s.auth.adminIDs = map[string]struct{}{"admin-1": {}} + + e := echo.New() + internal := e.Group("", s.internalAuthHeaderMiddleware(), s.authMiddleware()) + internal.POST("/api/internal/v1/debug/chat/send", s.sendInternalDebugChat) + + req := httptest.NewRequest(http.MethodPost, "/api/internal/v1/debug/chat/send", strings.NewReader(`{ + "target":{"spritzName":"tidy-otter"}, + "message":"hello" + }`)) + req.Header.Set(internalTokenHeader, "internal-token") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Spritz-User-Id", "admin-1") + rec := httptest.NewRecorder() + e.ServeHTTP(rec, req) + + if rec.Code != http.StatusForbidden { + t.Fatalf("expected status 403, got %d: %s", rec.Code, rec.Body.String()) + } +} + +func TestInternalDebugChatSendRepairsMissingSessionBeforePrompt(t *testing.T) { + spritz := readyACPSpritz("tidy-otter", "user-1") + conversation := conversationFor("tidy-otter-conv", "tidy-otter", "user-1", "Existing", metav1.Now()) + conversation.Spec.SessionID = "session-stale" + + fakeACP := newFakeACPDebugChatServer(t, fakeACPDebugChatServerOptions{ + SessionID: "session-fresh", + LoadError: &acpBootstrapJSONRPCError{ + Code: -32002, + Message: "Session session-stale not found", + }, + PromptChunks: []string{"ok"}, + }) + + s := newACPTestServer(t, spritz, conversation) + s.internalAuth = internalAuthConfig{enabled: true, token: "internal-token"} + s.acp.instanceURL = func(namespace, name string) string { return fakeACP.url } + + e := echo.New() + internal := e.Group("", s.internalAuthHeaderMiddleware(), s.authMiddleware()) + internal.POST("/api/internal/v1/debug/chat/send", s.sendInternalDebugChat) + + req := httptest.NewRequest(http.MethodPost, "/api/internal/v1/debug/chat/send", strings.NewReader(`{ + "target":{"conversationId":"tidy-otter-conv"}, + "message":"follow up" + }`)) + req.Header.Set(internalTokenHeader, "internal-token") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Spritz-User-Id", "user-1") + rec := httptest.NewRecorder() + e.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + var payload struct { + Status string `json:"status"` + Data internalDebugChatSendResponse `json:"data"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if payload.Data.EffectiveSessionID != "session-fresh" || !payload.Data.Replaced || payload.Data.BindingState != "replaced" { + t.Fatalf("expected replaced binding for session-fresh, got %#v", payload.Data) + } + + stored := &spritzv1.SpritzConversation{} + if err := s.client.Get(context.Background(), clientKey("spritz-test", "tidy-otter-conv"), stored); err != nil { + t.Fatalf("failed to reload stored conversation: %v", err) + } + if stored.Spec.SessionID != "session-fresh" { + t.Fatalf("expected stored session id session-fresh, got %q", stored.Spec.SessionID) + } + + fakeACP.mu.Lock() + defer fakeACP.mu.Unlock() + if len(fakeACP.loadSessionIDs) != 1 || fakeACP.loadSessionIDs[0] != "session-stale" { + t.Fatalf("expected one session/load for session-stale, got %#v", fakeACP.loadSessionIDs) + } + if fakeACP.newCalls != 1 { + t.Fatalf("expected one replacement session/new call, got %d", fakeACP.newCalls) + } + if len(fakeACP.promptSessionIDs) != 1 || fakeACP.promptSessionIDs[0] != "session-fresh" { + t.Fatalf("expected prompt to use the replacement session, got %#v", fakeACP.promptSessionIDs) + } +} + +func TestInternalDebugChatSendFailsWhenPromptSettleHitsDeadline(t *testing.T) { + spritz := readyACPSpritz("tidy-otter", "user-1") + fakeACP := newFakeACPDebugChatServer(t, fakeACPDebugChatServerOptions{ + SessionID: "session-fresh", + PromptChunkDelay: 40 * time.Millisecond, + PromptChunksAfter: []string{"late"}, + }) + + s := newACPTestServer(t, spritz) + s.internalAuth = internalAuthConfig{enabled: true, token: "internal-token"} + s.acp.instanceURL = func(namespace, name string) string { return fakeACP.url } + s.acp.promptTimeout = 20 * time.Millisecond + s.acp.promptSettleTimeout = 200 * time.Millisecond + + e := echo.New() + internal := e.Group("", s.internalAuthHeaderMiddleware(), s.authMiddleware()) + internal.POST("/api/internal/v1/debug/chat/send", s.sendInternalDebugChat) + + req := httptest.NewRequest(http.MethodPost, "/api/internal/v1/debug/chat/send", strings.NewReader(`{ + "target":{"spritzName":"tidy-otter","cwd":"/workspace/app","title":"Debug Run"}, + "message":"hello from cli" + }`)) + req.Header.Set(internalTokenHeader, "internal-token") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Spritz-User-Id", "user-1") + rec := httptest.NewRecorder() + e.ServeHTTP(rec, req) + + if rec.Code != http.StatusBadGateway { + t.Fatalf("expected status 502, got %d: %s", rec.Code, rec.Body.String()) + } + + time.Sleep(20 * time.Millisecond) + fakeACP.mu.Lock() + defer fakeACP.mu.Unlock() + if len(fakeACP.cancelSessionIDs) != 1 || fakeACP.cancelSessionIDs[0] != "session-fresh" { + t.Fatalf("expected one session/cancel for session-fresh, got %#v", fakeACP.cancelSessionIDs) + } +} + +func TestInternalDebugChatSendIgnoresKeepaliveUpdatesDuringPromptSettle(t *testing.T) { + spritz := readyACPSpritz("tidy-otter", "user-1") + keepalives := make([]map[string]any, 0, 20) + for i := 0; i < 20; i++ { + keepalives = append(keepalives, map[string]any{"sessionUpdate": "heartbeat"}) + } + fakeACP := newFakeACPDebugChatServer(t, fakeACPDebugChatServerOptions{ + SessionID: "session-fresh", + PromptChunks: []string{"ok"}, + PromptUpdatesAfter: keepalives, + PromptChunkDelay: 5 * time.Millisecond, + }) + + s := newACPTestServer(t, spritz) + s.internalAuth = internalAuthConfig{enabled: true, token: "internal-token"} + s.acp.instanceURL = func(namespace, name string) string { return fakeACP.url } + s.acp.promptTimeout = 60 * time.Millisecond + s.acp.promptSettleTimeout = 10 * time.Millisecond + + e := echo.New() + internal := e.Group("", s.internalAuthHeaderMiddleware(), s.authMiddleware()) + internal.POST("/api/internal/v1/debug/chat/send", s.sendInternalDebugChat) + + req := httptest.NewRequest(http.MethodPost, "/api/internal/v1/debug/chat/send", strings.NewReader(`{ + "target":{"spritzName":"tidy-otter","cwd":"/workspace/app","title":"Debug Run"}, + "message":"hello from cli" + }`)) + req.Header.Set(internalTokenHeader, "internal-token") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Spritz-User-Id", "user-1") + rec := httptest.NewRecorder() + e.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + time.Sleep(20 * time.Millisecond) + fakeACP.mu.Lock() + defer fakeACP.mu.Unlock() + if len(fakeACP.cancelSessionIDs) != 0 { + t.Fatalf("expected no session/cancel for keepalive-only updates, got %#v", fakeACP.cancelSessionIDs) + } +} + +func TestDebugChatRouteIsUnavailableWithoutAuthOrInternalAuth(t *testing.T) { + s := newACPTestServer(t) + s.auth = authConfig{mode: authModeNone} + s.internalAuth = internalAuthConfig{enabled: false} + + e := echo.New() + s.registerRoutes(e) + + req := httptest.NewRequest(http.MethodPost, "/api/internal/v1/debug/chat/send", strings.NewReader(`{"target":{"spritzName":"tidy-otter"},"message":"hello"}`)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + e.ServeHTTP(rec, req) + + if rec.Code != http.StatusNotFound { + t.Fatalf("expected debug chat route to be unavailable without auth and internal auth, got %d: %s", rec.Code, rec.Body.String()) + } +} diff --git a/api/main.go b/api/main.go index feca9d1..4ba0a3f 100644 --- a/api/main.go +++ b/api/main.go @@ -251,6 +251,10 @@ func (s *server) registerRoutes(e *echo.Echo) { internal.GET("/runtime-bindings/:namespace/:instanceId", s.getRuntimeBinding) internal.POST("/spritzes", s.createInternalSpritz) internal.GET("/spritzes/:namespace/:name", s.getInternalSpritz) + if s.auth.enabled() { + internalSecured := group.Group("/internal/v1", s.internalAuthHeaderMiddleware(), s.authMiddleware()) + internalSecured.POST("/debug/chat/send", s.sendInternalDebugChat) + } } internal.GET("/shared-mounts/owner/:owner/:mount/latest", s.getSharedMountLatest) internal.GET("/shared-mounts/owner/:owner/:mount/revisions/:revision", s.getSharedMountRevision) diff --git a/cli/src/index.ts b/cli/src/index.ts index e6dd431..5fda7ec 100644 --- a/cli/src/index.ts +++ b/cli/src/index.ts @@ -48,6 +48,7 @@ type TtyContext = { const defaultApiBase = 'http://localhost:8080/api'; const requestTimeoutMs = Number.parseInt(process.env.SPRITZ_REQUEST_TIMEOUT_MS || '10000', 10); +const internalRequestTimeoutMs = Number.parseInt(process.env.SPRITZ_INTERNAL_REQUEST_TIMEOUT_MS || '120000', 10); const headerId = process.env.SPRITZ_API_HEADER_ID || 'X-Spritz-User-Id'; const headerEmail = process.env.SPRITZ_API_HEADER_EMAIL || 'X-Spritz-User-Email'; const headerTeams = process.env.SPRITZ_API_HEADER_TEAMS || 'X-Spritz-User-Teams'; @@ -476,6 +477,25 @@ ${ownerNotes} ${reportingNotes}`); } +function chatSendUsage() { + console.log(`Spritz chat send + +Usage: + spritz chat send (--instance | --conversation ) --message [--reason ] [--cwd ] [--title ] [--namespace <ns>] [--json] + +Environment: + SPRITZ_API_URL (default: ${process.env.SPRITZ_API_URL || defaultApiBase}) + SPRITZ_INTERNAL_TOKEN + SPRITZ_INTERNAL_REQUEST_TIMEOUT_MS + SPRITZ_USER_ID, SPRITZ_PROFILE + +Notes: + --instance creates a new owner-scoped conversation before sending the prompt. + --conversation sends into an existing owner-scoped conversation. + --cwd and --title are only used with --instance. +`); +} + function usage(guidance = guidanceForAudience()) { console.log(`Spritz CLI @@ -487,6 +507,7 @@ Usage: spritz open <name> [--namespace <ns>] spritz terminal <name> [--namespace <ns>] [--session <name>] [--transport <ws|ssh>] [--print] spritz ssh <name> [--namespace <ns>] [--session <name>] [--transport <ws|ssh>] [--print] + spritz chat send (--instance <name> | --conversation <id>) --message <text> [--reason <text>] [--cwd <path>] [--title <title>] [--namespace <ns>] [--json] spritz profile list spritz profile current spritz profile show [name] @@ -501,6 +522,8 @@ Alias: Environment: SPRITZ_API_URL (default: ${process.env.SPRITZ_API_URL || defaultApiBase}) SPRITZ_BEARER_TOKEN + SPRITZ_INTERNAL_TOKEN + SPRITZ_INTERNAL_REQUEST_TIMEOUT_MS SPRITZ_USER_ID, SPRITZ_USER_EMAIL, SPRITZ_USER_TEAMS, SPRITZ_OWNER_ID SPRITZ_API_HEADER_ID, SPRITZ_API_HEADER_EMAIL, SPRITZ_API_HEADER_TEAMS SPRITZ_TERMINAL_TRANSPORT (default: ${terminalTransportDefault}) @@ -740,6 +763,14 @@ async function resolveDefaultOwnerId(): Promise<string | undefined> { ); } +function resolveInternalToken(): string { + const token = argValue('--internal-token') || process.env.SPRITZ_INTERNAL_TOKEN; + if (!token?.trim()) { + throw new Error('SPRITZ_INTERNAL_TOKEN or --internal-token is required for chat send'); + } + return token.trim(); +} + async function request(path: string, init?: RequestInit) { const controller = new AbortController(); const timeoutMs = Number.isFinite(requestTimeoutMs) ? requestTimeoutMs : 10000; @@ -782,6 +813,45 @@ async function request(path: string, init?: RequestInit) { return text ? text : null; } +async function internalRequest(path: string, init?: RequestInit) { + const controller = new AbortController(); + const timeoutMs = Number.isFinite(internalRequestTimeoutMs) ? internalRequestTimeoutMs : 95000; + const timeout = setTimeout(() => controller.abort(), Math.max(timeoutMs, 1000)); + const mergedHeaders = { + ...(await authHeaders()), + 'X-Spritz-Internal-Token': resolveInternalToken(), + ...normalizeHeaders(init?.headers), + }; + const apiBase = await resolveApiBase(); + const res = await fetch(`${apiBase}${path}`, { + ...init, + headers: mergedHeaders, + signal: controller.signal, + }).finally(() => clearTimeout(timeout)); + const text = await res.text(); + let data: any = null; + if (text) { + try { + data = JSON.parse(text); + } catch { + data = null; + } + } + const jsend = isJSend(data) ? data : null; + if (!res.ok || (res.ok && jsend && jsend.status !== 'success')) { + const message = + (jsend && (jsend.message || jsend.data?.message || jsend.data?.error)) || + text || + res.statusText || + 'Request failed'; + throw new SpritzRequestError(message, { statusCode: res.status, data: jsend?.data }); + } + if (res.status === 204) return null; + if (jsend) return jsend.data ?? null; + if (data !== null) return data; + return text ? text : null; +} + function defaultTerminalSessionName(name: string, namespace?: string): string { const ns = namespace?.trim() || 'default'; return `spritz:${ns}:${name}`; @@ -1105,6 +1175,10 @@ async function main() { createUsage(guidance); return; } + if (command === 'help' && rest[0] === 'chat' && (!rest[1] || rest[1] === 'send')) { + chatSendUsage(); + return; + } usage(guidance); return; } @@ -1114,6 +1188,11 @@ async function main() { return; } + if (command === 'chat' && hasFlag('--help')) { + chatSendUsage(); + return; + } + if (command === 'profile') { const action = rest[0]; const config = await loadConfig(); @@ -1372,6 +1451,56 @@ async function main() { return; } + if (command === 'chat') { + const action = rest[0]; + if (!action || action === 'help' || action === '--help') { + chatSendUsage(); + return; + } + if (action !== 'send') { + throw new Error(`unknown chat command: ${action}`); + } + const instanceName = argValue('--instance')?.trim(); + const conversationId = argValue('--conversation')?.trim(); + if (Boolean(instanceName) === Boolean(conversationId)) { + throw new Error('exactly one of --instance or --conversation is required'); + } + if (conversationId && (argValue('--cwd') || argValue('--title'))) { + throw new Error('--cwd and --title are only supported with --instance'); + } + if (argValue('--owner-id')?.trim()) { + throw new Error('--owner-id is not supported for chat send; authenticate as the caller instead'); + } + const message = argValue('--message'); + if (!message?.trim()) { + throw new Error('--message is required'); + } + const ns = await resolveNamespace(); + const reason = argValue('--reason')?.trim() || 'spz chat send'; + const requestHeaders: Record<string, string> = { 'Content-Type': 'application/json' }; + const data = await internalRequest('/internal/v1/debug/chat/send', { + method: 'POST', + headers: requestHeaders, + body: JSON.stringify({ + target: { + namespace: ns, + spritzName: instanceName, + conversationId, + cwd: argValue('--cwd'), + title: argValue('--title'), + }, + reason, + message, + }), + }); + if (hasFlag('--json') || !data?.assistantText) { + console.log(JSON.stringify(data, null, 2)); + return; + } + console.log(data.assistantText); + return; + } + if (command === 'ssh' || command === 'terminal') { const name = rest[0]; if (!name) throw new Error('name is required'); diff --git a/cli/test/chat-send.test.ts b/cli/test/chat-send.test.ts new file mode 100644 index 0000000..74f5d32 --- /dev/null +++ b/cli/test/chat-send.test.ts @@ -0,0 +1,360 @@ +import assert from 'node:assert/strict'; +import { spawn } from 'node:child_process'; +import { mkdtempSync, writeFileSync } from 'node:fs'; +import http from 'node:http'; +import os from 'node:os'; +import test from 'node:test'; +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); +const cliPath = path.join(__dirname, '..', 'src', 'index.ts'); + +test('chat send uses the internal token and prints assistant text by default', async (t) => { + let requestBody: any = null; + let requestHeaders: http.IncomingHttpHeaders | null = null; + + const server = http.createServer((req, res) => { + requestHeaders = req.headers; + const chunks: Buffer[] = []; + req.on('data', (chunk) => chunks.push(Buffer.from(chunk))); + req.on('end', () => { + requestBody = JSON.parse(Buffer.concat(chunks).toString('utf8')); + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ + status: 'success', + data: { + conversation: { + metadata: { name: 'tidy-otter-conv' }, + spec: { spritzName: 'tidy-otter', sessionId: 'session-fresh' }, + }, + effectiveSessionId: 'session-fresh', + assistantText: 'spritz debug', + stopReason: 'end_turn', + createdConversation: true, + }, + })); + }); + }); + await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve)); + t.after(() => { + server.close(); + }); + const address = server.address(); + assert.ok(address && typeof address === 'object'); + + const child = spawn( + process.execPath, + [ + '--import', + 'tsx', + cliPath, + 'chat', + 'send', + '--instance', + 'tidy-otter', + '--message', + ' hello from cli ', + '--cwd', + '/workspace/app', + '--title', + 'Debug Run', + '--reason', + 'local smoke', + ], + { + env: { + ...process.env, + SPRITZ_API_URL: `http://127.0.0.1:${address.port}/api`, + SPRITZ_INTERNAL_TOKEN: 'internal-token', + SPRITZ_USER_ID: 'user-123', + SPRITZ_CONFIG_DIR: mkdtempSync(path.join(os.tmpdir(), 'spz-config-')), + }, + stdio: ['ignore', 'pipe', 'pipe'], + }, + ); + + let stdout = ''; + let stderr = ''; + child.stdout.on('data', (chunk) => { + stdout += chunk.toString(); + }); + child.stderr.on('data', (chunk) => { + stderr += chunk.toString(); + }); + + const exitCode = await new Promise<number | null>((resolve) => child.on('exit', resolve)); + assert.equal(exitCode, 0, `spz chat send should succeed: ${stderr}`); + assert.equal(stdout.trim(), 'spritz debug'); + assert.equal(requestHeaders?.['x-spritz-internal-token'], 'internal-token'); + assert.equal(requestHeaders?.['x-spritz-user-id'], 'user-123'); + assert.deepEqual(requestBody, { + target: { + spritzName: 'tidy-otter', + cwd: '/workspace/app', + title: 'Debug Run', + }, + reason: 'local smoke', + message: ' hello from cli ', + }); +}); + +test('chat send supports existing conversations and json output', async (t) => { + let requestBody: any = null; + + const server = http.createServer((req, res) => { + const chunks: Buffer[] = []; + req.on('data', (chunk) => chunks.push(Buffer.from(chunk))); + req.on('end', () => { + requestBody = JSON.parse(Buffer.concat(chunks).toString('utf8')); + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ + status: 'success', + data: { + conversation: { + metadata: { name: 'tidy-otter-conv' }, + spec: { spritzName: 'tidy-otter', sessionId: 'session-existing' }, + }, + effectiveSessionId: 'session-existing', + assistantText: 'ok', + stopReason: 'end_turn', + createdConversation: false, + }, + })); + }); + }); + await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve)); + t.after(() => { + server.close(); + }); + const address = server.address(); + assert.ok(address && typeof address === 'object'); + + const child = spawn( + process.execPath, + [ + '--import', + 'tsx', + cliPath, + 'chat', + 'send', + '--conversation', + 'tidy-otter-conv', + '--message', + 'follow up', + '--json', + ], + { + env: { + ...process.env, + SPRITZ_API_URL: `http://127.0.0.1:${address.port}/api`, + SPRITZ_INTERNAL_TOKEN: 'internal-token', + SPRITZ_USER_ID: 'user-123', + SPRITZ_CONFIG_DIR: mkdtempSync(path.join(os.tmpdir(), 'spz-config-')), + }, + stdio: ['ignore', 'pipe', 'pipe'], + }, + ); + + let stdout = ''; + let stderr = ''; + child.stdout.on('data', (chunk) => { + stdout += chunk.toString(); + }); + child.stderr.on('data', (chunk) => { + stderr += chunk.toString(); + }); + + const exitCode = await new Promise<number | null>((resolve) => child.on('exit', resolve)); + assert.equal(exitCode, 0, `spz chat send --json should succeed: ${stderr}`); + assert.deepEqual(requestBody, { + target: { + conversationId: 'tidy-otter-conv', + }, + reason: 'spz chat send', + message: 'follow up', + }); + + const payload = JSON.parse(stdout); + assert.equal(payload.effectiveSessionId, 'session-existing'); + assert.equal(payload.assistantText, 'ok'); + assert.equal(payload.createdConversation, false); +}); + +test('chat send does not inject owner header when bearer auth comes from the active profile', async (t) => { + let requestHeaders: http.IncomingHttpHeaders | null = null; + + const server = http.createServer((req, res) => { + requestHeaders = req.headers; + req.resume(); + req.on('end', () => { + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ + status: 'success', + data: { + assistantText: 'ok', + }, + })); + }); + }); + await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve)); + t.after(() => { + server.close(); + }); + const address = server.address(); + assert.ok(address && typeof address === 'object'); + + const configDir = mkdtempSync(path.join(os.tmpdir(), 'spz-config-')); + const configPath = path.join(configDir, 'config.json'); + writeFileSync(configPath, JSON.stringify({ + currentProfile: 'p1', + profiles: { + p1: { + apiUrl: `http://127.0.0.1:${address.port}/api`, + bearerToken: 'profile-token', + }, + }, + }, null, 2)); + + const child = spawn( + process.execPath, + [ + '--import', + 'tsx', + cliPath, + 'chat', + 'send', + '--instance', + 'tidy-otter', + '--message', + 'hello from cli', + ], + { + env: { + ...process.env, + SPRITZ_INTERNAL_TOKEN: 'internal-token', + SPRITZ_CONFIG_DIR: configDir, + }, + stdio: ['ignore', 'pipe', 'pipe'], + }, + ); + + let stderr = ''; + child.stderr.on('data', (chunk) => { + stderr += chunk.toString(); + }); + + const exitCode = await new Promise<number | null>((resolve) => child.on('exit', resolve)); + assert.equal(exitCode, 0, `spz chat send should succeed: ${stderr}`); + assert.equal(requestHeaders?.authorization, 'Bearer profile-token'); + assert.equal(requestHeaders?.['x-spritz-user-id'], undefined); +}); + +test('chat send rejects --owner-id as an auth source', async () => { + const child = spawn( + process.execPath, + [ + '--import', + 'tsx', + cliPath, + 'chat', + 'send', + '--instance', + 'tidy-otter', + '--message', + 'hello from cli', + '--owner-id', + 'victim-id', + ], + { + env: { + ...process.env, + SPRITZ_INTERNAL_TOKEN: 'internal-token', + SPRITZ_CONFIG_DIR: mkdtempSync(path.join(os.tmpdir(), 'spz-config-')), + }, + stdio: ['ignore', 'pipe', 'pipe'], + }, + ); + + let stderr = ''; + child.stderr.on('data', (chunk) => { + stderr += chunk.toString(); + }); + + const exitCode = await new Promise<number | null>((resolve) => child.on('exit', resolve)); + assert.notEqual(exitCode, 0, 'spz chat send should reject --owner-id'); + assert.match(stderr, /--owner-id is not supported for chat send/); +}); + +test('chat send does not require an owner id when bearer auth is available', async (t) => { + let requestHeaders: http.IncomingHttpHeaders | null = null; + + const server = http.createServer((req, res) => { + requestHeaders = req.headers; + req.resume(); + req.on('end', () => { + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ + status: 'success', + data: { + assistantText: 'ok', + }, + })); + }); + }); + await new Promise<void>((resolve) => server.listen(0, '127.0.0.1', resolve)); + t.after(() => { + server.close(); + }); + const address = server.address(); + assert.ok(address && typeof address === 'object'); + + const configDir = mkdtempSync(path.join(os.tmpdir(), 'spz-config-')); + const configPath = path.join(configDir, 'config.json'); + writeFileSync(configPath, JSON.stringify({ + currentProfile: 'p1', + profiles: { + p1: { + apiUrl: `http://127.0.0.1:${address.port}/api`, + bearerToken: 'profile-token', + }, + }, + }, null, 2)); + + const child = spawn( + process.execPath, + [ + '--import', + 'tsx', + cliPath, + 'chat', + 'send', + '--instance', + 'tidy-otter', + '--message', + 'hello from cli', + ], + { + env: { + ...process.env, + USER: '', + SPRITZ_OWNER_ID: '', + SPRITZ_USER_ID: '', + SPRITZ_INTERNAL_TOKEN: 'internal-token', + SPRITZ_CONFIG_DIR: configDir, + }, + stdio: ['ignore', 'pipe', 'pipe'], + }, + ); + + let stderr = ''; + child.stderr.on('data', (chunk) => { + stderr += chunk.toString(); + }); + + const exitCode = await new Promise<number | null>((resolve) => child.on('exit', resolve)); + assert.equal(exitCode, 0, `spz chat send should succeed without an owner id in bearer mode: ${stderr}`); + assert.equal(requestHeaders?.authorization, 'Bearer profile-token'); + assert.equal(requestHeaders?.['x-spritz-user-id'], undefined); +}); diff --git a/docs/2026-03-09-acp-port-and-agent-chat-architecture.md b/docs/2026-03-09-acp-port-and-agent-chat-architecture.md index 53f3df4..53f8403 100644 --- a/docs/2026-03-09-acp-port-and-agent-chat-architecture.md +++ b/docs/2026-03-09-acp-port-and-agent-chat-architecture.md @@ -88,6 +88,23 @@ The path is always: That keeps auth, origin checks, and future policy enforcement in one place. +### Transcript ownership + +ACP owns the durable transcript. + +That means: + +- the browser may show transient send state such as disabled composer state or a + "waiting" status +- the browser should not append durable user or assistant transcript entries + outside ACP events +- the echoed ACP `user_message_chunk` is the authoritative user message +- the ACP `agent_message_chunk` stream is the authoritative assistant message + +This rule avoids transcript divergence between optimistic UI state and the real +ACP session replay. In practice it also prevents duplicated user bubbles when a +prompt is rendered once locally and then echoed back again by ACP. + ## Helm surface The chart exposes ACP through these values: diff --git a/docs/2026-03-24-privileged-conversation-debug-and-test-architecture.md b/docs/2026-03-24-privileged-conversation-debug-and-test-architecture.md new file mode 100644 index 0000000..0c8ca5c --- /dev/null +++ b/docs/2026-03-24-privileged-conversation-debug-and-test-architecture.md @@ -0,0 +1,584 @@ +--- +date: 2026-03-24 +author: Spritz Maintainers <user@example.com> +title: Privileged Conversation Debug and Test Architecture +tags: [spritz, conversations, debug, testing, auth, audit, architecture] +--- + +## Overview + +This document defines the preferred architecture for privileged conversation +debugging and testing in Spritz. + +For the broader long-term conversation architecture, see +[Spritz-Native Conversation Broker Architecture](2026-03-24-spritz-native-conversation-broker-architecture.md). + +The target model is: + +- the Spritz control plane owns all privileged debug and test access, +- runtimes remain private and do not trust engineer laptops directly, +- a narrow internal API brokers conversation actions on behalf of authorized + callers, +- a thin `spz` command becomes the standard machine client for this flow, +- every privileged action is authorized, scoped, time-bounded, and audited. + +This is the "holy grail" end state for headless conversation testing: + +- no direct pod access as the normal workflow, +- no browser dependency for routine debugging, +- no special runtime backdoors, +- no duplicate fake conversation path, +- one canonical way to send a message, read a response, and inspect the result. + +## Problem + +Today, conversation debugging is possible, but the workflow is operationally +awkward: + +- discover the target instance, +- reach ACP through port-forward or equivalent plumbing, +- speak ACP directly from the client, +- collect session state and response chunks locally, +- clean up the temporary path by hand. + +That is acceptable for one-off debugging, but it is not the correct long-term +abstraction for: + +- fast local verification, +- repeatable smoke tests, +- CI validation, +- incident triage, +- privileged support workflows, +- agent-driven testing. + +The platform needs a first-class internal conversation testing surface that is: + +- elegant, +- secure, +- auditable, +- portable, +- runtime-agnostic. + +## Goals + +- Make "send a message and read the response" a first-class control-plane + operation for authorized internal callers. +- Keep the real runtime path intact by using the same conversation transport the + product already depends on. +- Centralize authorization, audit logging, and environment policy in + `spritz-api`. +- Give engineers, CI, and authorized agents a stable CLI/API path that does not + require direct Kubernetes access. +- Preserve a strict separation between: + - user-facing access, + - service-principal automation, + - privileged internal debugging. + +## Non-goals + +- Exposing a privileged conversation interface to all end users. +- Replacing browser-based end-to-end tests. +- Making runtimes accept hidden admin passwords or debug bypass flags. +- Giving engineers direct pod or ACP access as the preferred workflow. +- Duplicating conversation execution logic outside the real runtime path. + +## Design Principles + +### Spritz remains the control plane + +Privileged debugging must be owned by the control plane, not by ad hoc client +scripts or direct runtime access. + +That means Spritz owns: + +- authorization, +- policy checks, +- session minting, +- audit logging, +- environment gating, +- conversation brokering, +- debug result normalization. + +### One real message path + +The privileged debug/test path must use the same conversation execution path as +normal product traffic. + +It may add: + +- stronger authorization, +- richer observability, +- server-side brokering, +- privileged inspection APIs. + +It must not add: + +- a fake shortcut that skips the real runtime, +- a separate model invocation implementation, +- a special-case execution path only used by tests. + +### Runtimes trust the control plane, not the caller + +The runtime should not need to understand whether the caller is: + +- a human user, +- a CI job, +- a support engineer, +- an internal coding agent. + +The runtime should trust only: + +- the Spritz control plane, +- a short-lived capability minted by the control plane, +- the existing per-conversation transport and binding rules. + +### Capabilities over ambient privilege + +Privileged access should be expressed as narrow, short-lived capabilities, not +as long-lived broad admin power. + +Every capability should be bound to: + +- one actor, +- one environment, +- one target instance or conversation, +- one allowed action set, +- one reason, +- one expiration time. + +### Default deny for cross-owner access + +The safest default is: + +- a caller may only inspect or drive conversations they already own. + +Cross-owner access should require explicit privileged scope and stronger audit +requirements. + +## Large-Scale Internal Platform Model + +A large internal platform would usually avoid direct workstation-to-runtime +debug access and would instead centralize the path behind a privileged internal +service. + +The essential traits of that model are: + +- workforce or service identity at the edge, +- centralized authorization, +- delegated short-lived credentials, +- mandatory audit logging, +- environment-specific policy, +- explicit production break-glass controls, +- a boring, standard RPC surface. + +Spritz should follow the same model. + +The practical lesson is simple: + +- engineers should talk to `spritz-api`, +- `spritz-api` should talk to the conversation runtime, +- the runtime should not need to trust the engineer directly. + +## Core Model + +The clean target architecture introduces a new internal component inside +`spritz-api`: + +- privileged conversation broker + +This broker: + +- resolves the target instance or conversation, +- authorizes the actor, +- opens or resumes the real conversation transport, +- sends the message over the real runtime path, +- streams or collects the resulting events, +- returns normalized output to the caller, +- records a complete audit trail. + +### Canonical actors + +The model should distinguish three principal classes: + +### Human owner + +The default case. + +The human owner can: + +- read their own conversation, +- send a message to their own conversation, +- inspect their own transcript, +- use the normal browser flow. + +### Internal automation principal + +A machine caller such as: + +- CI, +- a regression harness, +- a support automation, +- an internal coding agent. + +This caller should only have the narrow scopes it needs. + +### Admin or break-glass principal + +A separate elevated role for cross-owner investigation or production response. + +This role should be: + +- rare, +- explicit, +- heavily audited, +- environment-gated. + +## Privileged Debug Session Model + +The broker should mint a short-lived debug session before it performs a +privileged conversation action. + +Recommended fields: + +- `sessionId` +- `actor` +- `target` +- `environment` +- `reason` +- `allowedActions` +- `expiresAt` +- `createdAt` + +Recommended target binding: + +- instance name and namespace, or +- canonical Spritz conversation id + +Recommended action set: + +- `conversation.read` +- `conversation.send` +- `conversation.events.read` +- `conversation.transcript.read` +- `conversation.cancel` + +The session should be invalid outside its narrow target and lifetime. + +## Authorization Model + +### Owner-scoped default path + +The default privileged API should still respect ownership. + +Example: + +- a human or service principal acting on their own instance can debug without + cross-owner privileges. + +This supports: + +- local verification, +- CI for pre-owned test principals, +- internal agent-driven testing with dedicated owners. + +### Cross-owner privileged path + +Cross-owner conversation access should require an explicit elevated scope such +as: + +- `spritz.debug.conversations.read.any` +- `spritz.debug.conversations.send.any` + +These scopes must not be implied by: + +- instance creation, +- standard admin UI access, +- generic bearer access, +- service-principal create permissions. + +### Production break-glass + +Production should require stronger controls than staging. + +Recommended requirements: + +- explicit reason, +- optional ticket or incident reference, +- explicit production debug scope, +- short TTL, +- stronger audit retention, +- optional dual control or approval outside Spritz core. + +### Environment gates + +The broker should support deployment policy such as: + +- enabled in staging, +- disabled by default in production, +- enabled in production only for explicit privileged principals. + +## Internal API Model + +The exact path names can change, but the control-plane shape should look like +this: + +### Create a privileged debug session + +```text +POST /internal/v1/debug/sessions +``` + +Request shape: + +- target instance or conversation +- requested actions +- reason +- optional environment-specific metadata + +Response shape: + +- debug session metadata +- expiration time +- effective allowed actions + +### Send a message + +```text +POST /internal/v1/debug/sessions/{sessionId}/messages +``` + +Request shape: + +- message content +- conversation id or runtime session hint +- wait mode (`stream` or `complete`) + +Response shape: + +- accepted message id +- conversation id +- stream handle or synchronous result envelope + +### Read events + +```text +GET /internal/v1/debug/sessions/{sessionId}/events +``` + +This should support: + +- streaming chunked events, +- reconnectable event cursors, +- normalized assistant text extraction, +- timing metadata. + +### Read transcript + +```text +GET /internal/v1/debug/sessions/{sessionId}/transcript +``` + +This should return: + +- normalized transcript items, +- message roles, +- event timestamps, +- stop reason, +- optional tool/event metadata. + +### Close the debug session + +```text +DELETE /internal/v1/debug/sessions/{sessionId} +``` + +## Broker Execution Model + +The privileged broker should execute the conversation operation server-side. + +Recommended flow: + +1. authenticate actor +2. authorize target and requested actions +3. mint debug session +4. resolve target runtime binding +5. bootstrap or resume the real conversation +6. send the message through the real runtime path +7. collect and stream events +8. write audit records +9. expire the session automatically + +The broker may talk to the runtime over: + +- ACP, +- an internal conversation gateway, +- another runtime-native transport. + +The important invariant is: + +- the control plane owns the privileged hop, +- the client does not. + +## CLI Model + +`spz` should be the standard machine client for this workflow. + +Recommended commands: + +```bash +spz debug session create \ + --instance example-instance \ + --namespace example-namespace \ + --reason "regression verification" + +spz chat send \ + --instance example-instance \ + --namespace example-namespace \ + --message "Reply with the exact token example-token and nothing else." \ + --wait \ + --json + +spz chat transcript \ + --instance example-instance \ + --namespace example-namespace \ + --json +``` + +### Why `spz` + +`spz` is already the thin machine client for Spritz. + +Reusing it preserves: + +- one operational surface, +- one auth model, +- one output format, +- one place for smoke and debug tooling. + +The CLI should remain thin. It should not: + +- speak ACP directly by default, +- implement conversation routing logic, +- evaluate authorization rules locally. + +## Audit Model + +Every privileged conversation action must emit an audit record. + +Recommended fields: + +- actor id +- actor type +- target owner id +- target instance or conversation id +- environment +- reason +- action type +- created at +- completed at +- result classification +- message hash +- response hash + +Optional content capture should be deployment-controlled. + +Safe default: + +- store structured metadata and hashes by default, +- allow full content capture only where explicitly approved. + +## Data Safety and Privacy + +Privileged debug access must be treated as sensitive because conversations may +contain: + +- user prompts, +- assistant output, +- tool traces, +- secrets, +- customer data, +- internal source code. + +Recommended defaults: + +- redact obvious secrets in derived logs, +- keep raw content out of broad metrics, +- bound transcript reads to explicit scope, +- avoid long-lived storage in the broker, +- expire debug session material quickly. + +## Security Properties + +This design should guarantee all of the following: + +- direct runtime access is not required for routine debugging, +- the runtime does not accept broad ambient admin access, +- privileged access is short-lived and target-bound, +- cross-owner access is explicit and auditable, +- production access is more constrained than staging, +- the same real conversation path is used for debugging and product traffic, +- the CLI remains a thin client over the control plane. + +## Migration Path + +### Phase 1 + +Add an internal broker API in `spritz-api` and a thin `spz chat send` client +for owner-scoped usage. + +The current phase-one shape is: + +- `POST /api/internal/v1/debug/chat/send` +- `spz chat send --instance ... --message ...` +- `spz chat send --conversation ... --message ...` + +This phase-one endpoint is intentionally synchronous and owner-scoped. +It should only be registered when both internal auth and normal caller auth are +enabled, so the control plane can bind the request to a real authenticated +principal instead of trusting a caller-supplied owner id. +Transcript reads, event streaming, and explicit cross-owner break-glass remain +later phases. + +This replaces the need for direct ACP from local laptops for routine testing. + +The longer-term target is to move this flow under the canonical conversation +broker described in +[Spritz-Native Conversation Broker Architecture](2026-03-24-spritz-native-conversation-broker-architecture.md). + +### Phase 2 + +Add server-side event streaming and transcript reads. + +This makes CI and agent-driven debugging first-class. + +### Phase 3 + +Add explicit cross-owner privileged scopes and production break-glass policy. + +This supports support and incident-response workflows safely. + +### Phase 4 + +Treat direct port-forward and raw ACP debugging as fallback-only operator tools, +not as the standard path. + +## Validation + +The architecture is successful when all of the following are true: + +- an authorized caller can send a message to an owned conversation without + browser access +- the caller can read the real assistant response through the control plane +- the runtime does not need to trust the caller directly +- all privileged actions create audit records +- cross-owner access is denied without explicit elevated scope +- production access is gated more tightly than staging +- CI can run a real conversation smoke without direct ACP plumbing on the + runner + +## References + +- `docs/2026-03-09-acp-port-and-agent-chat-architecture.md` +- `docs/2026-03-10-acp-conversation-storage-and-replay-model.md` +- `docs/2026-03-11-external-provisioner-and-service-principal-architecture.md` +- `docs/2026-03-13-acp-smoke-contract.md` diff --git a/docs/2026-03-24-spritz-native-conversation-broker-architecture.md b/docs/2026-03-24-spritz-native-conversation-broker-architecture.md new file mode 100644 index 0000000..555cd98 --- /dev/null +++ b/docs/2026-03-24-spritz-native-conversation-broker-architecture.md @@ -0,0 +1,383 @@ +--- +date: 2026-03-24 +author: Spritz Maintainers <user@example.com> +title: Spritz-Native Conversation Broker Architecture +tags: [spritz, conversations, broker, api, acp, architecture] +--- + +## Overview + +This document defines the long-term architecture for conversation execution in +Spritz. + +The target model is: + +- clients use a simple Spritz-native conversation API, +- Spritz owns conversation execution as a control-plane capability, +- ACP remains the runtime protocol behind that layer, +- UI, CLI, debug tools, smoke tests, and automation all use the same broker, +- runtime-specific session behavior is hidden behind adapters instead of + leaking into every client. + +This is the preferred end state for an elegant and production-ready +conversation system. + +The existing internal debug-chat endpoint is a useful phase-one step, but it is +not the final architecture. + +## Problem + +ACP is a good runtime protocol, but it is not a complete product API for +Spritz. + +If clients speak raw ACP directly, each client must understand how to: + +- open and manage ACP connections, +- initialize protocol state, +- bind or load sessions, +- distinguish replayed history from fresh output, +- send prompts, +- handle permission requests, +- cancel prompts, +- recover from disconnects, +- interpret runtime-specific timing and replay behavior. + +That approach appears simple at first, but it spreads session-state logic +across: + +- the web UI, +- the CLI, +- internal debug tools, +- smoke tests, +- future integrations. + +Once that happens, each caller becomes a small conversation broker. The result +is: + +- duplicated state-machine logic, +- inconsistent behavior across clients, +- weaker observability, +- harder authorization and audit enforcement, +- more fragile production behavior. + +## Why Spritz Should Wrap ACP + +Spritz should wrap ACP because ACP and the Spritz product boundary solve +different problems. + +ACP should remain: + +- the protocol between Spritz and runtimes, +- the portability layer for runtime providers, +- the low-level transport for session and prompt execution. + +Spritz should own: + +- authentication, +- authorization, +- ownership policy, +- audit logging, +- replay and prompt lifecycle management, +- conversation state normalization, +- public and internal API stability. + +In other words: + +- ACP is the runtime contract, +- the broker is the product contract. + +This is not duplication. + +It is the standard layering: + +- low-level protocol inside, +- product-oriented API outside. + +## Goals + +- Make conversation send, stream, and cancel first-class Spritz operations. +- Keep ACP internal to the control plane whenever possible. +- Centralize conversation state handling in one broker layer. +- Eliminate replay/prompt lifecycle logic from most clients. +- Preserve runtime portability through ACP adapters. +- Provide one path that serves normal chat, debug, CLI, smoke tests, and + automation. +- Keep the architecture compatible with local development, staging, and + production. + +## Non-goals + +- Replacing ACP between Spritz and runtimes. +- Requiring a separate microservice on day one. +- Breaking existing chat contracts in one migration. +- Removing browser-based end-to-end testing. +- Exposing privileged debug behavior to normal end users. + +## Design Principle + +### Broker first, microservice later if needed + +The broker should start as an internal subsystem inside `spritz-api`. + +That gives Spritz: + +- one logical boundary, +- one owner for conversation execution, +- low migration cost, +- minimal operational overhead. + +If scale, isolation, or team boundaries later justify it, the same broker can +be split into a dedicated service. + +The important boundary is logical first, not process-level first. + +## Target Architecture + +```mermaid +flowchart LR + UI["UI"] --> API["Spritz Conversation API"] + CLI["CLI"] --> API + DBG["Debug Tools"] --> API + SMK["Smoke Tests"] --> API + API --> BROKER["Conversation Broker"] + BROKER --> ADAPTER["ACP Adapter"] + ADAPTER --> RT["Runtime"] +``` + +### External API shape + +Clients should talk to a simple Spritz-native API such as: + +- `POST /api/conversations/{id}/messages` +- `GET /api/conversations/{id}/events` +- `POST /api/conversations/{id}:cancel` + +Privileged internal variants can exist for: + +- owner-scoped debug, +- audited break-glass access, +- smoke or CI automation. + +The important part is that these APIs describe product operations, not ACP +steps. + +Clients should ask for: + +- send this message, +- stream this conversation, +- cancel this prompt. + +Clients should not need to know about: + +- `initialize`, +- `session/load`, +- `session/prompt`, +- replay draining heuristics, +- runtime-specific session repair logic. + +### Broker responsibilities + +The broker should be the only layer that knows how to: + +- resolve a conversation target to a runtime, +- bind or repair runtime session ids, +- load historical transcript state, +- separate replay from fresh prompt output, +- forward prompts, +- stream normalized updates, +- cancel in-flight work, +- translate runtime/protocol errors into stable Spritz errors, +- emit audit and observability events. + +### ACP adapter responsibilities + +The ACP adapter should translate broker operations into ACP protocol behavior. + +That adapter should own: + +- ACP connection management, +- ACP request and notification handling, +- runtime capability probing, +- session bootstrap and repair, +- runtime-specific quirks hidden behind a stable interface. + +This keeps ACP pure without making it the public API boundary. + +## Why This Is Better Than Pure ACP Exposure + +This architecture gives a simpler system overall. + +### With pure ACP exposure + +Each client must implement: + +- session lifecycle management, +- replay handling, +- cancellation, +- timeout policy, +- permission request policy, +- output normalization. + +### With a conversation broker + +Each client implements: + +- call the Spritz API, +- render messages, +- optionally stream events. + +The broker carries the hard logic once. + +That is simpler for the whole platform even if the broker itself is more +capable than a thin ACP proxy. + +## Authorization Model + +The broker should enforce policy directly instead of relying on scattered +transport assumptions. + +At minimum it should support: + +- caller owns the conversation, +- caller owns the target instance, +- privileged internal debug scopes, +- explicit break-glass access for cross-owner actions, +- environment-specific restrictions, +- full audit logging. + +This is cleaner than making clients combine: + +- auth middleware, +- internal tokens, +- owner checks, +- ad hoc admin exceptions. + +## Event Model + +The broker should expose a stable event model that is product-oriented rather +than protocol-oriented. + +For example: + +- `conversation.replay.started` +- `conversation.replay.completed` +- `conversation.message.delta` +- `conversation.message.completed` +- `conversation.tool.started` +- `conversation.tool.updated` +- `conversation.prompt.completed` +- `conversation.prompt.cancelled` + +Internally, ACP may still emit `session/update` messages with runtime-specific +shapes. + +The broker should normalize those before they reach most clients. + +This is the cleanest way to eliminate replay-boundary heuristics from callers. + +## Operational Model + +The broker should support: + +- local development through `spritz-api`, +- staging and production with the same API shape, +- request tracing across the conversation lifecycle, +- structured audit logs, +- stable timeout and cancellation policy, +- reusable smoke and regression tests. + +This also makes it straightforward to build: + +- a privileged CLI, +- repeatable conversation smoke tests, +- internal support tooling, +- future server-side automation. + +## Migration Strategy + +This should be introduced as a strangler refactor, not a rewrite. + +### Phase 1 + +Keep the existing phase-one debug path: + +- `POST /api/internal/v1/debug/chat/send` +- `spz chat send` + +This is already useful and should remain working. + +### Phase 2 + +Move the internal debug path onto a broker abstraction inside `spritz-api`. + +At this point: + +- the outer API stays the same, +- the handler becomes thinner, +- ACP orchestration moves into the broker and adapter. + +### Phase 3 + +Move existing chat and conversation-loading flows onto the same broker. + +This should happen behind compatibility-preserving endpoints so the UI does not +break during migration. + +### Phase 4 + +Once all primary paths use the broker: + +- reduce direct ACP knowledge in the UI and CLI, +- keep raw ACP access only for narrow low-level cases, +- treat the broker as the canonical conversation execution layer. + +## Compatibility Guidance + +The safe migration rule is: + +- keep current API contracts stable, +- replace implementation under them, +- verify behavior parity before cutting over the next caller. + +This is especially important for: + +- conversation bootstrap, +- transcript replay, +- prompt send, +- cancel behavior, +- reconnect behavior, +- permission handling. + +## Risks + +If Spritz continues to rely on raw ACP semantics at too many boundaries, the +main risks are: + +- replay and prompt output getting mixed, +- inconsistent behavior between clients, +- duplicated timeout logic, +- weaker authorization guarantees, +- harder production debugging. + +If the broker becomes the canonical layer, the main risk is over-designing it +too early. + +That is why the recommended path is: + +- start as an internal subsystem, +- keep the API small, +- migrate call paths incrementally, +- split into a separate service only if the platform truly needs it. + +## Recommended End State + +The preferred end state for Spritz is: + +- one simple Spritz-native conversation API, +- one canonical broker implementation, +- ACP retained as the runtime protocol behind adapters, +- all major clients sharing the same execution path, +- auth, audit, and lifecycle policy enforced in one place. + +That is the cleanest way to make Spritz conversations both simple at the +surface and production-ready underneath. diff --git a/ui/src/lib/acp-transcript.ts b/ui/src/lib/acp-transcript.ts index efc936c..5286a6e 100644 --- a/ui/src/lib/acp-transcript.ts +++ b/ui/src/lib/acp-transcript.ts @@ -270,6 +270,9 @@ export function applySessionUpdate( transcript.thinkingStartTime = 0; transcript.thinkingElapsedSeconds = 0; + // ACP is the source of truth for durable chat transcript messages. The + // chat page should not append a separate local user bubble for the same + // prompt; it should wait for this echoed user_message_chunk instead. if (historical) { appendHistoricalText(transcript, 'user', text, (update.historyMessageId || update.messageId) as string); } else { diff --git a/ui/src/pages/chat.test.tsx b/ui/src/pages/chat.test.tsx index d102f5c..4751064 100644 --- a/ui/src/pages/chat.test.tsx +++ b/ui/src/pages/chat.test.tsx @@ -8,29 +8,63 @@ import { ConfigProvider, config } from '@/lib/config'; import { NoticeProvider } from '@/components/notice-banner'; import { ChatPage } from './chat'; -const { requestMock, sendPromptMock } = vi.hoisted(() => ({ - requestMock: vi.fn(), - sendPromptMock: vi.fn(), -})); +const { requestMock, sendPromptMock, emitUpdate, setUpdateHandler } = vi.hoisted(() => { + let updateHandler: + | ((update: Record<string, unknown>, options?: { historical?: boolean }) => void) + | undefined; + return { + requestMock: vi.fn(), + sendPromptMock: vi.fn(), + emitUpdate: (update: Record<string, unknown>, options?: { historical?: boolean }) => { + updateHandler?.(update, options); + }, + setUpdateHandler: ( + handler?: (update: Record<string, unknown>, options?: { historical?: boolean }) => void, + ) => { + updateHandler = handler; + }, + }; +}); vi.mock('@/lib/api', () => ({ request: requestMock, })); vi.mock('@/lib/acp-client', () => ({ - createACPClient: ({ onReadyChange, onStatus }: { onReadyChange?: (ready: boolean) => void; onStatus?: (status: string) => void }) => ({ - start: vi.fn(async () => { - onStatus?.('Connected'); - onReadyChange?.(true); - }), - getConversationId: () => 'conv-1', - getSessionId: () => 'sess-1', - matchesConversation: () => true, - isReady: () => true, - sendPrompt: sendPromptMock, - cancelPrompt: vi.fn(), - dispose: vi.fn(), - }), + extractACPText: (value: unknown): string => { + if (value === null || value === undefined) return ''; + if (typeof value === 'string') return value; + if (Array.isArray(value)) return value.map((item) => String(item ?? '')).join('\n'); + if (typeof value !== 'object') return String(value); + const obj = value as Record<string, unknown>; + if (typeof obj.text === 'string') return obj.text; + if (obj.content) return String(obj.content); + return ''; + }, + createACPClient: ({ + onReadyChange, + onStatus, + onUpdate, + }: { + onReadyChange?: (ready: boolean) => void; + onStatus?: (status: string) => void; + onUpdate?: (update: Record<string, unknown>, options?: { historical?: boolean }) => void; + }) => { + setUpdateHandler(onUpdate); + return { + start: vi.fn(async () => { + onStatus?.('Connected'); + onReadyChange?.(true); + }), + getConversationId: () => 'conv-1', + getSessionId: () => 'sess-1', + matchesConversation: () => true, + isReady: () => true, + sendPrompt: sendPromptMock, + cancelPrompt: vi.fn(), + dispose: vi.fn(() => setUpdateHandler(undefined)), + }; + }, })); vi.mock('@/components/notice-banner', async () => { @@ -67,7 +101,19 @@ vi.mock('@/components/acp/sidebar', () => ({ })); vi.mock('@/components/acp/message', () => ({ - ChatMessage: () => null, + ChatMessage: ({ + message, + }: { + message: { role: string; blocks: Array<{ type: string; text?: string }> }; + }) => ( + <div data-testid="chat-message"> + {message.role}: + {message.blocks + .filter((block) => block.type === 'text') + .map((block) => block.text || '') + .join(' ')} + </div> + ), })); vi.mock('@/components/acp/thinking-block', () => ({ @@ -168,6 +214,7 @@ describe('ChatPage draft persistence', () => { }); requestMock.mockReset(); sendPromptMock.mockReset(); + setUpdateHandler(undefined); sendPromptMock.mockResolvedValue({}); setupRequestMock(); }); @@ -234,6 +281,29 @@ describe('ChatPage draft persistence', () => { expect(localStorage.getItem('spritz:chat-drafts')).toBeNull(); }); + it('renders the echoed ACP user message only once', async () => { + const user = userEvent.setup(); + await renderChat('/c/covo/conv-1'); + + await user.type(screen.getByLabelText('Message input'), 'test'); + await user.click(screen.getByRole('button', { name: 'Send message' })); + + await waitFor(() => expect(sendPromptMock).toHaveBeenCalledWith('test')); + + emitUpdate({ + sessionUpdate: 'user_message_chunk', + messageId: 'user-1', + content: { type: 'text', text: 'test' }, + }); + + await waitFor(() => { + const userMessages = screen + .getAllByTestId('chat-message') + .filter((element) => element.textContent === 'user:test'); + expect(userMessages).toHaveLength(1); + }); + }); + it('restores the original conversation draft when send fails after switching chats', async () => { const user = userEvent.setup(); const deferred = createDeferred<unknown>(); diff --git a/ui/src/pages/chat.tsx b/ui/src/pages/chat.tsx index c020bba..71f4caa 100644 --- a/ui/src/pages/chat.tsx +++ b/ui/src/pages/chat.tsx @@ -414,14 +414,9 @@ export function ChatPage() { const activeSpritzName = activeConversation?.spec?.spritzName || name || ''; const previousComposerText = composerText; - // Optimistically add user message - const t = transcriptRef.current; - t.messages.push({ - role: 'user', - blocks: [{ type: 'text', text }], - streaming: false, - }); - setTranscript({ ...t }); + // ACP owns durable transcript entries, including the echoed user prompt. + // Keep send feedback in ephemeral UI state and wait for ACP to write the + // real message so the transcript cannot diverge or duplicate. // Set title from first message if conversation has no real title const currentTitle = selectedConversationRef.current?.spec?.title || '';