Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 162 additions & 71 deletions cmd/aileron-mcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
// execution. This is the "MCP canonical" path for action exposure
// ratified during the working session on 2026-05-03.
//
// When AILERON_COMMS_SOCKET is set (e.g. when launched by `aileron
// launch`), the server additionally exposes comms tools — read_messages,
// send_message, draft_reply, http_request — that talk to the launch
// product's CommsServer over a Unix socket for Slack/Discord inbound
// push handling.
// When AILERON_COMMS_URL + AILERON_SESSION_ID are set (e.g. when
// launched by `aileron launch`), the server additionally exposes comms
// tools — read_messages, send_message, draft_reply, http_request — that
// reach the daemon-owned comms surface via HTTP. Pre-9B these talked to
// a per-session unix socket; ADR-0012 step 9B-2 moved comms ownership
// to the daemon and switched the wire to HTTP long-poll.
//
// The binary communicates over stdio using JSON-RPC 2.0, per the MCP
// specification.
Expand All @@ -23,8 +24,11 @@
// When set, action tools are discovered and exposed.
// AILERON_TOKEN - Optional bearer token for authenticating with
// the Aileron API.
// AILERON_COMMS_SOCKET - Path to the launch product's comms Unix
// socket. When set, comms tools are exposed.
// AILERON_COMMS_URL - URL of the daemon's comms surface (typically
// the same as AILERON_URL). Pair with
// AILERON_SESSION_ID to enable comms tools.
// AILERON_SESSION_ID - The launch session id the daemon stamps on
// comms approval entries.
package main

