Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ Spritz is intended to remain portable and standalone:
- [ACP Port and Agent Chat Architecture](docs/2026-03-09-acp-port-and-agent-chat-architecture.md)
- [External Provisioner and Service Principal Architecture](docs/2026-03-11-external-provisioner-and-service-principal-architecture.md)
- [External Identity Resolution API Architecture](docs/2026-03-12-external-identity-resolution-api-architecture.md)
- [Privileged Conversation Debug and Test Architecture](docs/2026-03-24-privileged-conversation-debug-and-test-architecture.md)
- [Spritz-Native Conversation Broker Architecture](docs/2026-03-24-spritz-native-conversation-broker-architecture.md)
- [Spawn Language for Agent Instances](docs/2026-03-13-spawn-language-for-agent-instances.md)
- [OpenClaw Integration](docs/2026-03-13-openclaw-integration.md)
- [Local kind Development Guide](docs/2026-03-14-local-kind-development-guide.md)
Expand Down
109 changes: 95 additions & 14 deletions api/acp_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"strings"
"sync"

"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
Expand Down Expand Up @@ -103,8 +104,12 @@ func (e *acpBootstrapRPCError) missingSession() bool {
}

type acpBootstrapInstanceClient struct {
conn *websocket.Conn
nextID int64
conn *websocket.Conn
nextID int64
writeMu sync.Mutex
readerOnce sync.Once
readCh chan *acpBootstrapJSONRPCMessage
readErrCh chan error
}

