Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cli/src/bin/e2a.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
} from "../commands/agents.js";
import { inbox } from "../commands/inbox.js";
import { read } from "../commands/read.js";
import { forward } from "../commands/forward.js";
import { reply } from "../commands/reply.js";
import { send } from "../commands/send.js";
import { config } from "../commands/config.js";
Expand Down Expand Up @@ -40,6 +41,7 @@ Usage:
e2a inbox [--unread|--read] [--limit N] [--oldest] [--from substr] [--subject substr] [--conversation id] [--since ts] [--until ts] [--token …] List messages (newest first; --oldest for FIFO)
e2a read <message-id> Read a message
e2a reply <msg-id> --body … [--reply-all] [--cc …] [--bcc …]
e2a forward <msg-id> --to … [--cc …] [--bcc …] [--body …]
e2a send [--to …] [--cc …] [--bcc …] --subject … --body …
e2a listen [options] Listen for emails via WebSocket
e2a domains list List your domains
Expand Down Expand Up @@ -226,6 +228,17 @@ async function main() {
idempotencyKey: getFlag(args, "--idempotency-key"),
});
break;
case "forward":
await forward(args[0], {
to: getFlags(args, "--to"),
cc: getFlags(args, "--cc"),
bcc: getFlags(args, "--bcc"),
body: getFlag(args, "--body"),
htmlBody: getFlag(args, "--html-body"),
from: getFlag(args, "--agent"),
idempotencyKey: getFlag(args, "--idempotency-key"),
});
break;
case "send":
await send(
getFlags(args, "--to"),
Expand Down
44 changes: 44 additions & 0 deletions cli/src/commands/forward.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { createClient } from "../sdk.js";

export async function forward(
messageId: string | undefined,
opts: {
to: string[];
cc?: string[];
bcc?: string[];
body?: string;
htmlBody?: string;
from?: string;
idempotencyKey?: string;
},
): Promise<void> {
if (!messageId) {
process.stderr.write(
"Usage: e2a forward <message-id> --to <addr> [--cc <addr>] [--bcc <addr>] [--body \"...\"] [--html-body \"...\"]\n",
);
process.exit(1);
}
if (!opts.to.length) {
process.stderr.write("--to is required (at least one recipient)\n");
process.exit(1);
}

const client = createClient({ from: opts.from });

if (!client.agentEmail) {
process.stderr.write(
"No agent email configured. Run 'e2a register' first or use --agent.\n",
);
process.exit(1);
}

const res = await client.forward(messageId, opts.to, {
cc: opts.cc?.length ? opts.cc : undefined,
bcc: opts.bcc?.length ? opts.bcc : undefined,
body: opts.body,
htmlBody: opts.htmlBody,
idempotencyKey: opts.idempotencyKey,
});

process.stdout.write(`Sent: ${res.message_id}\n`);
}
206 changes: 203 additions & 3 deletions internal/agent/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func (a *API) RegisterRoutes(r *mux.Router) {
r.HandleFunc("/api/v1/agents/{email}/messages", a.handleGetMessages).Methods("GET")
r.HandleFunc("/api/v1/agents/{email}/messages/{id}", a.handleGetMessage).Methods("GET")
r.HandleFunc("/api/v1/agents/{email}/messages/{id}/reply", a.handleReplyToMessage).Methods("POST")
r.HandleFunc("/api/v1/agents/{email}/messages/{id}/forward", a.handleForwardMessage).Methods("POST")
r.HandleFunc("/api/v1/domains", a.handleListDomains).Methods("GET")
r.HandleFunc("/api/v1/domains", a.handleRegisterDomain).Methods("POST")
r.HandleFunc("/api/v1/domains/{domain}/verify", a.handleVerifyDomain).Methods("POST")
Expand Down Expand Up @@ -1441,7 +1442,7 @@ func (a *API) domainInfoFromRecord(d *identity.Domain) DomainInfo {
// handleSendTestEmail when agent.HITLEnabled is true.
//
// replyToEmailMessageID is the inbound Message-ID being replied to, or "".
// msgType is one of "send", "reply", or "test".
// msgType is one of "send", "reply", "test", or "forward".
func (a *API) holdForApproval(w http.ResponseWriter, r *http.Request, agent *identity.AgentIdentity, req outbound.SendRequest, msgType, replyToEmailMessageID string) {
var attachmentsJSON []byte
if len(req.Attachments) > 0 {
Expand Down Expand Up @@ -1635,7 +1636,7 @@ func (a *API) handleSendEmail(w http.ResponseWriter, r *http.Request) {
}

if selfSend {
providerID, err := a.performSelfSend(r.Context(), agent, req)
providerID, err := a.performSelfSend(r.Context(), agent, req, "send")
if err != nil {
log.Printf("[api] self-send failed: agent=%s error=%v", agent.EmailAddress(), err)
http.Error(w, "self-send failed", http.StatusInternalServerError)
Expand Down Expand Up @@ -1972,7 +1973,7 @@ func (a *API) handleReplyToMessage(w http.ResponseWriter, r *http.Request) {
}

if selfReply {
providerID, err := a.performSelfSend(r.Context(), agent, sendReq)
providerID, err := a.performSelfSend(r.Context(), agent, sendReq, "reply")
if err != nil {
log.Printf("[api] self-reply failed: agent=%s error=%v", agent.EmailAddress(), err)
http.Error(w, "self-reply failed", http.StatusInternalServerError)
Expand Down Expand Up @@ -2024,6 +2025,205 @@ func (a *API) handleReplyToMessage(w http.ResponseWriter, r *http.Request) {
})
}

// ForwardRequest is the JSON body for /api/v1/agents/{email}/messages/{id}/forward.
type ForwardRequest struct {
To []string `json:"to"`
CC []string `json:"cc,omitempty"`
BCC []string `json:"bcc,omitempty"`
Body string `json:"body,omitempty"`
HTMLBody string `json:"html_body,omitempty"`
ConversationID string `json:"conversation_id,omitempty"`
Attachments []outbound.Attachment `json:"attachments,omitempty"`
}

// handleForwardMessage forwards a previously received email to new recipients.
// @Summary Forward an inbound email
// @Description Forward a previously received email to one or more new recipients. The server prepends the caller's optional comment, then a Gmail-style "Forwarded message" block with the original headers and best-effort extracted body. A forward is treated as a NEW thread — no In-Reply-To/References headers are emitted; pass conversation_id to bind it to an existing thread explicitly. Rate limited to 60 sends per agent per minute; 429 responses carry a `Retry-After` header in delay-seconds form. When the owning agent has HITL enabled, the server returns 202 Accepted with status="pending_approval".
// @Tags Email
// @Accept json
// @Produce json
// @Security BearerAuth
// @Param email path string true "Agent email address" example(my-bot@example.com)
// @Param id path string true "Message ID from the inbound payload" example(msg_abc123)
// @Param request body ForwardMessageRequest true "Forward content"
// @Success 200 {object} SendEmailResponse "Forward sent immediately"
// @Success 202 {object} SendEmailResponse "Forward held for human approval"
// @Failure 400 {string} string "Missing or invalid fields"
// @Failure 401 {string} string "Missing or invalid API key"
// @Failure 403 {string} string "Agent domain not verified"
// @Failure 404 {string} string "Message not found or does not belong to this agent"
// @Failure 409 {string} string "Another request with this Idempotency-Key is in progress"
// @Failure 422 {string} string "Idempotency-Key reused with a different request body"
// @Failure 429 {string} string "Rate limit exceeded"
// @Param Idempotency-Key header string false "Caller-generated unique key (recommend UUIDv4). Retries with the same key + same body replay the original response; with a different body return 422."
// @Router /api/v1/agents/{email}/messages/{id}/forward [post]
func (a *API) handleForwardMessage(w http.ResponseWriter, r *http.Request) {
user, err := a.authenticateUser(r)
if err != nil {
a.writeAuthError(w, r, err)
return
}

bodyBytes, err := io.ReadAll(http.MaxBytesReader(w, r.Body, maxRequestBytesSend))
if err != nil {
http.Error(w, "invalid request body", http.StatusBadRequest)
return
}

replayed, captureW, finalize := a.idempotencyGuard(w, r, user.ID, bodyBytes)
if replayed {
return
}
defer finalize()
w = captureW

vars := mux.Vars(r)
email := normalizeEmail(vars["email"])
msgID := vars["id"]

agent, err := a.resolveAgentForUser(r, email, user)
if err != nil {
http.Error(w, "agent not found", http.StatusNotFound)
return
}

inbound, err := a.store.GetInboundMessage(r.Context(), msgID)
if err != nil {
http.Error(w, "message not found", http.StatusNotFound)
return
}
if inbound.AgentID != agent.ID {
http.Error(w, "message not found", http.StatusNotFound)
return
}

if ok, retryAfter := a.sendLimit.AllowWithRetryAfter(agent.ID); !ok {
writeTooManyRequests(w, retryAfter, "rate limit exceeded — max 60 sends per minute per agent")
return
}

if !agent.DomainVerified {
http.Error(w, "agent domain must be verified before sending", http.StatusForbidden)
return
}

if a.enforcer != nil {
if err := a.enforcer.CheckMessageSend(r.Context(), user.ID); err != nil {
if limits.WriteLimitError(w, err) {
return
}
log.Printf("[api] limits.CheckMessageSend error: %v", err)
http.Error(w, "limits check failed", http.StatusInternalServerError)
return
}
}

var req ForwardRequest
if err := json.Unmarshal(bodyBytes, &req); err != nil {
http.Error(w, "invalid request body", http.StatusBadRequest)
return
}
if len(req.To) == 0 && len(req.CC) == 0 {
http.Error(w, "at least one recipient in to or cc is required", http.StatusBadRequest)
return
}
if err := validateRecipients(req.To, req.CC, req.BCC); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := validateConversationID(req.ConversationID); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

subject := outbound.BuildForwardSubject(inbound.Subject)
fwdCtx := outbound.ExtractForwardContext(inbound.RawMessage)
composedBody := outbound.BuildForwardBody(req.Body, fwdCtx)
var composedHTML string
if req.HTMLBody != "" || fwdCtx.HTML != "" || fwdCtx.Text != "" {
composedHTML = outbound.BuildForwardHTMLBody(req.HTMLBody, fwdCtx)
}

sendReq := outbound.SendRequest{
To: req.To,
CC: req.CC,
BCC: req.BCC,
Subject: subject,
Body: composedBody,
HTMLBody: composedHTML,
ConversationID: req.ConversationID,
Attachments: req.Attachments,
}

// Pre-clean self-aliases so isSelfSend sees a true self-loop when
// the caller forwarded a message to the agent's own address.
sendReq.CC = stripAgentSelfAliases(sendReq.CC, agent.EmailAddress())
sendReq.BCC = stripAgentSelfAliases(sendReq.BCC, agent.EmailAddress())
selfForward := isSelfSend(sendReq, agent.EmailAddress())

if agent.HITLEnabled {
// inbound.EmailMessageID is persisted so the review panel can
// attach the InboundContext pane. buildSendRequestFromMessage
// gates ReplyToMessageID on type="reply", so this won't be
// promoted to a threading header on approval.
a.holdForApproval(w, r, agent, sendReq, "forward", inbound.EmailMessageID)
return
}

if _, err := a.usage.RecordAndCheck(r.Context(), user.ID, agent.ID, agent.Domain, "outbound"); err != nil {
log.Printf("[api] usage recording error: %v", err)
}

if selfForward {
providerID, err := a.performSelfSend(r.Context(), agent, sendReq, "forward")
if err != nil {
log.Printf("[api] self-forward failed: agent=%s error=%v", agent.EmailAddress(), err)
http.Error(w, "self-forward failed", http.StatusInternalServerError)
return
}
markSideEffectCommitted(w)
slug, _, _ := strings.Cut(agent.EmailAddress(), "@")
log.Printf("[mail] dir=outbound type=forward method=loopback from=%s to=%s slug=%s conv_id=%s subject=%q provider_id=%s orig=%s",
agent.EmailAddress(), agent.EmailAddress(), slug, req.ConversationID, subject, providerID, msgID)
w.Header().Set("Content-Type", "application/json")
writeJSON(w, map[string]string{
"status": "sent",
"message_id": providerID,
"method": "loopback",
})
return
}

result, err := a.sender.Send(agent, sendReq)
if err != nil {
if outbound.IsValidationError(err) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
log.Printf("[api] forward failed: agent=%s to=%v error=%v", agent.Domain, req.To, err)
http.Error(w, fmt.Sprintf("delivery failed: %v", err), http.StatusInternalServerError)
return
}
markSideEffectCommitted(w)

outMsg, err := a.store.CreateOutboundMessage(r.Context(), agent.ID, result.To, result.CC, result.BCC, subject, "forward", result.Method, result.MessageID, req.ConversationID)
if err != nil {
log.Printf("[api] failed to record outbound message: %v", err)
}

slug, _, _ := strings.Cut(agent.EmailAddress(), "@")
if outMsg != nil {
log.Printf("[mail:%s] dir=outbound type=forward from=%s to=%v slug=%s conv_id=%s subject=%q orig=%s", outMsg.ID, agent.EmailAddress(), result.To, slug, req.ConversationID, subject, msgID)
}

w.Header().Set("Content-Type", "application/json")
writeJSON(w, map[string]string{
"status": "sent",
"message_id": result.MessageID,
"method": result.Method,
})
}

// --- Polling API ---

// handleGetMessages lists messages for an agent.
Expand Down
17 changes: 16 additions & 1 deletion internal/agent/api_docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@ type ReplyToMessageRequest struct {
Attachments []Attachment `json:"attachments,omitempty"`
} // @name ReplyToMessageRequest

// ForwardMessageRequest is the request body for forwarding a message.
// Body and html_body are the caller's optional comment to prepend; the
// server appends a quoted block with the original headers and body. A
// forward is treated as a new thread (no In-Reply-To/References) — pass
// conversation_id to bind it to an existing thread explicitly.
type ForwardMessageRequest struct {
To []string `json:"to" example:"alice@example.com"`
CC []string `json:"cc,omitempty" example:"bob@example.com"`
BCC []string `json:"bcc,omitempty" example:"carol@example.com"`
Body string `json:"body,omitempty" example:"FYI — see below"`
HTMLBody string `json:"html_body,omitempty" example:"<p>FYI — see below</p>"`
ConversationID string `json:"conversation_id,omitempty"`
Attachments []Attachment `json:"attachments,omitempty"`
} // @name ForwardMessageRequest

// ListMessagesResponse wraps the message list with pagination.
type ListMessagesResponse struct {
Messages []MessageSummary `json:"messages"`
Expand Down Expand Up @@ -323,7 +338,7 @@ type PendingMessageSummary struct {
AgentID string `json:"agent_id" example:"my-bot@example.com"`
Direction string `json:"direction" example:"outbound"`
Subject string `json:"subject" example:"Re: contract details"`
Type string `json:"type,omitempty" example:"send" enums:"send,reply,test"`
Type string `json:"type,omitempty" example:"send" enums:"send,reply,test,forward"`
ConversationID string `json:"conversation_id,omitempty"`
To []string `json:"to" example:"alice@example.com"`
CC []string `json:"cc,omitempty"`
Expand Down
Loading
Loading