import (
Expand All @@ -35,12 +39,13 @@ import (
"fmt"
"io"
"log/slog"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/ALRubinger/aileron/internal/config"
"github.com/ALRubinger/aileron/internal/observability"
Expand Down Expand Up @@ -166,8 +171,14 @@ type actionRunResponse struct {
type server struct {
aileronURL string
aileronToken string
commsSocket string
commsURL string
sessionID string
httpClient *http.Client
// commsHTTPClient is a long-poll-tolerant client for the comms
// endpoints — daemon caps its waits at the action-approval TTL
// (5 min default). A dedicated client lets the action-discovery
// path use a tighter timeout without affecting comms.
commsHTTPClient *http.Client

// Discovered actions, populated at startup when AILERON_URL is set.
// Keys of actionNameMap are snake_case (LLM-facing) tool names;
Expand All @@ -176,6 +187,14 @@ type server struct {
actionNameMap map[string]string
}

// commsAvailable reports whether the env carries enough context to
// reach the daemon's comms endpoints. Both env vars must be set; a
// missing session id yields a 404 from the daemon, so fail-loud with
// "comms not available" beats a confusing 404.
func (s *server) commsAvailable() bool {
return s.commsURL != "" && s.sessionID != ""
}

var readMessagesTool = toolDef{
Name: "read_messages",
Description: "Read pending messages from communication channels (Slack, Discord). Returns unread messages from the notification queue. Messages with draft_request=true need a reply drafted — call draft_reply with the message ID and your suggested reply.",
Expand Down Expand Up @@ -249,8 +268,13 @@ func main() {
s := &server{
aileronURL: os.Getenv("AILERON_URL"),
aileronToken: os.Getenv("AILERON_TOKEN"),
commsSocket: os.Getenv("AILERON_COMMS_SOCKET"),
httpClient: &http.Client{},
commsURL: os.Getenv("AILERON_COMMS_URL"),
sessionID: os.Getenv("AILERON_SESSION_ID"),
httpClient: &http.Client{Timeout: 30 * time.Second},
// 6-minute deadline matches the daemon's 5-minute approval TTL
// + a small buffer so the daemon's bounded response always
// wins over a transport timeout.
commsHTTPClient: &http.Client{Timeout: 6 * time.Minute},
}

// Discover installed actions from the Aileron daemon. Best-effort:
Expand Down Expand Up @@ -354,7 +378,7 @@ func (s *server) handle(req jsonrpcRequest) *jsonrpcResponse {

func (s *server) availableTools() []toolDef {
var tools []toolDef
if s.commsSocket != "" {
if s.commsAvailable() {
tools = append(tools, readMessagesTool, draftReplyTool, sendMessageTool, httpRequestTool)
}
// Dynamically discovered Aileron actions from the daemon's
Expand Down Expand Up @@ -386,26 +410,34 @@ func (s *server) dispatchTool(ctx context.Context, name string, args map[string]
}

func (s *server) readMessages(args map[string]any) toolResult {
if s.commsSocket == "" {
if !s.commsAvailable() {
return errorResult("comms not available (not launched via aileron)")
}

service, _ := args["service"].(string)
channel, _ := args["channel"].(string)

resp := requestComms(s.commsSocket, commsRequest{
Method: "read_messages",
Service: service,
Channel: channel,
})
if resp.Error != "" {
return errorResult(resp.Error)
endpoint := s.commsEndpoint("messages")
q := url.Values{}
if service != "" {
q.Set("service", service)
}
if channel != "" {
q.Set("channel", channel)
}
if encoded := q.Encode(); encoded != "" {
endpoint += "?" + encoded
}

var resp readMessagesResponse
if err := s.commsGET(endpoint, &resp); err != nil {
return errorResult(err.Error())
}
return jsonResult(resp.Messages)
}

func (s *server) draftReply(args map[string]any) toolResult {
if s.commsSocket == "" {
if !s.commsAvailable() {
return errorResult("comms not available (not launched via aileron)")
}

Expand All @@ -416,12 +448,14 @@ func (s *server) draftReply(args map[string]any) toolResult {
return errorResult("message_id and body are required")
}

resp := requestComms(s.commsSocket, commsRequest{
Method: "draft_reply",
ReplyTo: messageID,
Body: body,
resp, err := s.commsPOST(s.commsEndpoint("draft"), map[string]string{
"reply_to": messageID,
"body": body,
})
if resp.Error != "" {
if err != nil {
return errorResult(err.Error())
}
if !resp.OK {
return errorResult(resp.Error)
}
return toolResult{
Expand All @@ -430,7 +464,7 @@ func (s *server) draftReply(args map[string]any) toolResult {
}

func (s *server) sendMessage(args map[string]any) toolResult {
if s.commsSocket == "" {
if !s.commsAvailable() {
return errorResult("comms not available (not launched via aileron)")
}

Expand All @@ -442,13 +476,15 @@ func (s *server) sendMessage(args map[string]any) toolResult {
return errorResult("service, channel, and body are required")
}

resp := requestComms(s.commsSocket, commsRequest{
Method: "send_message",
Service: service,
Channel: channel,
Body: body,
resp, err := s.commsPOST(s.commsEndpoint("send"), map[string]string{
"service": service,
"channel": channel,
"body": body,
})
if resp.Error != "" {
if err != nil {
return errorResult(err.Error())
}
if !resp.OK {
return errorResult(resp.Error)
}
return toolResult{
Expand All @@ -457,7 +493,7 @@ func (s *server) sendMessage(args map[string]any) toolResult {
}

func (s *server) httpRequest(args map[string]any) toolResult {
if s.commsSocket == "" {
if !s.commsAvailable() {
return errorResult("comms not available (not launched via aileron)")
}

Expand All @@ -470,17 +506,26 @@ func (s *server) httpRequest(args map[string]any) toolResult {
return errorResult("method and url are required")
}

resp := requestComms(s.commsSocket, commsRequest{
Method: "http_request",
Service: method, // repurpose Service field for HTTP method
Channel: url, // repurpose Channel field for URL
Body: body,
ReplyTo: headers, // repurpose ReplyTo field for headers JSON
})
if resp.Error != "" {
payload := map[string]string{
"method": method,
"url": url,
}
if body != "" {
payload["body"] = body
}
if headers != "" {
payload["headers"] = headers
}

resp, err := s.commsPOST(s.commsEndpoint("http"), payload)
if err != nil {
return errorResult(err.Error())
}
if !resp.OK {
return errorResult(resp.Error)
}
// Response body is in the first message's Body field.
// Response body is in the first message's Body field; the daemon
// stamps the upstream HTTP status code into Id.
if len(resp.Messages) > 0 {
return toolResult{
Content: []toolContent{{Type: "text", Text: resp.Messages[0].Body}},
Expand All @@ -491,47 +536,93 @@ func (s *server) httpRequest(args map[string]any) toolResult {
}
}

// --- Comms IPC types (mirrors core/launch/commsserver.go) ---

type commsRequest struct {
Method string `json:"method"`
Service string `json:"service,omitempty"`
Channel string `json:"channel,omitempty"`
Body string `json:"body,omitempty"`
ReplyTo string `json:"reply_to,omitempty"`
}
// --- Comms wire shapes (mirrors internal/api/openapi.yaml CommsToolResponse) ---

type commsResponse struct {
type commsToolResponse struct {
OK bool `json:"ok"`
Error string `json:"error,omitempty"`
Messages []commsMessage `json:"messages,omitempty"`
}

type commsMessage struct {
ID string `json:"id"`
Service string `json:"service"`
Channel string `json:"channel"`
Author string `json:"author"`
Body string `json:"body"`
Timestamp string `json:"timestamp"`
type readMessagesResponse struct {
Messages []commsMessage `json:"messages"`
}

func requestComms(socketPath string, req commsRequest) commsResponse {
conn, err := net.Dial("unix", socketPath)
type commsMessage struct {
ID string `json:"id"`
Service string `json:"service"`
Channel string `json:"channel"`
Author string `json:"author"`
Body string `json:"body"`
Timestamp string `json:"timestamp"`
DraftRequest bool `json:"draft_request,omitempty"`
}

// commsEndpoint composes the daemon's per-session comms URL for the
// given suffix ("messages", "send", "draft", "http"). The daemon
// expects `/v1/sessions/{sessionID}/comms/<suffix>`.
func (s *server) commsEndpoint(suffix string) string {
return strings.TrimRight(s.commsURL, "/") + "/v1/sessions/" + s.sessionID + "/comms/" + suffix
}

// commsGET issues a GET against the daemon's comms surface and decodes
// the JSON body into out. Any non-200 status, transport error, or
// decode failure surfaces as an error so the agent sees the failure
// rather than silently dropping the call.
func (s *server) commsGET(endpoint string, out any) error {
req, err := http.NewRequest(http.MethodGet, endpoint, nil)
if err != nil {
return commsResponse{Error: "connection failed: " + err.Error()}
return fmt.Errorf("comms request: %w", err)
}
defer conn.Close()

if err := json.NewEncoder(conn).Encode(req); err != nil {
return commsResponse{Error: "encode failed: " + err.Error()}
if s.aileronToken != "" {
req.Header.Set("Authorization", "Bearer "+s.aileronToken)
}
resp, err := s.commsHTTPClient.Do(req)
if err != nil {
return fmt.Errorf("daemon unreachable: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
return fmt.Errorf("daemon returned %s: %s", resp.Status, strings.TrimSpace(string(body)))
}
if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
return fmt.Errorf("decoding response: %w", err)
}
return nil
}

var resp commsResponse
if err := json.NewDecoder(conn).Decode(&resp); err != nil {
return commsResponse{Error: "decode failed: " + err.Error()}
// commsPOST issues a POST with the JSON-encoded body and returns the
// parsed CommsToolResponse. The daemon's send-shaped endpoints always
// return 200 — the verdict rides in `ok` + `error` — so a non-200
// here means the call never reached the queue.
func (s *server) commsPOST(endpoint string, body any) (commsToolResponse, error) {
data, err := json.Marshal(body)
if err != nil {
return commsToolResponse{}, fmt.Errorf("encoding request: %w", err)
}
req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(data))
if err != nil {
return commsToolResponse{}, fmt.Errorf("comms request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
if s.aileronToken != "" {
req.Header.Set("Authorization", "Bearer "+s.aileronToken)
}
resp, err := s.commsHTTPClient.Do(req)
if err != nil {
return commsToolResponse{}, fmt.Errorf("daemon unreachable: %w", err)
}
defer resp.Body.Close()
rawBody, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
if resp.StatusCode != http.StatusOK {
return commsToolResponse{}, fmt.Errorf("daemon returned %s: %s", resp.Status, strings.TrimSpace(string(rawBody)))
}
var out commsToolResponse
if err := json.Unmarshal(rawBody, &out); err != nil {
return commsToolResponse{}, fmt.Errorf("decoding response: %w", err)
}
return resp
return out, nil
}

// --- Action discovery / execution against the Aileron daemon ---
Expand Down
Loading
Loading