func (c *acpBootstrapInstanceClient) close() error {
Expand Down Expand Up @@ -190,6 +195,8 @@ func (c *acpBootstrapInstanceClient) request(ctx context.Context, method string,
}

func (c *acpBootstrapInstanceClient) writeJSON(ctx context.Context, payload any) error {
c.writeMu.Lock()
defer c.writeMu.Unlock()
if deadline, ok := ctx.Deadline(); ok {
if err := c.conn.SetWriteDeadline(deadline); err != nil {
return err
Expand All @@ -198,21 +205,90 @@ func (c *acpBootstrapInstanceClient) writeJSON(ctx context.Context, payload any)
return c.conn.WriteJSON(payload)
}

func (c *acpBootstrapInstanceClient) notify(ctx context.Context, method string, params any) error {
return c.writeJSON(ctx, map[string]any{
"jsonrpc": "2.0",
"method": method,
"params": params,
})
}

func (c *acpBootstrapInstanceClient) cancelPrompt(ctx context.Context, sessionID string) error {
if strings.TrimSpace(sessionID) == "" {
return nil
}
return c.notify(ctx, "session/cancel", map[string]any{
"sessionId": sessionID,
})
}

func (c *acpBootstrapInstanceClient) respondError(ctx context.Context, id any, code int, message string) error {
if id == nil {
return nil
}
return c.writeJSON(ctx, map[string]any{
"jsonrpc": "2.0",
"id": id,
"error": map[string]any{
"code": code,
"message": message,
},
})
}

func (c *acpBootstrapInstanceClient) handleServerRequest(ctx context.Context, message *acpBootstrapJSONRPCMessage, unsupportedMessage string) (bool, error) {
if message == nil || strings.TrimSpace(message.Method) == "" || message.ID == nil {
return false, nil
}
if message.Method == "session/request_permission" {
return true, c.respondError(ctx, message.ID, -32000, "Permission requests are not supported by internal debug chat.")
}
return true, c.respondError(ctx, message.ID, -32601, unsupportedMessage)
}

func (c *acpBootstrapInstanceClient) readMessage(ctx context.Context) (*acpBootstrapJSONRPCMessage, error) {
if deadline, ok := ctx.Deadline(); ok {
if err := c.conn.SetReadDeadline(deadline); err != nil {
c.startReader()
select {
case <-ctx.Done():
return nil, ctx.Err()
case message, ok := <-c.readCh:
if !ok {
if err, ok := <-c.readErrCh; ok && err != nil {
return nil, err
}
return nil, websocket.ErrCloseSent
}
return message, nil
case err, ok := <-c.readErrCh:
if ok && err != nil {
return nil, err
}
return nil, websocket.ErrCloseSent
}
_, payload, err := c.conn.ReadMessage()
if err != nil {
return nil, err
}
message := &acpBootstrapJSONRPCMessage{}
if err := json.Unmarshal(payload, message); err != nil {
return nil, err
}
return message, nil
}

func (c *acpBootstrapInstanceClient) startReader() {
c.readerOnce.Do(func() {
c.readCh = make(chan *acpBootstrapJSONRPCMessage, 32)
c.readErrCh = make(chan error, 1)
go func() {
defer close(c.readCh)
defer close(c.readErrCh)
for {
_, payload, err := c.conn.ReadMessage()
if err != nil {
c.readErrCh <- err
return
}
message := &acpBootstrapJSONRPCMessage{}
if err := json.Unmarshal(payload, message); err != nil {
c.readErrCh <- err
return
}
c.readCh <- message
}
}()
})
}

func newACPBootstrapRPCError(payload *acpBootstrapJSONRPCError) *acpBootstrapRPCError {
Expand Down Expand Up @@ -344,8 +420,12 @@ func (s *server) bootstrapACPConversationBinding(ctx context.Context, conversati
return nil, err
}

return s.bootstrapACPConversationBindingWithClient(ctx, conversation, client, initResult)
}

func (s *server) bootstrapACPConversationBindingWithClient(ctx context.Context, conversation *spritzv1.SpritzConversation, client *acpBootstrapInstanceClient, initResult *acpBootstrapInitializeResult) (*acpBootstrapResponse, error) {
if !initResult.AgentCapabilities.LoadSession {
err = errors.New("agent does not support session/load")
err := errors.New("agent does not support session/load")
s.recordConversationBindingError(ctx, conversation.Namespace, conversation.Name, "", err)
return nil, err
}
Expand All @@ -358,6 +438,7 @@ func (s *server) bootstrapACPConversationBinding(ctx context.Context, conversati
replaced := false
loaded := false
var replayMessageCount int32
var err error

if effectiveSessionID != "" {
replayMessageCount, err = client.loadSession(ctx, effectiveSessionID, normalizeConversationCWD(conversation.Spec.CWD))
Expand Down
4 changes: 4 additions & 0 deletions api/acp_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type acpConfig struct {
clientInfo acpBootstrapClientInfo
clientCapabilities map[string]any
bootstrapDialTimeout time.Duration
promptTimeout time.Duration
promptSettleTimeout time.Duration
}

func defaultACPClientCapabilities() map[string]any {
Expand Down Expand Up @@ -65,6 +67,8 @@ func newACPConfig() acpConfig {
},
clientCapabilities: defaultACPClientCapabilities(),
bootstrapDialTimeout: parseDurationEnv("SPRITZ_ACP_BOOTSTRAP_DIAL_TIMEOUT", 5*time.Second),
promptTimeout: parseDurationEnv("SPRITZ_ACP_PROMPT_TIMEOUT", 90*time.Second),
promptSettleTimeout: parseDurationEnv("SPRITZ_ACP_PROMPT_SETTLE_TIMEOUT", 750*time.Millisecond),
}
}

Expand Down
205 changes: 205 additions & 0 deletions api/acp_prompt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package main

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
)

type acpPromptResult struct {
StopReason string `json:"stopReason,omitempty"`
AssistantText string `json:"assistantText,omitempty"`
Updates []map[string]any `json:"updates,omitempty"`
}

func (c *acpBootstrapInstanceClient) prompt(ctx context.Context, sessionID, text string, settleTimeout time.Duration) (*acpPromptResult, error) {
c.nextID++
requestID := fmt.Sprintf("prompt-%d", c.nextID)
if err := c.writeJSON(ctx, map[string]any{
"jsonrpc": "2.0",
"id": requestID,
"method": "session/prompt",
"params": map[string]any{
"sessionId": sessionID,
"prompt": []map[string]any{{
"type": "text",
"text": text,
}},
},
}); err != nil {
return nil, err
}

var result struct {
StopReason string `json:"stopReason"`
}
updates := make([]map[string]any, 0, 8)

for {
message, err := c.readMessage(ctx)
if err != nil {
return nil, err
}
if handled, err := c.handleServerRequest(ctx, message, "Method not supported by internal debug chat."); handled {
if err != nil {
return nil, err
}
continue
}
if update, ok := sessionUpdateFromMessage(message); ok {
updates = append(updates, update)
continue
}
if fmt.Sprint(message.ID) != requestID {
continue
}
if message.Error != nil {
return nil, newACPBootstrapRPCError(message.Error)
}
if len(message.Result) > 0 {
if err := json.Unmarshal(message.Result, &result); err != nil {
return nil, err
}
}
break
}

if err := c.drainSessionUpdates(ctx, settleTimeout, &updates); err != nil {
return nil, err
}

return &acpPromptResult{
StopReason: strings.TrimSpace(result.StopReason),
AssistantText: assistantTextFromACPUpdates(updates),
Updates: updates,
}, nil
}

func (c *acpBootstrapInstanceClient) drainSessionUpdates(ctx context.Context, settleTimeout time.Duration, updates *[]map[string]any) error {
if c == nil || c.conn == nil || settleTimeout <= 0 {
return nil
}
c.startReader()
timer := time.NewTimer(settleTimeout)
defer timer.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
case err, ok := <-c.readErrCh:
if !ok || err == nil {
return nil
}
return err
case message, ok := <-c.readCh:
if !ok {
return nil
}
if handled, err := c.handleServerRequest(ctx, message, "Method not supported by internal debug chat."); handled {
if err != nil {
return err
}
continue
}
if update, ok := sessionUpdateFromMessage(message); ok {
*updates = append(*updates, update)
if !shouldExtendSessionSettleWindow(update) {
continue
}
}
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(settleTimeout)
}
}
}

func shouldExtendSessionSettleWindow(update map[string]any) bool {
switch strings.TrimSpace(fmt.Sprint(update["sessionUpdate"])) {
case "", "heartbeat", "ping", "pong", "ack", "available_commands_update", "current_mode_update", "usage_update", "session_info_update":
return false
default:
return true
}
}

func sessionUpdateFromMessage(message *acpBootstrapJSONRPCMessage) (map[string]any, bool) {
if message == nil || message.Method != "session/update" || len(message.Params) == 0 {
return nil, false
}
var params struct {
Update map[string]any `json:"update"`
}
if err := json.Unmarshal(message.Params, &params); err != nil {
return nil, false
}
if len(params.Update) == 0 {
return nil, false
}
return params.Update, true
}

func assistantTextFromACPUpdates(updates []map[string]any) string {
chunks := make([]any, 0, len(updates))
for _, update := range updates {
if strings.TrimSpace(fmt.Sprint(update["sessionUpdate"])) != "agent_message_chunk" {
continue
}
chunks = append(chunks, update["content"])
}
return joinACPTextChunks(chunks)
}

func joinACPTextChunks(values []any) string {
var builder strings.Builder
for _, value := range values {
builder.WriteString(extractACPText(value))
}
return builder.String()
}

func extractACPText(value any) string {
switch typed := value.(type) {
case nil:
return ""
case string:
return typed
case []any:
parts := make([]string, 0, len(typed))
for _, item := range typed {
text := extractACPText(item)
if text == "" {
continue
}
parts = append(parts, text)
}
return strings.Join(parts, "\n")
case map[string]any:
if text, ok := typed["text"].(string); ok {
return text
}
if content, ok := typed["content"]; ok {
return extractACPText(content)
}
if resource, ok := typed["resource"].(map[string]any); ok {
if text, ok := resource["text"].(string); ok {
return text
}
if uri, ok := resource["uri"].(string); ok {
return uri
}
}
return ""
default:
return fmt.Sprint(typed)
}
}
2 changes: 2 additions & 0 deletions api/acp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func newACPTestServer(t *testing.T, objects ...client.Object) *server {
path: spritzv1.DefaultACPPath,
clientCapabilities: defaultACPClientCapabilities(),
bootstrapDialTimeout: 5 * time.Second,
promptTimeout: 30 * time.Second,
promptSettleTimeout: 10 * time.Millisecond,
},
}
}
Expand Down
Loading
Loading