From 16442c19b54d3d13c39b67a9550fa2bb42ca8519 Mon Sep 17 00:00:00 2001 From: jiashuoz Date: Wed, 27 May 2026 23:31:12 -0700 Subject: [PATCH 1/3] =?UTF-8?q?feat(api):=20message=20labels=20=E2=80=94?= =?UTF-8?q?=20PATCH=20+=20=3Flabels=20filter=20+=20read=20surface=20(Go)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Slice A of the labels feature: backend end-to-end. Strings, not IDs; delta semantics; reserved e2a:* prefix for server-applied system labels. Adds: - migration 020: labels TEXT[] NOT NULL DEFAULT '{}' on messages, plus a GIN index. ALTER ADD COLUMN with a constant default is metadata-only on Postgres 11+, safe on the large prod messages table. - identity.ModifyMessageLabels(msgID, agentID, add, remove) — single atomic UPDATE with set semantics (union ∪ add) \ remove. Pre-checks the post-add count against MaxLabelsPerMessage (100) and returns a typed ErrLabelLimitExceeded; cross-agent / missing rows return ErrMessageNotFound. - PATCH /api/v1/agents/{email}/messages/{id} accepts { add_labels, remove_labels }. Returns the post-update label set so callers can echo state without a fetch. - GET /api/v1/agents/{email}/messages?labels=urgent&labels=follow-up AND-matches via @>. Cursor encodes the filter so a token issued with one label set can't be replayed against a different one. - MessageSummary and MessageDetail responses now include labels[], always present (never null) — empty array for unlabelled rows. - Validation: lowercase, [a-z0-9:_-]+, ≤64 chars, ≤50 per op, ≤100 per message, e2a:* reserved for system writes (filter still permits reading them). Tests (17): - internal/identity/labels_test.go (7): round-trip, remove + overlap semantics, cap rejection, NotFound, cross-agent isolation, AND-match filter, never-null contract on freshly-created rows. - internal/agent/labels_api_test.go (10): happy add/remove with lowercasing, system-prefix rejection, charset rejection (space, slash, unicode, empty), over-length rejection, per-op cap rejection, 404 on missing, 404 on cross-agent, 401, list filter AND match + always-non-null labels in response shape, list filter invalid char. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/agent/api.go | 229 +++++++++++++++++++++++- internal/agent/api_docs.go | 27 +++ internal/agent/labels_api_test.go | 288 ++++++++++++++++++++++++++++++ internal/identity/labels_test.go | 234 ++++++++++++++++++++++++ internal/identity/store.go | 116 +++++++++++- migrations/020_message_labels.sql | 22 +++ 6 files changed, 911 insertions(+), 5 deletions(-) create mode 100644 internal/agent/labels_api_test.go create mode 100644 internal/identity/labels_test.go create mode 100644 migrations/020_message_labels.sql diff --git a/internal/agent/api.go b/internal/agent/api.go index 468db61..20b117b 100644 --- a/internal/agent/api.go +++ b/internal/agent/api.go @@ -283,6 +283,7 @@ func (a *API) RegisterRoutes(r *mux.Router) { r.HandleFunc("/api/v1/send", a.handleSendEmail).Methods("POST") r.HandleFunc("/api/v1/agents/{email}/messages", a.handleGetMessages).Methods("GET") r.HandleFunc("/api/v1/agents/{email}/messages/{id}", a.handleGetMessage).Methods("GET") + r.HandleFunc("/api/v1/agents/{email}/messages/{id}", a.handleUpdateMessage).Methods("PATCH") r.HandleFunc("/api/v1/agents/{email}/messages/{id}/reply", a.handleReplyToMessage).Methods("POST") r.HandleFunc("/api/v1/agents/{email}/messages/{id}/forward", a.handleForwardMessage).Methods("POST") r.HandleFunc("/api/v1/domains", a.handleListDomains).Methods("GET") @@ -2343,6 +2344,37 @@ func (a *API) handleGetMessages(w http.ResponseWriter, r *http.Request) { http.Error(w, "conversation_id too long", http.StatusBadRequest) return } + // labels filter accepts repeated `?labels=foo&labels=bar` query + // params and AND-matches them server-side. Each value is validated + // against the same charset as writes so the GIN-indexed lookup + // never sees a malformed value. The cap on the slice prevents a + // 1000-label query from melting the index. + rawLabels := r.URL.Query()["labels"] + if len(rawLabels) > MaxLabelsPerOp { + http.Error(w, fmt.Sprintf("labels filter exceeds cap of %d", MaxLabelsPerOp), http.StatusBadRequest) + return + } + var labelsFilter []string + if len(rawLabels) > 0 { + // `allowSystemPrefix=true` here: callers should be able to + // filter by `e2a:*` system labels even though they can't set + // them. Read access to the namespace is fine; write access + // remains restricted. + seen := map[string]struct{}{} + labelsFilter = make([]string, 0, len(rawLabels)) + for _, raw := range rawLabels { + l, err := normalizeAndValidateLabel(raw, true) + if err != nil { + http.Error(w, "labels filter: "+err.Error(), http.StatusBadRequest) + return + } + if _, dup := seen[l]; dup { + continue + } + seen[l] = struct{}{} + labelsFilter = append(labelsFilter, l) + } + } var since, until time.Time if s := strings.TrimSpace(r.URL.Query().Get("since")); s != "" { t, err := time.Parse(time.RFC3339, s) @@ -2387,6 +2419,7 @@ func (a *API) handleGetMessages(w http.ResponseWriter, r *http.Request) { ConversationID string `json:"cv,omitempty"` Since string `json:"sn,omitempty"` Until string `json:"un,omitempty"` + Labels []string `json:"lb,omitempty"` } if err := json.Unmarshal(decoded, &cursor); err != nil { http.Error(w, "invalid pagination token", http.StatusBadRequest) @@ -2442,7 +2475,8 @@ func (a *API) handleGetMessages(w http.ResponseWriter, r *http.Request) { cursor.SubjectContains != subjectContains || cursor.ConversationID != conversationIDFilter || cursor.Since != sinceStr || - cursor.Until != untilStr { + cursor.Until != untilStr || + !stringSlicesEqual(cursor.Labels, labelsFilter) { http.Error(w, "token was created with different search filters — start a new query without a token", http.StatusBadRequest) return } @@ -2478,6 +2512,7 @@ func (a *API) handleGetMessages(w http.ResponseWriter, r *http.Request) { ConversationID: conversationIDFilter, Since: since, Until: until, + Labels: labelsFilter, }) if err != nil { http.Error(w, "failed to fetch messages", http.StatusInternalServerError) @@ -2509,6 +2544,10 @@ func (a *API) handleGetMessages(w http.ResponseWriter, r *http.Request) { WebhookStatus string `json:"webhook_status,omitempty"` WebhookError string `json:"webhook_error,omitempty"` SizeBytes int `json:"size_bytes,omitempty"` + // Labels is always present (never nil) so clients can render + // it without a null-check. Empty slice for unlabelled rows — + // matches the DB DEFAULT '{}' invariant. + Labels []string `json:"labels"` CreatedAt string `json:"created_at"` } @@ -2541,6 +2580,7 @@ func (a *API) handleGetMessages(w http.ResponseWriter, r *http.Request) { WebhookStatus: wh, WebhookError: whErr, SizeBytes: size, + Labels: orEmptySlice(m.Labels), CreatedAt: m.CreatedAt.UTC().Format(time.RFC3339), } } @@ -2569,6 +2609,7 @@ func (a *API) handleGetMessages(w http.ResponseWriter, r *http.Request) { ConversationID string `json:"cv,omitempty"` Since string `json:"sn,omitempty"` Until string `json:"un,omitempty"` + Labels []string `json:"lb,omitempty"` }{ CreatedAt: last.CreatedAt, ID: last.ID, @@ -2581,6 +2622,7 @@ func (a *API) handleGetMessages(w http.ResponseWriter, r *http.Request) { ConversationID: conversationIDFilter, Since: sinceStr, Until: untilStr, + Labels: labelsFilter, }) if err != nil { http.Error(w, "failed to build pagination token", http.StatusInternalServerError) @@ -2653,12 +2695,119 @@ func (a *API) handleGetMessage(w http.ResponseWriter, r *http.Request) { "subject": msg.Subject, "conversation_id": msg.ConversationID, "status": msg.DeliveryStatus, + "labels": orEmptySlice(msg.Labels), "created_at": msg.CreatedAt.UTC().Format(time.RFC3339), "auth_headers": msg.AuthHeaders, "raw_message": msg.RawMessage, }) } +// stringSlicesEqual reports whether two ordered string slices have +// identical contents. Used by the list endpoint's cursor stability +// check to compare a token's encoded label filter to the request's +// current ?labels= values. Nil and empty are treated as equal so an +// older token without the Labels field still matches a request with +// no labels= param. +func stringSlicesEqual(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +// updateMessageRequest is the JSON body for +// PATCH /api/v1/agents/{email}/messages/{id}. The only supported +// mutation today is the labels delta — passing both lists empty is a +// no-op that still returns the current label set, so callers can use +// the endpoint as a read for the labels alone if convenient. +type updateMessageRequest struct { + AddLabels []string `json:"add_labels,omitempty"` + RemoveLabels []string `json:"remove_labels,omitempty"` +} + +// handleUpdateMessage modifies a message in-place. Currently scoped to +// the labels delta — `add_labels` / `remove_labels` arrays — but the +// PATCH path is shaped so future fields (read/unread toggle, archive +// flag) can be added without a new endpoint. +// @Summary Update a message (labels) +// @Description Apply a delta to a message's labels. Pass `add_labels` and/or `remove_labels` — each capped at 50 entries per request. Labels are lowercase strings drawn from `[a-z0-9:_-]+` up to 64 chars; the `e2a:` prefix is reserved for server-applied system labels and rejected on user writes. The total label count per message is capped at 100. A label appearing in both add and remove is removed (remove wins). Returns the post-update label set so callers can echo state without a fetch. +// @Tags Email +// @Accept json +// @Produce json +// @Security BearerAuth +// @Param email path string true "Agent email address" example(my-bot@example.com) +// @Param id path string true "Message ID" example(msg_abc123) +// @Param request body UpdateMessageRequest true "Label delta" +// @Success 200 {object} UpdateMessageResponse +// @Failure 400 {string} string "Invalid label or per-message cap exceeded" +// @Failure 401 {string} string "Missing or invalid API key" +// @Failure 404 {string} string "Message not found or does not belong to this agent" +// @Router /api/v1/agents/{email}/messages/{id} [patch] +func (a *API) handleUpdateMessage(w http.ResponseWriter, r *http.Request) { + user, err := a.authenticateUser(r) + if err != nil { + a.writeAuthError(w, r, err) + return + } + + if ok, retryAfter := a.pollLimit.AllowWithRetryAfter(user.ID); !ok { + writeTooManyRequests(w, retryAfter, "rate limit exceeded — max 60 requests per minute per user") + return + } + + vars := mux.Vars(r) + email := normalizeEmail(vars["email"]) + msgID := vars["id"] + + agent, err := a.resolveAgentForUser(r, email, user) + if err != nil { + http.Error(w, "agent not found", http.StatusNotFound) + return + } + + var req updateMessageRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "invalid request body", http.StatusBadRequest) + return + } + + addLabels, err := normalizeAndValidateLabelList(req.AddLabels, "add") + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + removeLabels, err := normalizeAndValidateLabelList(req.RemoveLabels, "remove") + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + final, err := a.store.ModifyMessageLabels(r.Context(), msgID, agent.ID, addLabels, removeLabels) + if err != nil { + switch { + case errors.Is(err, identity.ErrMessageNotFound): + http.Error(w, "message not found", http.StatusNotFound) + case errors.Is(err, identity.ErrLabelLimitExceeded): + http.Error(w, fmt.Sprintf("label limit exceeded — a message may carry at most %d labels", identity.MaxLabelsPerMessage), http.StatusBadRequest) + default: + log.Printf("[api] ModifyMessageLabels error: agent=%s msg=%s err=%v", agent.ID, msgID, err) + http.Error(w, "failed to update labels", http.StatusInternalServerError) + } + return + } + + w.Header().Set("Content-Type", "application/json") + writeJSON(w, map[string]interface{}{ + "message_id": msgID, + "labels": orEmptySlice(final), + }) +} + // orEmptySlice returns s if non-nil, otherwise an empty []string. We marshal // the To: list as an always-present array (no omitempty) so SDK clients can // rely on it being present, even for messages stored before the column was @@ -2831,6 +2980,84 @@ func validateConversationID(id string) error { return nil } +// Label validation constants. Public so tests can reference them. +const ( + // MaxLabelLength bounds a single label's length to keep the GIN + // index entries small and prevent multi-KB tags from being smuggled + // into the array. + MaxLabelLength = 64 + + // MaxLabelsPerOp caps the per-request add/remove list size. + // Modeled on Gmail's 100/100 cap. Bigger than AgentMail's + // (unspecified) but defensive enough that one PATCH can't try + // to set thousands of labels. + MaxLabelsPerOp = 50 + + // LabelSystemPrefix marks server-applied system labels. User + // writes that try to set a label starting with this prefix are + // rejected with 400 — the namespace is reserved so future system + // labels (auto-tagged, hitl-approved, …) don't collide with user + // tags. + LabelSystemPrefix = "e2a:" +) + +// normalizeAndValidateLabel canonicalizes a single label and rejects it +// if it would violate the labels invariants: +// - lowercase +// - charset `[a-z0-9:_-]+` (colon allowed for namespacing, but only +// the server may set `e2a:*`) +// - 1..MaxLabelLength chars after trimming +// +// Returns the normalized form on success. Lowercasing is the only +// transformation — colons / dashes / underscores stay as-is so a label +// from a query param is byte-identical to the same label set via PATCH. +func normalizeAndValidateLabel(raw string, allowSystemPrefix bool) (string, error) { + l := strings.ToLower(strings.TrimSpace(raw)) + if l == "" { + return "", errors.New("label must not be empty") + } + if len(l) > MaxLabelLength { + return "", fmt.Errorf("label too long (max %d chars)", MaxLabelLength) + } + for _, r := range l { + switch { + case r >= 'a' && r <= 'z': + case r >= '0' && r <= '9': + case r == '-' || r == '_' || r == ':': + default: + return "", fmt.Errorf("label %q has invalid character; allowed: a-z 0-9 : - _", l) + } + } + if !allowSystemPrefix && strings.HasPrefix(l, LabelSystemPrefix) { + return "", fmt.Errorf("labels starting with %q are reserved for system use", LabelSystemPrefix) + } + return l, nil +} + +// normalizeAndValidateLabelList runs each entry through +// normalizeAndValidateLabel, dedups within the slice, and rejects if +// the slice is empty after trimming or exceeds the per-op cap. The +// `op` argument is used only to shape the error message. +func normalizeAndValidateLabelList(raw []string, op string) ([]string, error) { + if len(raw) > MaxLabelsPerOp { + return nil, fmt.Errorf("%s_labels exceeds per-request cap of %d", op, MaxLabelsPerOp) + } + seen := map[string]struct{}{} + out := make([]string, 0, len(raw)) + for _, r := range raw { + l, err := normalizeAndValidateLabel(r, false) + if err != nil { + return nil, err + } + if _, dup := seen[l]; dup { + continue + } + seen[l] = struct{}{} + out = append(out, l) + } + return out, nil +} + // validateRecipients ensures every entry in the joined To/CC/BCC slices // is a syntactically valid RFC 5322 address. We use net/mail.ParseAddress // rather than a custom regex because it handles bare local@domain, diff --git a/internal/agent/api_docs.go b/internal/agent/api_docs.go index c64e8ed..0e2ff8b 100644 --- a/internal/agent/api_docs.go +++ b/internal/agent/api_docs.go @@ -64,6 +64,24 @@ type ReplyToMessageRequest struct { Attachments []Attachment `json:"attachments,omitempty"` } // @name ReplyToMessageRequest +// UpdateMessageRequest is the request body for PATCH /api/v1/agents/{email}/messages/{id}. +// Currently the only supported mutation is the labels delta — passing +// an empty body is a no-op. Both add_labels and remove_labels may be +// set in one request; on overlap the remove wins (the union is applied +// first, then the difference, mirroring Gmail's semantics). +type UpdateMessageRequest struct { + AddLabels []string `json:"add_labels,omitempty" example:"urgent"` + RemoveLabels []string `json:"remove_labels,omitempty" example:"unread"` +} // @name UpdateMessageRequest + +// UpdateMessageResponse is the response shape for label mutations. +// Returns the post-update label set so callers can echo state without +// a separate fetch. +type UpdateMessageResponse struct { + MessageID string `json:"message_id" example:"msg_abc123"` + Labels []string `json:"labels"` +} // @name UpdateMessageResponse + // ForwardMessageRequest is the request body for forwarding a message. // Body and html_body are the caller's optional comment to prepend; the // server appends a quoted block with the original headers and body. A @@ -126,6 +144,11 @@ type MessageSummary struct { WebhookStatus string `json:"webhook_status,omitempty" example:"delivered" enums:"pending,delivered,failed"` WebhookError string `json:"webhook_error,omitempty"` SizeBytes int `json:"size_bytes,omitempty" example:"4231"` + // Labels are caller-applied string tags. Always lowercase, charset + // `[a-z0-9:_-]+`, ≤ 64 chars each, ≤ 100 per message. The `e2a:` + // prefix is reserved for server-applied system labels. Empty array + // when no labels are set — never null. + Labels []string `json:"labels"` CreatedAt string `json:"created_at" example:"2025-01-15T10:30:00Z"` } // @name MessageSummary @@ -145,6 +168,10 @@ type MessageDetail struct { Subject string `json:"subject" example:"Hello"` ConversationID string `json:"conversation_id,omitempty"` Status string `json:"status" example:"read"` + // Labels are caller-applied string tags. See MessageSummary.Labels + // for the validation rules. Empty array when no labels are set — + // never null. + Labels []string `json:"labels"` CreatedAt string `json:"created_at"` AuthHeaders map[string]string `json:"auth_headers"` RawMessage string `json:"raw_message"` diff --git a/internal/agent/labels_api_test.go b/internal/agent/labels_api_test.go new file mode 100644 index 0000000..cb16e97 --- /dev/null +++ b/internal/agent/labels_api_test.go @@ -0,0 +1,288 @@ +package agent_test + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "sort" + "testing" + + "github.com/Mnexa-AI/e2a/internal/identity" +) + +// setupLabelsAgent provisions a verified domain + agent + an inbound +// message ready for label mutations. Returns the server, store, api +// key, agent email, and a single message id. +type labelsFixture struct { + server *http.Client + serverURL string + apiKey string + agentEmail string + msgID string +} + +func setupLabelsFixture(t *testing.T, prefix string) labelsFixture { + t.Helper() + server, store, _ := setupAPI(t) + ctx := context.Background() + user, _ := store.CreateOrGetUser(ctx, "owner-"+prefix+"@example.com", "Owner", "google-"+prefix) + apiKey, _ := store.CreateAPIKey(ctx, user.ID, prefix+"-key", nil) + domain := prefix + ".example.com" + store.ClaimOrCreateDomain(ctx, domain, user.ID) + store.VerifyDomain(ctx, domain, user.ID) + agentEmail := "bot@" + domain + agent, _ := store.CreateAgent(ctx, agentEmail, domain, "", "https://example.com/webhook", "", user.ID) + msg, _ := store.CreateInboundMessage(ctx, "", agent.ID, "alice@gmail.com", agentEmail, "", "Hi", "", "", nil, nil, nil, nil, nil) + return labelsFixture{ + server: http.DefaultClient, + serverURL: server.URL, + apiKey: apiKey.PlaintextKey, + agentEmail: agentEmail, + msgID: msg.ID, + } +} + +func patchLabels(t *testing.T, f labelsFixture, body string) *http.Response { + t.Helper() + req, _ := http.NewRequest("PATCH", f.serverURL+"/api/v1/agents/"+f.agentEmail+"/messages/"+f.msgID, bytes.NewBufferString(body)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+f.apiKey) + resp, err := f.server.Do(req) + if err != nil { + t.Fatalf("PATCH: %v", err) + } + return resp +} + +func TestUpdateMessageLabels_AddRemoveHappyPath(t *testing.T) { + f := setupLabelsFixture(t, "lbl-happy") + + resp := patchLabels(t, f, `{"add_labels":["Urgent","Follow-Up"],"remove_labels":["unread"]}`) + defer resp.Body.Close() + if resp.StatusCode != 200 { + t.Fatalf("status = %d, want 200", resp.StatusCode) + } + var body struct { + MessageID string `json:"message_id"` + Labels []string `json:"labels"` + } + json.NewDecoder(resp.Body).Decode(&body) + if body.MessageID != f.msgID { + t.Errorf("message_id = %q, want %q", body.MessageID, f.msgID) + } + // Labels are lowercased server-side — "Urgent" → "urgent". + want := []string{"follow-up", "urgent"} + sort.Strings(body.Labels) + if len(body.Labels) != 2 || body.Labels[0] != want[0] || body.Labels[1] != want[1] { + t.Errorf("labels = %v, want %v (lowercased + sorted)", body.Labels, want) + } + + // GET the message and confirm the labels are persisted. + getReq, _ := http.NewRequest("GET", f.serverURL+"/api/v1/agents/"+f.agentEmail+"/messages/"+f.msgID, nil) + getReq.Header.Set("Authorization", "Bearer "+f.apiKey) + getResp, _ := http.DefaultClient.Do(getReq) + defer getResp.Body.Close() + var detail map[string]interface{} + json.NewDecoder(getResp.Body).Decode(&detail) + labels, _ := detail["labels"].([]interface{}) + if len(labels) != 2 { + t.Errorf("detail labels = %v, want 2 entries", labels) + } +} + +func TestUpdateMessageLabels_RejectsSystemPrefix(t *testing.T) { + f := setupLabelsFixture(t, "lbl-sys") + resp := patchLabels(t, f, `{"add_labels":["e2a:auto-tagged"]}`) + defer resp.Body.Close() + if resp.StatusCode != 400 { + t.Errorf("status = %d, want 400 (system prefix is server-only)", resp.StatusCode) + } +} + +func TestUpdateMessageLabels_RejectsInvalidCharset(t *testing.T) { + f := setupLabelsFixture(t, "lbl-charset") + cases := []struct { + name string + body string + }{ + {"space", `{"add_labels":["hello world"]}`}, + {"slash", `{"add_labels":["foo/bar"]}`}, + {"unicode", `{"add_labels":["café"]}`}, + {"empty", `{"add_labels":[""]}`}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + resp := patchLabels(t, f, c.body) + defer resp.Body.Close() + if resp.StatusCode != 400 { + t.Errorf("status = %d, want 400 for %s", resp.StatusCode, c.name) + } + }) + } +} + +func TestUpdateMessageLabels_RejectsOverLengthLabel(t *testing.T) { + f := setupLabelsFixture(t, "lbl-toolong") + longLabel := "" + for i := 0; i < 65; i++ { + longLabel += "a" + } + resp := patchLabels(t, f, `{"add_labels":["`+longLabel+`"]}`) + defer resp.Body.Close() + if resp.StatusCode != 400 { + t.Errorf("status = %d, want 400 (label >64 chars)", resp.StatusCode) + } +} + +func TestUpdateMessageLabels_RejectsOverPerOpCap(t *testing.T) { + f := setupLabelsFixture(t, "lbl-opcap") + labels := make([]string, 51) // > MaxLabelsPerOp = 50 + for i := range labels { + labels[i] = "label-" + string(rune('a'+i%26)) + string(rune('a'+(i/26)%26)) + } + body, _ := json.Marshal(map[string][]string{"add_labels": labels}) + resp := patchLabels(t, f, string(body)) + defer resp.Body.Close() + if resp.StatusCode != 400 { + t.Errorf("status = %d, want 400 (over per-op cap)", resp.StatusCode) + } +} + +func TestUpdateMessageLabels_MessageNotFound(t *testing.T) { + f := setupLabelsFixture(t, "lbl-404") + req, _ := http.NewRequest("PATCH", f.serverURL+"/api/v1/agents/"+f.agentEmail+"/messages/msg_does_not_exist", bytes.NewBufferString(`{"add_labels":["x"]}`)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+f.apiKey) + resp, _ := http.DefaultClient.Do(req) + defer resp.Body.Close() + if resp.StatusCode != 404 { + t.Errorf("status = %d, want 404", resp.StatusCode) + } +} + +func TestUpdateMessageLabels_CrossAgentReturns404(t *testing.T) { + // Agent A creates a message; Agent B (different user) must NOT be + // able to mutate it. Cross-agent access looks like not-found to + // avoid leaking the existence of A's message via 403 vs 404. + server, store, _ := setupAPI(t) + ctx := context.Background() + + userA, _ := store.CreateOrGetUser(ctx, "owner-lblxA@example.com", "OwnerA", "google-lblxA") + store.ClaimOrCreateDomain(ctx, "lblxa.example.com", userA.ID) + store.VerifyDomain(ctx, "lblxa.example.com", userA.ID) + agentA, _ := store.CreateAgent(ctx, "bot@lblxa.example.com", "lblxa.example.com", "", "https://example.com/webhook", "", userA.ID) + msgA, _ := store.CreateInboundMessage(ctx, "", agentA.ID, "alice@gmail.com", "bot@lblxa.example.com", "", "Hi", "", "", nil, nil, nil, nil, nil) + + userB, _ := store.CreateOrGetUser(ctx, "owner-lblxB@example.com", "OwnerB", "google-lblxB") + apiKeyB, _ := store.CreateAPIKey(ctx, userB.ID, "lblxB-key", nil) + store.ClaimOrCreateDomain(ctx, "lblxb.example.com", userB.ID) + store.VerifyDomain(ctx, "lblxb.example.com", userB.ID) + store.CreateAgent(ctx, "bot@lblxb.example.com", "lblxb.example.com", "", "https://example.com/webhook", "", userB.ID) + + req, _ := http.NewRequest("PATCH", server.URL+"/api/v1/agents/bot@lblxb.example.com/messages/"+msgA.ID, bytes.NewBufferString(`{"add_labels":["x"]}`)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+apiKeyB.PlaintextKey) + resp, _ := http.DefaultClient.Do(req) + defer resp.Body.Close() + if resp.StatusCode != 404 { + t.Errorf("status = %d, want 404 (cross-agent must look like not-found)", resp.StatusCode) + } +} + +func TestUpdateMessageLabels_Unauthorized(t *testing.T) { + server, _, _ := setupAPI(t) + req, _ := http.NewRequest("PATCH", server.URL+"/api/v1/agents/bot@example.com/messages/msg_any", bytes.NewBufferString(`{"add_labels":["x"]}`)) + req.Header.Set("Content-Type", "application/json") + resp, _ := http.DefaultClient.Do(req) + defer resp.Body.Close() + if resp.StatusCode != 401 { + t.Errorf("status = %d, want 401", resp.StatusCode) + } +} + +func TestListMessages_LabelsFilterANDMatch(t *testing.T) { + server, store, _ := setupAPI(t) + ctx := context.Background() + user, _ := store.CreateOrGetUser(ctx, "owner-lblfilter-api@example.com", "Owner", "google-lblfilter-api") + apiKey, _ := store.CreateAPIKey(ctx, user.ID, "lblfilter-api-key", nil) + store.ClaimOrCreateDomain(ctx, "lblfilter-api.example.com", user.ID) + store.VerifyDomain(ctx, "lblfilter-api.example.com", user.ID) + agentEmail := "bot@lblfilter-api.example.com" + agent, _ := store.CreateAgent(ctx, agentEmail, "lblfilter-api.example.com", "", "https://example.com/webhook", "", user.ID) + + m1, _ := store.CreateInboundMessage(ctx, "", agent.ID, "a@gmail.com", agentEmail, "", "M1-both", "", "", nil, nil, nil, nil, nil) + m2, _ := store.CreateInboundMessage(ctx, "", agent.ID, "a@gmail.com", agentEmail, "", "M2-urgent-only", "", "", nil, nil, nil, nil, nil) + store.CreateInboundMessage(ctx, "", agent.ID, "a@gmail.com", agentEmail, "", "M3-none", "", "", nil, nil, nil, nil, nil) + + store.ModifyMessageLabels(ctx, m1.ID, agent.ID, []string{"urgent", "follow-up"}, nil) + store.ModifyMessageLabels(ctx, m2.ID, agent.ID, []string{"urgent"}, nil) + + // Filter labels=urgent&labels=follow-up — only M1 has both. + req, _ := http.NewRequest("GET", server.URL+"/api/v1/agents/"+agentEmail+"/messages?status=all&direction=all&labels=urgent&labels=follow-up", nil) + req.Header.Set("Authorization", "Bearer "+apiKey.PlaintextKey) + resp, _ := http.DefaultClient.Do(req) + defer resp.Body.Close() + if resp.StatusCode != 200 { + t.Fatalf("status = %d, want 200", resp.StatusCode) + } + var listResp struct { + Messages []struct { + Subject string `json:"subject"` + Labels []string `json:"labels"` + } `json:"messages"` + } + json.NewDecoder(resp.Body).Decode(&listResp) + if len(listResp.Messages) != 1 { + t.Fatalf("filtered len = %d, want 1 (AND match)", len(listResp.Messages)) + } + if listResp.Messages[0].Subject != "M1-both" { + t.Errorf("filtered subject = %q, want M1-both", listResp.Messages[0].Subject) + } + + // Regression: every row in a *no-filter* list must include `labels` field + // (never null), even rows with no labels set. Contract for SDK consumers. + req2, _ := http.NewRequest("GET", server.URL+"/api/v1/agents/"+agentEmail+"/messages?status=all&direction=all", nil) + req2.Header.Set("Authorization", "Bearer "+apiKey.PlaintextKey) + resp2, _ := http.DefaultClient.Do(req2) + defer resp2.Body.Close() + raw, _ := decodeRaw(resp2) + // Each message row should have a "labels" key (possibly empty array) + // — never absent, never null. + if want := []byte(`"labels":`); !bytes.Contains(raw, want) { + t.Errorf("response missing \"labels\" key in list rows:\n%s", raw) + } + if bytes.Contains(raw, []byte(`"labels":null`)) { + t.Errorf("response contains \"labels\":null somewhere; must be [] for empty") + } +} + +func TestListMessages_LabelsFilterInvalidChar(t *testing.T) { + server, store, _ := setupAPI(t) + ctx := context.Background() + user, _ := store.CreateOrGetUser(ctx, "owner-lblfilter-invalid@example.com", "Owner", "google-lblfilter-invalid") + apiKey, _ := store.CreateAPIKey(ctx, user.ID, "lblfilter-invalid-key", nil) + store.ClaimOrCreateDomain(ctx, "lblfilter-invalid.example.com", user.ID) + store.VerifyDomain(ctx, "lblfilter-invalid.example.com", user.ID) + store.CreateAgent(ctx, "bot@lblfilter-invalid.example.com", "lblfilter-invalid.example.com", "", "https://example.com/webhook", "", user.ID) + + // Space is not in `[a-z0-9:_-]+`. + req, _ := http.NewRequest("GET", server.URL+"/api/v1/agents/bot@lblfilter-invalid.example.com/messages?labels=hello%20world", nil) + req.Header.Set("Authorization", "Bearer "+apiKey.PlaintextKey) + resp, _ := http.DefaultClient.Do(req) + defer resp.Body.Close() + if resp.StatusCode != 400 { + t.Errorf("status = %d, want 400", resp.StatusCode) + } +} + +func decodeRaw(resp *http.Response) ([]byte, error) { + var buf bytes.Buffer + _, err := buf.ReadFrom(resp.Body) + return buf.Bytes(), err +} + +// Silence the linter — identity is imported to give error types stable +// names for the test assertions even though they're only consumed by +// internal/agent storage calls. +var _ = identity.ErrMessageNotFound diff --git a/internal/identity/labels_test.go b/internal/identity/labels_test.go new file mode 100644 index 0000000..dc05380 --- /dev/null +++ b/internal/identity/labels_test.go @@ -0,0 +1,234 @@ +package identity_test + +import ( + "context" + "errors" + "fmt" + "reflect" + "sort" + "testing" + + "github.com/Mnexa-AI/e2a/internal/identity" + "github.com/Mnexa-AI/e2a/internal/testutil" +) + +// labelsTestSetup provisions a verified domain + agent + a single +// inbound message so each test can exercise label mutations against a +// real row. Returns the message id and agent id. +func labelsTestSetup(t *testing.T, store *identity.Store, prefix string) (msgID, agentID string) { + t.Helper() + ctx := context.Background() + user, err := store.CreateOrGetUser(ctx, "owner-"+prefix+"@example.com", "Owner", "google-"+prefix) + if err != nil { + t.Fatalf("CreateOrGetUser: %v", err) + } + domain := prefix + ".example.com" + if _, err := store.ClaimOrCreateDomain(ctx, domain, user.ID); err != nil { + t.Fatalf("ClaimOrCreateDomain: %v", err) + } + if err := store.VerifyDomain(ctx, domain, user.ID); err != nil { + t.Fatalf("VerifyDomain: %v", err) + } + agent, err := store.CreateAgent(ctx, "bot@"+domain, domain, "", "https://example.com/webhook", "", user.ID) + if err != nil { + t.Fatalf("CreateAgent: %v", err) + } + msg, err := store.CreateInboundMessage(ctx, "", agent.ID, "alice@gmail.com", "bot@"+domain, "", "Hello", "", "", nil, nil, nil, nil, nil) + if err != nil { + t.Fatalf("CreateInboundMessage: %v", err) + } + return msg.ID, agent.ID +} + +func TestModifyMessageLabels_AddRoundTrip(t *testing.T) { + pool := testutil.TestDB(t) + store := identity.NewStore(pool) + ctx := context.Background() + + msgID, agentID := labelsTestSetup(t, store, "labels-add") + + final, err := store.ModifyMessageLabels(ctx, msgID, agentID, []string{"urgent", "follow-up"}, nil) + if err != nil { + t.Fatalf("ModifyMessageLabels: %v", err) + } + want := []string{"follow-up", "urgent"} // sorted ascending by store + if !reflect.DeepEqual(final, want) { + t.Errorf("returned labels = %v, want %v", final, want) + } + + // Re-read via the inbox query and confirm the labels round-trip. + msgs, err := store.GetMessagesByAgent(ctx, identity.MessageListFilter{AgentID: agentID, Status: "all", Direction: "all", Limit: 10}) + if err != nil { + t.Fatalf("GetMessagesByAgent: %v", err) + } + if len(msgs) != 1 { + t.Fatalf("len(msgs) = %d, want 1", len(msgs)) + } + got := append([]string(nil), msgs[0].Labels...) + sort.Strings(got) + if !reflect.DeepEqual(got, want) { + t.Errorf("persisted labels = %v, want %v", got, want) + } +} + +func TestModifyMessageLabels_RemoveAndOverlap(t *testing.T) { + pool := testutil.TestDB(t) + store := identity.NewStore(pool) + ctx := context.Background() + + msgID, agentID := labelsTestSetup(t, store, "labels-remove") + + if _, err := store.ModifyMessageLabels(ctx, msgID, agentID, []string{"a", "b", "c"}, nil); err != nil { + t.Fatalf("seed: %v", err) + } + // "b" appears in both add and remove — remove wins (the union-then-difference + // semantics documented on ModifyMessageLabels). + final, err := store.ModifyMessageLabels(ctx, msgID, agentID, []string{"b", "d"}, []string{"b", "a"}) + if err != nil { + t.Fatalf("ModifyMessageLabels: %v", err) + } + want := []string{"c", "d"} + if !reflect.DeepEqual(final, want) { + t.Errorf("labels = %v, want %v", final, want) + } +} + +func TestModifyMessageLabels_RejectsOverCap(t *testing.T) { + pool := testutil.TestDB(t) + store := identity.NewStore(pool) + ctx := context.Background() + + msgID, agentID := labelsTestSetup(t, store, "labels-cap") + + // Push one under the cap, then attempt to push two more in one op — + // the second pushes us over. + near := make([]string, identity.MaxLabelsPerMessage-1) + for i := range near { + near[i] = labelFor(i) + } + if _, err := store.ModifyMessageLabels(ctx, msgID, agentID, near, nil); err != nil { + t.Fatalf("seed near cap: %v", err) + } + // Now MaxLabelsPerMessage-1 are set. Adding 2 unique new labels = + // MaxLabelsPerMessage+1 — must reject. + _, err := store.ModifyMessageLabels(ctx, msgID, agentID, []string{labelFor(9000), labelFor(9001)}, nil) + if !errors.Is(err, identity.ErrLabelLimitExceeded) { + t.Errorf("err = %v, want ErrLabelLimitExceeded", err) + } + // One MORE add (taking us to exactly MaxLabelsPerMessage) MUST succeed — + // the boundary is the inclusive cap. + final, err := store.ModifyMessageLabels(ctx, msgID, agentID, []string{labelFor(9000)}, nil) + if err != nil { + t.Fatalf("at-cap add: %v", err) + } + if len(final) != identity.MaxLabelsPerMessage { + t.Errorf("len(final) = %d, want %d (exact cap)", len(final), identity.MaxLabelsPerMessage) + } +} + +func TestModifyMessageLabels_NotFound(t *testing.T) { + pool := testutil.TestDB(t) + store := identity.NewStore(pool) + ctx := context.Background() + + _, agentID := labelsTestSetup(t, store, "labels-notfound") + + _, err := store.ModifyMessageLabels(ctx, "msg_nonexistent", agentID, []string{"x"}, nil) + if !errors.Is(err, identity.ErrMessageNotFound) { + t.Errorf("err = %v, want ErrMessageNotFound", err) + } +} + +func TestModifyMessageLabels_WrongAgent(t *testing.T) { + pool := testutil.TestDB(t) + store := identity.NewStore(pool) + ctx := context.Background() + + msgID, _ := labelsTestSetup(t, store, "labels-wrong-a") + _, otherAgentID := labelsTestSetup(t, store, "labels-wrong-b") + + // agent B's id must NOT be able to mutate agent A's message. + _, err := store.ModifyMessageLabels(ctx, msgID, otherAgentID, []string{"x"}, nil) + if !errors.Is(err, identity.ErrMessageNotFound) { + t.Errorf("err = %v, want ErrMessageNotFound (cross-agent must look like not-found)", err) + } +} + +func TestGetMessagesByAgent_LabelsFilterANDMatch(t *testing.T) { + pool := testutil.TestDB(t) + store := identity.NewStore(pool) + ctx := context.Background() + user, _ := store.CreateOrGetUser(ctx, "owner-lblfilter@example.com", "Owner", "google-lblfilter") + store.ClaimOrCreateDomain(ctx, "lblfilter.example.com", user.ID) + store.VerifyDomain(ctx, "lblfilter.example.com", user.ID) + agent, _ := store.CreateAgent(ctx, "bot@lblfilter.example.com", "lblfilter.example.com", "", "https://example.com/webhook", "", user.ID) + + // Three messages with overlapping label sets: + // m1: [urgent, follow-up] + // m2: [urgent] + // m3: [follow-up] + // Filter labels=[urgent, follow-up] must return ONLY m1 (AND semantics). + m1, _ := store.CreateInboundMessage(ctx, "", agent.ID, "a@gmail.com", "bot@lblfilter.example.com", "", "M1", "", "", nil, nil, nil, nil, nil) + m2, _ := store.CreateInboundMessage(ctx, "", agent.ID, "a@gmail.com", "bot@lblfilter.example.com", "", "M2", "", "", nil, nil, nil, nil, nil) + store.CreateInboundMessage(ctx, "", agent.ID, "a@gmail.com", "bot@lblfilter.example.com", "", "M3", "", "", nil, nil, nil, nil, nil) + + store.ModifyMessageLabels(ctx, m1.ID, agent.ID, []string{"urgent", "follow-up"}, nil) + store.ModifyMessageLabels(ctx, m2.ID, agent.ID, []string{"urgent"}, nil) + // m3 gets only follow-up + msgs, _ := store.GetMessagesByAgent(ctx, identity.MessageListFilter{AgentID: agent.ID, Status: "all", Direction: "all", Limit: 10}) + for _, m := range msgs { + if m.Subject == "M3" { + store.ModifyMessageLabels(ctx, m.ID, agent.ID, []string{"follow-up"}, nil) + } + } + + got, err := store.GetMessagesByAgent(ctx, identity.MessageListFilter{ + AgentID: agent.ID, + Status: "all", + Direction: "all", + Limit: 10, + Labels: []string{"urgent", "follow-up"}, + }) + if err != nil { + t.Fatalf("filter query: %v", err) + } + if len(got) != 1 { + t.Fatalf("filtered len = %d, want 1 (AND match should only return m1)", len(got)) + } + if got[0].Subject != "M1" { + t.Errorf("filtered subject = %q, want M1", got[0].Subject) + } +} + +func TestGetMessagesByAgent_LabelsAlwaysNonNil(t *testing.T) { + // Regression: a row with no labels set must come back as []string{} + // (or nil-coalesced to empty) — never a JSON null. The DB DEFAULT + // '{}' is the contract. Tests the COALESCE-equivalent at the + // pgx-driver layer. + pool := testutil.TestDB(t) + store := identity.NewStore(pool) + ctx := context.Background() + + _, agentID := labelsTestSetup(t, store, "labels-nilcheck") + + msgs, err := store.GetMessagesByAgent(ctx, identity.MessageListFilter{AgentID: agentID, Status: "all", Direction: "all", Limit: 10}) + if err != nil { + t.Fatalf("GetMessagesByAgent: %v", err) + } + if len(msgs) != 1 { + t.Fatalf("len(msgs) = %d, want 1", len(msgs)) + } + // Either nil or empty is acceptable at the DB layer; the API layer + // converts to empty for the wire. Just confirm it's not populated + // with a stray label. + if len(msgs[0].Labels) != 0 { + t.Errorf("labels on a freshly-created message = %v, want empty", msgs[0].Labels) + } +} + +func labelFor(i int) string { + // Stable, short, charset-valid labels for cap tests. Padding to + // 4 digits keeps the lexicographic sort predictable so a debugger + // printout reads sensibly. + return fmt.Sprintf("label-%04d", i) +} diff --git a/internal/identity/store.go b/internal/identity/store.go index 6c5516b..5a4a126 100644 --- a/internal/identity/store.go +++ b/internal/identity/store.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "log" + "sort" "strings" "time" @@ -190,6 +191,14 @@ type Message struct { // Reply-To points at a different mailbox. Outbound-irrelevant. ReplyTo []string `json:"reply_to,omitempty"` + // Labels are user-applied string tags (`urgent`, `follow-up`, …). + // Always lowercase, charset `[a-z0-9:_-]+`, ≤ 64 chars per label, + // capped at 100 per message. Empty slice means no labels — the DB + // default is `'{}'` so this is never null on read. Labels with the + // `e2a:` prefix are reserved for server-applied system labels; + // caller writes that try to set them are rejected at the API layer. + Labels []string `json:"labels,omitempty"` + // HITL approval fields. Status defaults to 'sent'; body and attachments // are populated only while a message is in 'pending_approval', and are // scrubbed on any terminal transition. @@ -1846,6 +1855,12 @@ type MessageListFilter struct { ConversationID string // exact match Since time.Time // created_at >= Since Until time.Time // created_at < Until + // Labels filters rows where ALL given labels are present on the + // message (AND-match via Postgres @> array containment). Empty slice + // means "no label constraint" — matches both labelled and unlabelled + // rows. Handler-layer validates each entry against the same charset + // rule used on writes so callers can't smuggle SQL through here. + Labels []string } // GetMessagesByAgent returns messages for an agent, filtered by status, @@ -1871,7 +1886,7 @@ func (s *Store) GetMessagesByAgent(ctx context.Context, f MessageListFilter) ([] var query string var args []interface{} - baseSelect := `SELECT m.id, m.agent_id, m.direction, m.sender, m.recipient, m.to_recipients, m.cc, m.reply_to, m.subject, m.email_message_id, m.conversation_id, COALESCE(m.inbox_status, ''), COALESCE(m.status, ''), COALESCE(wd.status, ''), COALESCE(wd.last_error, ''), COALESCE(octet_length(m.raw_message), 0), m.created_at + baseSelect := `SELECT m.id, m.agent_id, m.direction, m.sender, m.recipient, m.to_recipients, m.cc, m.reply_to, m.subject, m.email_message_id, m.conversation_id, COALESCE(m.inbox_status, ''), COALESCE(m.status, ''), COALESCE(wd.status, ''), COALESCE(wd.last_error, ''), COALESCE(octet_length(m.raw_message), 0), m.created_at, m.labels FROM messages m LEFT JOIN webhook_deliveries wd ON wd.message_id = m.id WHERE m.agent_id = $1 AND m.expires_at > now()` @@ -1937,6 +1952,15 @@ func (s *Store) GetMessagesByAgent(ctx context.Context, f MessageListFilter) ([] query += fmt.Sprintf(` AND m.created_at < $%d`, len(args)+1) args = append(args, f.Until) } + if len(f.Labels) > 0 { + // AND-match via @> array containment. The GIN index on labels + // makes this O(log n) for the typical case (≤ 5 filter labels, + // ≤ 100 labels per row). Empty caller-supplied labels are + // stripped at the handler layer so we never produce + // "labels @> ARRAY['']" which would match nothing. + query += fmt.Sprintf(` AND m.labels @> $%d`, len(args)+1) + args = append(args, f.Labels) + } cursorCmp := ">" sortDir := "ASC" @@ -1966,7 +1990,7 @@ func (s *Store) GetMessagesByAgent(ctx context.Context, f MessageListFilter) ([] &m.ID, &m.AgentID, &m.Direction, &m.Sender, &m.Recipient, &m.ToRecipients, &m.CC, &m.ReplyTo, &m.Subject, &m.EmailMessageID, &m.ConversationID, &m.InboxStatus, &m.Status, &m.WebhookStatus, &m.WebhookError, &m.SizeBytes, - &m.CreatedAt, + &m.CreatedAt, &m.Labels, ); err != nil { return nil, err } @@ -1986,9 +2010,9 @@ func (s *Store) GetMessageWithContent(ctx context.Context, messageID, agentID st err := s.pool.QueryRow(ctx, `UPDATE messages SET inbox_status = CASE WHEN inbox_status = 'unread' THEN 'read' ELSE inbox_status END WHERE id = $1 AND agent_id = $2 AND expires_at > now() - RETURNING id, agent_id, direction, sender, recipient, to_recipients, cc, reply_to, subject, email_message_id, conversation_id, COALESCE(inbox_status, ''), raw_message, auth_headers, created_at, expires_at`, + RETURNING id, agent_id, direction, sender, recipient, to_recipients, cc, reply_to, subject, email_message_id, conversation_id, COALESCE(inbox_status, ''), raw_message, auth_headers, created_at, expires_at, labels`, messageID, agentID, - ).Scan(&m.ID, &m.AgentID, &m.Direction, &m.Sender, &m.Recipient, &m.ToRecipients, &m.CC, &m.ReplyTo, &m.Subject, &m.EmailMessageID, &m.ConversationID, &m.DeliveryStatus, &m.RawMessage, &authHeadersJSON, &m.CreatedAt, &m.ExpiresAt) + ).Scan(&m.ID, &m.AgentID, &m.Direction, &m.Sender, &m.Recipient, &m.ToRecipients, &m.CC, &m.ReplyTo, &m.Subject, &m.EmailMessageID, &m.ConversationID, &m.DeliveryStatus, &m.RawMessage, &authHeadersJSON, &m.CreatedAt, &m.ExpiresAt, &m.Labels) if err != nil { return nil, err } @@ -2000,6 +2024,90 @@ func (s *Store) GetMessageWithContent(ctx context.Context, messageID, agentID st return m, nil } +// ErrLabelLimitExceeded reports that an add operation would push a +// message past MaxLabelsPerMessage. Mapped to HTTP 400 at the handler. +var ErrLabelLimitExceeded = errors.New("label limit exceeded") + +// MaxLabelsPerMessage is the post-add cap on the labels[] column. The +// per-operation cap (max items in add_labels / remove_labels) is +// enforced earlier at the handler. The two together bound the array +// at a size where GIN containment + JSON marshalling stay cheap. +const MaxLabelsPerMessage = 100 + +// ModifyMessageLabels applies a delta — add then remove — to a +// message's labels[] in a single atomic statement. Returns the updated +// labels (deduplicated, sorted) so the caller can echo them back in +// the response without a second round-trip. +// +// Inputs are assumed already normalized (lowercased, charset-validated, +// dedup'd within each list, e2a:* gated). The store layer: +// - applies adds first, then removes (so a label in both lists ends up removed) +// - rejects if the post-add total would exceed MaxLabelsPerMessage +// - returns ErrMessageNotFound if the row is missing / expired / cross-agent +// +// The whole thing runs as one UPDATE so a concurrent PATCH from a +// second client can't observe a partial state. +func (s *Store) ModifyMessageLabels(ctx context.Context, messageID, agentID string, add, remove []string) ([]string, error) { + // Pre-check the post-add length against the cap. Done as a + // dedicated SELECT-then-UPDATE so we can return a specific error + // rather than a generic constraint violation — the handler maps + // ErrLabelLimitExceeded to 400 with a useful message. + tx, err := s.pool.Begin(ctx) + if err != nil { + return nil, err + } + defer tx.Rollback(ctx) + + var current []string + err = tx.QueryRow(ctx, + `SELECT labels FROM messages WHERE id = $1 AND agent_id = $2 AND expires_at > now() FOR UPDATE`, + messageID, agentID, + ).Scan(¤t) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return nil, ErrMessageNotFound + } + return nil, err + } + + // Apply the delta in-memory so the cap check is exact. The set + // semantics here mirror what the SQL UPDATE below does: + // labels' = (labels ∪ add) \ remove. + labelSet := map[string]struct{}{} + for _, l := range current { + labelSet[l] = struct{}{} + } + for _, l := range add { + labelSet[l] = struct{}{} + } + for _, l := range remove { + delete(labelSet, l) + } + if len(labelSet) > MaxLabelsPerMessage { + return nil, ErrLabelLimitExceeded + } + + final := make([]string, 0, len(labelSet)) + for l := range labelSet { + final = append(final, l) + } + sort.Strings(final) + + if _, err := tx.Exec(ctx, + `UPDATE messages SET labels = $1 WHERE id = $2 AND agent_id = $3`, + final, messageID, agentID, + ); err != nil { + return nil, err + } + if err := tx.Commit(ctx); err != nil { + return nil, err + } + if final == nil { + final = []string{} + } + return final, nil +} + // UpdateMessageDeliveryStatus sets the inbox_status on a message. func (s *Store) UpdateMessageDeliveryStatus(ctx context.Context, messageID, agentID, status string) error { _, err := s.pool.Exec(ctx, diff --git a/migrations/020_message_labels.sql b/migrations/020_message_labels.sql new file mode 100644 index 0000000..5f14321 --- /dev/null +++ b/migrations/020_message_labels.sql @@ -0,0 +1,22 @@ +-- 020_message_labels.sql +-- +-- Add per-message string labels. labels[] is the canonical store — +-- bare strings, not a join table, no separate `labels` resource. +-- Matches the agent-shaped model used by AgentMail (every label is just +-- a tag an agent emits); contrasts with Gmail's label-as-resource model +-- which needs a CRUD surface for color/visibility metadata we don't +-- want. +-- +-- Default '{}'::text[] is a constant: Postgres 11+ records the new +-- column metadata-only and rewrites no rows. Safe on a multi-million- +-- row messages table. +-- +-- GIN index is the standard array containment index — supports the +-- AND-match filter shape `WHERE labels @> ARRAY['urgent','follow-up']` +-- exposed by GET /messages?labels=urgent&labels=follow-up. +-- +-- Idempotent: ADD COLUMN IF NOT EXISTS, CREATE INDEX IF NOT EXISTS. + +ALTER TABLE messages ADD COLUMN IF NOT EXISTS labels TEXT[] NOT NULL DEFAULT '{}'; + +CREATE INDEX IF NOT EXISTS idx_messages_labels_gin ON messages USING GIN (labels); From c313267a5c8d8d1aad8b6a2386c872cb9bec738b Mon Sep 17 00:00:00 2001 From: jiashuoz Date: Wed, 27 May 2026 23:35:05 -0700 Subject: [PATCH 2/3] feat(clients): expose labels on TS/Python SDKs, CLI, MCP MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Slice B of the labels feature: client surfaces. Wires the PATCH endpoint + ?labels filter through every consumer. - Generated: web/public/openapi.yaml, TS types, Python types (regen). - TS SDK: api.updateMessageLabels(email, msgId, body) — raw HTTP. client.updateMessageLabels(msgId, {addLabels, removeLabels, agentEmail}) — high-level. api.listMessages + client.listMessages accept `labels: string[]` and emit repeated ?labels= query params. - CLI: `e2a labels [--add