Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cli/src/bin/e2a.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
import { inbox } from "../commands/inbox.js";
import { read } from "../commands/read.js";
import { forward } from "../commands/forward.js";
import { labels } from "../commands/labels.js";
import { reply } from "../commands/reply.js";
import { send } from "../commands/send.js";
import { config } from "../commands/config.js";
Expand Down Expand Up @@ -42,6 +43,7 @@ Usage:
e2a read <message-id> Read a message
e2a reply <msg-id> --body … [--reply-all] [--cc …] [--bcc …]
e2a forward <msg-id> --to … [--cc …] [--bcc …] [--body …]
e2a labels <msg-id> [--add <label> …] [--remove <label> …]
e2a send [--to …] [--cc …] [--bcc …] --subject … --body …
e2a listen [options] Listen for emails via WebSocket
e2a domains list List your domains
Expand Down Expand Up @@ -239,6 +241,13 @@ async function main() {
idempotencyKey: getFlag(args, "--idempotency-key"),
});
break;
case "labels":
await labels(args[0], {
add: getFlags(args, "--add"),
remove: getFlags(args, "--remove"),
from: getFlag(args, "--agent"),
});
break;
case "send":
await send(
getFlags(args, "--to"),
Expand Down
37 changes: 37 additions & 0 deletions cli/src/commands/labels.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { createClient } from "../sdk.js";

export async function labels(
messageId: string | undefined,
opts: {
add: string[];
remove: string[];
from?: string;
},
): Promise<void> {
if (!messageId) {
process.stderr.write(
"Usage: e2a labels <message-id> [--add <label> ...] [--remove <label> ...]\n",
);
process.exit(1);
}
if (opts.add.length === 0 && opts.remove.length === 0) {
process.stderr.write("at least one --add or --remove is required\n");
process.exit(1);
}

const client = createClient({ from: opts.from });

if (!client.agentEmail) {
process.stderr.write(
"No agent email configured. Run 'e2a register' first or use --agent.\n",
);
process.exit(1);
}

const res = await client.updateMessageLabels(messageId, {
addLabels: opts.add.length ? opts.add : undefined,
removeLabels: opts.remove.length ? opts.remove : undefined,
});

process.stdout.write(`${res.message_id}: ${(res.labels ?? []).join(", ") || "(none)"}\n`);
}
233 changes: 232 additions & 1 deletion internal/agent/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ func (a *API) RegisterRoutes(r *mux.Router) {
r.HandleFunc("/api/v1/send", a.handleSendEmail).Methods("POST")
r.HandleFunc("/api/v1/agents/{email}/messages", a.handleGetMessages).Methods("GET")
r.HandleFunc("/api/v1/agents/{email}/messages/{id}", a.handleGetMessage).Methods("GET")
r.HandleFunc("/api/v1/agents/{email}/messages/{id}", a.handleUpdateMessage).Methods("PATCH")
r.HandleFunc("/api/v1/agents/{email}/messages/{id}/reply", a.handleReplyToMessage).Methods("POST")
r.HandleFunc("/api/v1/agents/{email}/messages/{id}/forward", a.handleForwardMessage).Methods("POST")
r.HandleFunc("/api/v1/domains", a.handleListDomains).Methods("GET")
Expand Down Expand Up @@ -2343,6 +2344,37 @@ func (a *API) handleGetMessages(w http.ResponseWriter, r *http.Request) {
http.Error(w, "conversation_id too long", http.StatusBadRequest)
return
}
// labels filter accepts repeated `?labels=foo&labels=bar` query
// params and AND-matches them server-side. Each value is validated
// against the same charset as writes so the GIN-indexed lookup
// never sees a malformed value. The cap on the slice prevents a
// 1000-label query from melting the index.
rawLabels := r.URL.Query()["labels"]
if len(rawLabels) > MaxLabelsPerOp {
http.Error(w, fmt.Sprintf("labels filter exceeds cap of %d", MaxLabelsPerOp), http.StatusBadRequest)
return
}
var labelsFilter []string
if len(rawLabels) > 0 {
// `allowSystemPrefix=true` here: callers should be able to
// filter by `e2a:*` system labels even though they can't set
// them. Read access to the namespace is fine; write access
// remains restricted.
seen := map[string]struct{}{}
labelsFilter = make([]string, 0, len(rawLabels))
for _, raw := range rawLabels {
l, err := normalizeAndValidateLabel(raw, true)
if err != nil {
http.Error(w, "labels filter: "+err.Error(), http.StatusBadRequest)
return
}
if _, dup := seen[l]; dup {
continue
}
seen[l] = struct{}{}
labelsFilter = append(labelsFilter, l)
}
}
var since, until time.Time
if s := strings.TrimSpace(r.URL.Query().Get("since")); s != "" {
t, err := time.Parse(time.RFC3339, s)
Expand Down Expand Up @@ -2387,6 +2419,7 @@ func (a *API) handleGetMessages(w http.ResponseWriter, r *http.Request) {
ConversationID string `json:"cv,omitempty"`
Since string `json:"sn,omitempty"`
Until string `json:"un,omitempty"`
Labels []string `json:"lb,omitempty"`
}
if err := json.Unmarshal(decoded, &cursor); err != nil {
http.Error(w, "invalid pagination token", http.StatusBadRequest)
Expand Down Expand Up @@ -2442,7 +2475,8 @@ func (a *API) handleGetMessages(w http.ResponseWriter, r *http.Request) {
cursor.SubjectContains != subjectContains ||
cursor.ConversationID != conversationIDFilter ||
cursor.Since != sinceStr ||
cursor.Until != untilStr {
cursor.Until != untilStr ||
!stringSlicesEqual(cursor.Labels, labelsFilter) {
http.Error(w, "token was created with different search filters — start a new query without a token", http.StatusBadRequest)
return
}
Expand Down Expand Up @@ -2478,6 +2512,7 @@ func (a *API) handleGetMessages(w http.ResponseWriter, r *http.Request) {
ConversationID: conversationIDFilter,
Since: since,
Until: until,
Labels: labelsFilter,
})
if err != nil {
http.Error(w, "failed to fetch messages", http.StatusInternalServerError)
Expand Down Expand Up @@ -2509,6 +2544,10 @@ func (a *API) handleGetMessages(w http.ResponseWriter, r *http.Request) {
WebhookStatus string `json:"webhook_status,omitempty"`
WebhookError string `json:"webhook_error,omitempty"`
SizeBytes int `json:"size_bytes,omitempty"`
// Labels is always present (never nil) so clients can render
// it without a null-check. Empty slice for unlabelled rows —
// matches the DB DEFAULT '{}' invariant.
Labels []string `json:"labels"`
CreatedAt string `json:"created_at"`
}

Expand Down Expand Up @@ -2541,6 +2580,7 @@ func (a *API) handleGetMessages(w http.ResponseWriter, r *http.Request) {
WebhookStatus: wh,
WebhookError: whErr,
SizeBytes: size,
Labels: orEmptySlice(m.Labels),
CreatedAt: m.CreatedAt.UTC().Format(time.RFC3339),
}
}
Expand Down Expand Up @@ -2569,6 +2609,7 @@ func (a *API) handleGetMessages(w http.ResponseWriter, r *http.Request) {
ConversationID string `json:"cv,omitempty"`
Since string `json:"sn,omitempty"`
Until string `json:"un,omitempty"`
Labels []string `json:"lb,omitempty"`
}{
CreatedAt: last.CreatedAt,
ID: last.ID,
Expand All @@ -2581,6 +2622,7 @@ func (a *API) handleGetMessages(w http.ResponseWriter, r *http.Request) {
ConversationID: conversationIDFilter,
Since: sinceStr,
Until: untilStr,
Labels: labelsFilter,
})
if err != nil {
http.Error(w, "failed to build pagination token", http.StatusInternalServerError)
Expand Down Expand Up @@ -2653,12 +2695,123 @@ func (a *API) handleGetMessage(w http.ResponseWriter, r *http.Request) {
"subject": msg.Subject,
"conversation_id": msg.ConversationID,
"status": msg.DeliveryStatus,
"labels": orEmptySlice(msg.Labels),
"created_at": msg.CreatedAt.UTC().Format(time.RFC3339),
"auth_headers": msg.AuthHeaders,
"raw_message": msg.RawMessage,
})
}

// stringSlicesEqual reports whether two ordered string slices have
// identical contents. Used by the list endpoint's cursor stability
// check to compare a token's encoded label filter to the request's
// current ?labels= values. Nil and empty are treated as equal so an
// older token without the Labels field still matches a request with
// no labels= param.
func stringSlicesEqual(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}

// updateMessageRequest is the JSON body for
// PATCH /api/v1/agents/{email}/messages/{id}. The only supported
// mutation today is the labels delta — passing both lists empty is a
// no-op that still returns the current label set, so callers can use
// the endpoint as a read for the labels alone if convenient.
type updateMessageRequest struct {
AddLabels []string `json:"add_labels,omitempty"`
RemoveLabels []string `json:"remove_labels,omitempty"`
}

// handleUpdateMessage modifies a message in-place. Currently scoped to
// the labels delta — `add_labels` / `remove_labels` arrays — but the
// PATCH path is shaped so future fields (read/unread toggle, archive
// flag) can be added without a new endpoint.
// @Summary Update a message (labels)
// @Description Apply a delta to a message's labels. Pass `add_labels` and/or `remove_labels` — each capped at 50 entries per request. Labels are lowercase strings drawn from `[a-z0-9:_-]+` up to 64 chars; the `e2a:` prefix is reserved for server-applied system labels and rejected on user writes. The total label count per message is capped at 100. A label appearing in both add and remove is removed (remove wins). Returns the post-update label set so callers can echo state without a fetch.
// @Tags Email
// @Accept json
// @Produce json
// @Security BearerAuth
// @Param email path string true "Agent email address" example(my-bot@example.com)
// @Param id path string true "Message ID" example(msg_abc123)
// @Param request body UpdateMessageRequest true "Label delta"
// @Success 200 {object} UpdateMessageResponse
// @Failure 400 {string} string "Invalid label or per-message cap exceeded"
// @Failure 401 {string} string "Missing or invalid API key"
// @Failure 404 {string} string "Message not found or does not belong to this agent"
// @Router /api/v1/agents/{email}/messages/{id} [patch]
func (a *API) handleUpdateMessage(w http.ResponseWriter, r *http.Request) {
user, err := a.authenticateUser(r)
if err != nil {
a.writeAuthError(w, r, err)
return
}

if ok, retryAfter := a.pollLimit.AllowWithRetryAfter(user.ID); !ok {
writeTooManyRequests(w, retryAfter, "rate limit exceeded — max 60 requests per minute per user")
return
}

vars := mux.Vars(r)
email := normalizeEmail(vars["email"])
msgID := vars["id"]

agent, err := a.resolveAgentForUser(r, email, user)
if err != nil {
http.Error(w, "agent not found", http.StatusNotFound)
return
}

// 64 KB cap is plenty for a labels delta — the per-op cap of 50
// labels × 64 chars + JSON overhead is well under a KB. Without
// this cap a client could ship multi-MB bodies and the per-op
// validation only kicks in after full JSON allocation.
var req updateMessageRequest
if err := readJSON(w, r, &req, maxRequestBytesSmall); err != nil {
http.Error(w, "invalid request body", http.StatusBadRequest)
return
}

addLabels, err := normalizeAndValidateLabelList(req.AddLabels, "add")
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
removeLabels, err := normalizeAndValidateLabelList(req.RemoveLabels, "remove")
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

final, err := a.store.ModifyMessageLabels(r.Context(), msgID, agent.ID, addLabels, removeLabels)
if err != nil {
switch {
case errors.Is(err, identity.ErrMessageNotFound):
http.Error(w, "message not found", http.StatusNotFound)
case errors.Is(err, identity.ErrLabelLimitExceeded):
http.Error(w, fmt.Sprintf("label limit exceeded — a message may carry at most %d labels", identity.MaxLabelsPerMessage), http.StatusBadRequest)
default:
log.Printf("[api] ModifyMessageLabels error: agent=%s msg=%s err=%v", agent.ID, msgID, err)
http.Error(w, "failed to update labels", http.StatusInternalServerError)
}
return
}

w.Header().Set("Content-Type", "application/json")
writeJSON(w, map[string]interface{}{
"message_id": msgID,
"labels": orEmptySlice(final),
})
}

// orEmptySlice returns s if non-nil, otherwise an empty []string. We marshal
// the To: list as an always-present array (no omitempty) so SDK clients can
// rely on it being present, even for messages stored before the column was
Expand Down Expand Up @@ -2831,6 +2984,84 @@ func validateConversationID(id string) error {
return nil
}

// Label validation constants. Public so tests can reference them.
const (
// MaxLabelLength bounds a single label's length to keep the GIN
// index entries small and prevent multi-KB tags from being smuggled
// into the array.
MaxLabelLength = 64

// MaxLabelsPerOp caps the per-request add/remove list size.
// Modeled on Gmail's 100/100 cap. Bigger than AgentMail's
// (unspecified) but defensive enough that one PATCH can't try
// to set thousands of labels.
MaxLabelsPerOp = 50

// LabelSystemPrefix marks server-applied system labels. User
// writes that try to set a label starting with this prefix are
// rejected with 400 — the namespace is reserved so future system
// labels (auto-tagged, hitl-approved, …) don't collide with user
// tags.
LabelSystemPrefix = "e2a:"
)

// normalizeAndValidateLabel canonicalizes a single label and rejects it
// if it would violate the labels invariants:
// - lowercase
// - charset `[a-z0-9:_-]+` (colon allowed for namespacing, but only
// the server may set `e2a:*`)
// - 1..MaxLabelLength chars after trimming
//
// Returns the normalized form on success. Lowercasing is the only
// transformation — colons / dashes / underscores stay as-is so a label
// from a query param is byte-identical to the same label set via PATCH.
func normalizeAndValidateLabel(raw string, allowSystemPrefix bool) (string, error) {
l := strings.ToLower(strings.TrimSpace(raw))
if l == "" {
return "", errors.New("label must not be empty")
}
if len(l) > MaxLabelLength {
return "", fmt.Errorf("label too long (max %d chars)", MaxLabelLength)
}
for _, r := range l {
switch {
case r >= 'a' && r <= 'z':
case r >= '0' && r <= '9':
case r == '-' || r == '_' || r == ':':
default:
return "", fmt.Errorf("label %q has invalid character; allowed: a-z 0-9 : - _", l)
}
}
if !allowSystemPrefix && strings.HasPrefix(l, LabelSystemPrefix) {
return "", fmt.Errorf("labels starting with %q are reserved for system use", LabelSystemPrefix)
}
return l, nil
}

// normalizeAndValidateLabelList runs each entry through
// normalizeAndValidateLabel, dedups within the slice, and rejects if
// the slice is empty after trimming or exceeds the per-op cap. The
// `op` argument is used only to shape the error message.
func normalizeAndValidateLabelList(raw []string, op string) ([]string, error) {
if len(raw) > MaxLabelsPerOp {
return nil, fmt.Errorf("%s_labels exceeds per-request cap of %d", op, MaxLabelsPerOp)
}
seen := map[string]struct{}{}
out := make([]string, 0, len(raw))
for _, r := range raw {
l, err := normalizeAndValidateLabel(r, false)
if err != nil {
return nil, err
}
if _, dup := seen[l]; dup {
continue
}
seen[l] = struct{}{}
out = append(out, l)
}
return out, nil
}

// validateRecipients ensures every entry in the joined To/CC/BCC slices
// is a syntactically valid RFC 5322 address. We use net/mail.ParseAddress
// rather than a custom regex because it handles bare local@domain,
Expand Down
Loading
Loading