diff --git a/blueprints/idempotency/DESIGN.md b/blueprints/idempotency/DESIGN.md new file mode 100644 index 00000000..d81e2ba0 --- /dev/null +++ b/blueprints/idempotency/DESIGN.md @@ -0,0 +1,279 @@ +# Idempotency & dedup: design decision and prior art + +Status: internal design note (not for the README/docs; basis for the Slack +reply to Fabrizio and for the eventual feature PRs). Refs issue #293. + +This note records *why* pgque's idempotency feature has the shape it does. The +short version: pgque is a **log**, not a job queue, and that single fact +determines the entire design. + +--- + +## 1. TL;DR — the decision + +Fabrizio asked for two things, framed as one ("idempotency keys") plus one +("one job at a time per partition key"). After working it through, they are +**two separate features at two different layers**, and the split is forced by +the log model: + +1. **Producer idempotency = a TTL/window dedup, enforced at produce time.** + A duplicate `send` with the same key inside a time window is a no-op that + returns the original event id. Append-only, garbage-collected by the + existing table rotation. This is what SQS, NATS JetStream, and the RabbitMQ + dedup plugin all do — because they are logs/brokers too. + +2. **"Free once processed" (pg-boss `singletonKey`) = a consumer-side, + per-consumer key lease.** This is the "one in-flight per partition key" + feature. It lives on the read side because that is the *only* place where + "processed" is a well-defined fact. + +The thing that *cannot* exist: a producer-side "reject the duplicate until the +prior one is processed" in a log. Section 3 explains why, three independent +ways. Section 5 is the prior-art evidence. + +--- + +## 2. The model: pgque is a log, not a job queue + +PgQ (and therefore pgque) is an append-only event **log** with independent +consumer cursors: + +- Producers append events to the current data table. Events are **never** + updated or deleted on consumption — `finish_batch` only advances a + per-subscription tick cursor (`subscription.sub_last_tick`). Events physically + vanish only when table rotation `TRUNCATE`s their child table. +- A queue can have **many independent consumers** (fan-out). Each has its own + cursor. An event can be done for consumer A and still pending for consumer B. +- Rotation recycles the oldest of N child tables (default 3) every + `queue_rotation_period` (default 2h), and only once no consumer still needs + it. **Rotation is the only garbage collector.** + +A job queue (pg-boss, Oban, River, Graphile Worker) is the opposite: each job +is a **mutable row** consumed once by one logical worker pool, carrying a +`state` column that is `UPDATE`d (`created → active → completed`). "Processed" +is a global, singular property of the row. + +That difference is the whole story. + +--- + +## 3. Why "free once processed" cannot be a producer feature in a log + +Three independent arguments, all pointing the same way. + +### 3.1 The model argument + +"Processed" is a **per-consumer** fact. In a fan-out log the question "is key K +processed?" has no answer without naming a consumer — K can be processed by A +and pending for B simultaneously. A producer sits before the fan-out; it has no +single "processed" state to free a key against. The predicate is not just hard +to compute, it is **undefined** at the producer. + +### 3.2 The mechanics argument + +The engine *does* expose one aggregate signal a producer could read: +`min(sub_last_tick)` across all subscriptions (it is exactly what rotation uses +to decide when a table is safe to truncate). So one could, in principle, probe +"has every consumer's cursor passed K's event?" But: + +- **"Free once ALL consumers processed"** means the key stays reserved until the + *slowest* consumer drains it; a lagging or dead consumer **wedges the key** + indefinitely (bounded only by a TTL backstop). +- **"Free once ANY consumer processed"** breaks the guarantee for the laggards: + if B has not yet seen the first K and the producer re-sends K, B now has two + in-flight copies of K — the exact duplicate the feature was meant to prevent. + +Either way, **producer dedup behavior becomes a function of consumer lag.** That +is operationally surprising (a dead consumer silently changes whether your sends +deduplicate) and conceptually backwards for a log, whose entire value is that +producers and consumers are decoupled. + +### 3.3 The prior-art argument + +No system in the field does append-only "free once processed." Every system that +offers it is a job queue that pays for it with a per-row state `UPDATE`. Every +log that does business-key dedup uses a wall-clock TTL window instead. This is +not an oversight — it is structural: "free once processed" must *observe* +"processed," and "processed" is row state. See §5. + +--- + +## 4. Recommended designs + +### 4.1 Producer idempotency — TTL window dedup (variant 1) + +Contract: `send` with an idempotency key is deduplicated against other sends +with the same key **within a time window**. Freeing is by wall clock, not by +consumption — identical to SQS's "tracking continues even after the message has +been received and deleted." + +Why this is the right (and only coherent) producer-side option for a log: §3. + +Why it does **not** reproduce pg-boss's bloat — the point that matters most for +Fabrizio: the dedup state is sized by **`throughput × window`**, not by the +backlog. pg-boss bloats because its state grows with the *pending pile* (millions +of stuck jobs, each an indexed mutable row). A TTL dedup ledger is bounded by the +send rate times a short window, completely independent of how far behind the +consumers are. The failure mode he is fleeing does not exist here even in the +naive implementation. + +Shape (pseudocode-level; final SQL is a later PR): + +``` +-- non-rotated sidecar, or a rotation-partitioned sidecar (see GC fork below) +pgque.idempotency (queue, key, ev_id, expires_at) -- unique (queue, key) + +function pgque.send_idempotent(queue, key, payload, ttl): + insert into pgque.idempotency (queue, key, ev_id, expires_at) + values (queue, key, , now() + ttl) + on conflict (queue, key) do nothing + -- if inserted: produce the real event, record ev_id, return (ev_id, deduped=false) + -- if conflict and not expired: return (existing ev_id, deduped=true) + -- if conflict and expired: reclaim the row, produce, return (new ev_id, deduped=false) +``` + +**Return contract.** pg-boss returns `null` on a deduped send (the caller gets +nothing). The log brokers do better — SQS returns a fresh `MessageId`, NATS sets +`PubAck.duplicate = true`. pgque should **return the existing event id plus a +`deduplicated` boolean**: strictly more useful than pg-boss, and free since the +dedup row already stores the id. + +**The one open engineering fork — how the ledger is GC'd:** + +- **(X) Non-partitioned table, global `unique (queue, key)` + `expires_at`, + pruned by a `maint`-cycle DELETE reaper.** Exact, predictable window; dedup is + a single `on conflict`. Cost: per-row delete churn → autovacuum on a small hot + table. (This is the in-tree precedent — `delayed_events` + a `maint_*` step, + and the DLQ's unique-index-on-conflict pattern.) +- **(Y) Rotation-partitioned ledger (or the key carried in the event stream), + GC'd by `TRUNCATE`/`DROP` of old buckets — append-only, zero vacuum.** Cost: + Postgres requires the partition key inside any unique constraint, so + uniqueness is per-bucket → a key can recur across buckets → dedup needs a probe + across the live buckets (the "previous-child probe" / sawtooth window). + +Net: **vacuum-churn (X) vs probe-cost (Y).** X's churn is window-bounded and +modest here (not pg-boss's monster); Y is append-only but pays a small +multi-bucket read per send and has a ragged window at the rotation boundary. +This is the single decision to make before writing the producer PR. + +### 4.2 Free-once-processed — consumer-side per-key lease (the partition feature) + +This is where "free once processed" legitimately lives, because a single +consumer's in-flight set is that consumer's own concern, small, and well-defined. + +- Carry the partition/idempotency key on the event (`ev_extra1`, no schema + change) via a `send_partitioned(queue, key, payload)` wrapper. +- A per-consumer lease sidecar: when a consumer receives an event for key K, it + claims the lease (`insert ... on conflict do nothing`); a second event for K is + **deferred** (re-queued via `event_retry`) until the first is acked, at which + point the lease is released. Net effect: at most one in-flight job per key per + consumer. Add a lease TTL reaper so a crashed worker cannot wedge a key. +- Policy knob: **drop** the duplicate (idempotency flavor) vs **defer** it + (serialization flavor) — same machinery, two surfaces. + +Because the lease is per-consumer, the fan-out ambiguity of §3.2 disappears: +each consumer enforces its own "one in-flight per key" without reference to any +other consumer. + +--- + +## 5. Prior art (evidence for §3.3) + +Sorted by the log-vs-job-queue axis. All facts are from primary sources +(source DDL / official docs); URLs inline. + +### Logs / brokers that do business-key dedup → all variant-1 (TTL window), produce-side + +| System | Mechanism | Freeing | Notes | +|---|---|---|---| +| **AWS SQS FIFO** | server dedup-ID set per queue (or SHA-256 of body) | fixed **5-min window**, wall-clock | docs: "continues tracking the deduplication ID **even after the message has been received and deleted**" — explicitly *not* free-once-processed. Returns success + a fresh `MessageId`. | +| **NATS JetStream** | per-stream table keyed by `Nats-Msg-Id` | configurable `duplicate_window`, **default 2 min** | `PubAck.duplicate = true` on a suppressed write. | +| **RabbitMQ** (noxdafox plugin) | in-mem cache keyed by `x-deduplication-header`, bounded by `x-cache-size` | TTL `x-cache-ttl` | core RabbitMQ has **no** dedup. | +| **Kafka** idempotent producer | per-(PID, partition) sequence numbers | n/a | **not** business-key dedup — only same-producer-instance retry dedup; new PID on restart. | +| **GCP Pub/Sub** | server `message_id` redelivery suppression | per-message, no redeliver after ack | **not** producer-key dedup — two `publish()` of the same logical message are two messages. | + +Sources: SQS [using-messagededuplicationid](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html), +[FIFO exactly-once](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues-exactly-once-processing.html); +NATS [model_deep_dive](https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive); +RabbitMQ plugin [README](https://github.com/noxdafox/rabbitmq-message-deduplication); +Kafka [KIP-98](https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging); +Pub/Sub [exactly-once-delivery](https://cloud.google.com/pubsub/docs/exactly-once-delivery). + +### Job queues that do free-once-processed → all rely on a mutable per-row `state` column + +| System | Mechanism | Freeing | Per-row mutation? | +|---|---|---|---| +| **pg-boss** | partial unique indexes on `(name, COALESCE(singleton_key,''))` **predicated on the mutable `state` column** (`job_i1/i2/i3/i6`, e.g. `WHERE state <= 'active'`) | `UPDATE ... SET state='completed'` pushes the row out of the index predicate | **Yes** — and the index-on-mutable-`state` is the documented bloat source (HOT updates defeated; terminal rows linger under retention until `DELETE` + vacuum). Returns `null` on dedup. | +| **Oban** | `pg_try_advisory_xact_lock` + `SELECT` over `state ∈ states` within `period` (no DB constraint) | state leaves the watched set, or `period` (default 60s) elapses | **Yes**, in-place state UPDATE. Docs admit it is "prone to race conditions." | +| **River** (v0.12+) | partial unique index on `unique_key`, predicate over a per-row `unique_states BIT(8)` bitmask | row's `state` leaves the bitmask → drops out of the index, no cleanup job | **Yes** — elegant ("free on completion for free") but works *only because* `state` is an UPDATE'd column. | +| **Graphile Worker** | `UNIQUE (key)` on the job row, `INSERT ... ON CONFLICT (key) DO UPDATE` | job completes → **row DELETEd** | **Yes** (replace/upsert + delete-on-complete). | +| **Hatchet** | side `WorkflowRunDedupe` table, `UNIQUE (tenantId, workflowId, value)`, reject on conflict | run reaches terminal state → dedup row removed | side-table registry as a lock. | + +Sources: pg-boss [`src/plans.js`](https://cdn.jsdelivr.net/npm/pg-boss/src/plans.js) (partial unique indexes + `completeJobs`); +Oban [unique_jobs](https://hexdocs.pm/oban/unique_jobs.html) + `lib/oban/engines/basic.ex`; +River [unique-jobs](https://riverqueue.com/docs/unique-jobs) + migration `006_bulk_unique.up.sql`; +Graphile Worker [job-key](https://worker.graphile.org/docs/job-key); +Hatchet `WorkflowRunDedupe` migration (`20240726160629_v0_40_0.sql`). + +### The peer that has nothing + +**pgmq** — the closest architectural analog to pgque (simple single-extension +Postgres queue, `send`/`read`/`pop`/`archive`) — has **no dedup or idempotency +feature at all**. `send` always inserts; two identical sends yield two messages. +Source: [pgmq SQL functions](https://pgmq.github.io/pgmq/latest/api/sql/functions/). + +**Takeaway:** logs do TTL-window dedup; job queues do state-based +free-once-processed and eat the per-row UPDATE for it; nobody does append-only +free-once-processed. pgque's nearest analog ships neither — so both of pgque's +planned features are genuine differentiators, not catch-up. + +--- + +## 6. Open decisions (before writing PRs) + +1. **Producer GC fork: (X) vacuum-reaper vs (Y) rotation-partitioned probe** (§4.1). +2. **Default TTL** for the producer window, and its relation to rotation period. + Hard floor only matters for the consumer-lease variant; for pure window dedup + the TTL is just "how long do duplicate sends collapse." +3. **Return contract** confirmation: existing id + `deduplicated` flag (recommended + over pg-boss's `null`). +4. **Consumer lease**: drop-vs-defer policy surface; lease TTL reaper; whether the + lease key reuses `ev_extra1` or gets a dedicated column. +5. **Two PRs, in order**: producer window-dedup first (self-contained, closes the + spirit of #293), consumer lease second (the partition feature). + +--- + +## 7. Slack-reply-ready summary (for Fabrizio) + +> Great questions, and digging into them surfaced something important: pgque is +> a **log**, not a job queue (PgQ heritage — append-only events, independent +> consumer cursors, no per-row state). That changes how idempotency has to work. +> +> pg-boss's `singletonKey` ("dedupe until the job is processed") is implemented +> with a partial unique index on a mutable `state` column that gets `UPDATE`d to +> `completed` — and that index-on-mutable-state is *exactly* the write +> amplification / bloat you're migrating away from. We don't want to reintroduce +> it. +> +> In a log, "processed" is a per-consumer fact the producer can't see, so +> "free-once-processed" can't be a producer feature. So we'd split it: +> +> 1. **Producer idempotency** = a dedup **window** (like SQS's dedup ID or NATS's +> `Nats-Msg-Id` window) — a duplicate `send` with the same key inside the +> window is a no-op returning the original id. Append-only, GC'd by our table +> rotation, and crucially **sized by throughput × window, not by backlog** — +> so it can't bloat the way pg-boss does when consumers fall behind. +> 2. **"One in-flight per key" / free-once-processed** = a **consumer-side** key +> lease (this is also your partitions ask). It lives on the read side because +> that's the only place "processed" is defined, and it's per-consumer so +> fan-out stays clean. +> +> Both are things our closest analog (pgmq) doesn't have, so we're keen on the +> contributions. Happy to pair on the produce-side window dedup first — it's +> self-contained and closes the core of your idempotency issue. + +--- + +(Companion: the consumer-side partition feature in +`blueprints/partition-keys/SPEC.md`.) diff --git a/blueprints/partition-keys/SPEC.md b/blueprints/partition-keys/SPEC.md new file mode 100644 index 00000000..8b336c3e --- /dev/null +++ b/blueprints/partition-keys/SPEC.md @@ -0,0 +1,294 @@ +# PgQue Partition Keys — Spec + +- **Version:** v0.4 (draft) +- **Status:** review rounds 1–3 applied. **Phase 1 (`skip`-default partition + consumption) is converged / implementation-ready.** Phase 2 (`pause` strict + ordering) is specified but has open design items (§11) and is a deliberate + follow-up. See §15 changelog and `decisions.md`. +- **Slug:** partition-keys +- **Scope:** consumer-side ordered, parallel consumption by partition key. + Producer-side idempotency/dedup is a separate spec (deferred — §12). + +--- + +## 1. Goal + +Within one queue, events sharing a partition key are consumed **in order by a +single worker at a time**; events with different keys are consumed **in +parallel** — the log-native ("Kafka partition") model: order *within* a key, +parallelism *across* keys. + +## 2. The guarantee (precise, testable) + +- **G1 — per-key affinity + FIFO.** For a queue whose events carry a partition + key and a fixed slot count `N`, every event of key `K` maps to one slot + `slot(K) = (hashtextextended(K, 0) % N + N) % N` (the `+N` normalizes the sign; + `hashtextextended` returns `bigint`). Within that slot, non-retried events of + `K` are delivered in non-decreasing `ev_id` order, to **no other slot**. + Intra-batch order is the engine's `order by 1` (`pgque.sql:440`), preserved + through `get_batch_cursor`'s filter re-wrap (`pgque.sql:2277`); cross-batch + order follows from one subscription's monotonically-advancing cursor. +- **G2 — single in-flight processor per key.** At most one worker holds an + unacked event for `K`. Enforced by the per-subscription receive lock + (`next_batch_custom … for update of s`, `pgque.sql:5761` — the #97/#125 guard). +- **G3 — failure boundary (Phase 2 / `pause`).** Under `pause`, no later event of + `K` is delivered until `K`'s failed head event is acked or dead-lettered, and + after it resolves the deferred events deliver in `ev_id` order, exactly once. + Under `skip` (Phase 1 default), later events of `K` MAY arrive before the + failure resolves — only at-least-once holds. + - **Engine fact:** a retried event keeps its `ev_id`, gets a new `ev_txid` + (re-injected by `maint_retry_events` → `insert_event_raw`, `pgque.sql:859`), + and re-routes to the **same slot** because `ev_extra1` is preserved + (`pgque.sql:861`). So G1's `ev_id` monotonicity holds only between non-retried + events; across a retry the only ordering guarantee is G3's pause boundary. + +## 3. Why it's needed + +PgQue is an ordered, immutable **log**, not a job queue — workloads need +per-entity ordering without global ordering. Motivating case (a multi-tenant +storage service evaluating PgQue vs pg-boss): millions of file-lifecycle events +that **must be ordered per tenant** but need **no ordering across tenants**. + +## 4. Scope and phasing + +**Phase 1 — converged, build now:** +- Partition key on a `send()`-sourced event (D1, D6). +- N independent **slot consumers**, each filtering the stream to its hash class + via `get_batch_cursor` `extra_where` (§6). Stable affinity (D4). +- G1 + G2. **`skip` failure policy** (stateless, sound). +- Persisted/enforced `N` (D3); slot identity + single-owner (D7); SECURITY + DEFINER ownership model (§6). + +**Phase 2 — specified, follow-up (NOT converged):** +- **`pause` failure policy** (G3 strict). Needs a *defer-without-retry-increment* + primitive that does not exist today (§11 O1), a durable blocked-key marker + (D5), and carries the hot-blocked-key cost (§11 O2). Build after Phase 1. + +**Out of scope:** producer idempotency (§12); dynamic `N` / rebalancing (R4); +**trigger-sourced queues** (triggers use `ev_extra1` for the table name — D1, R5); +cascaded/multi-node; automatic hot-partition mitigation. + +**ICP:** multi-tenant SaaS on managed Postgres with a high-volume per-entity +event stream (entity = partition key). + +## 5. End-to-end workflow + +``` +producer: pgque.send('files', 'default', payload, partition_key => tenant_id) + │ key → ev_extra1 (send-sourced queues only) + ▼ +engine: append-only tables · global ev_id/ev_txid order (UNCHANGED) + │ full stream + ▼ +consumers: N slot consumers, each an INDEPENDENT subscription with its own cursor; + slot k reads the whole stream, server-side-filtered to its hash class +``` + +## 6. Architecture + +The mechanism is **N independent slot consumers**, not a modification of +cooperative consumers (round 1 B1: coop hands disjoint tick windows; confirmed +`pgque.sql:6262`). Each slot is its own subscription → own cursor + `sub_id` → no +cross-slot data loss; retry/DLQ rows are slot-scoped (`ev_owner = sub_id`, +`pgque.sql:2374`). + + +``` + producers │ send(queue, type, payload, partition_key => K) → ev_extra1 + ▼ + ┌──────────────────────────────────────────────────────────┐ + │ ENGINE · sacred — UNCHANGED │ + │ append-only tables · global ev_id/ev_txid · rotation │ + │ next_batch / get_batch_cursor(i_extra_where) / order by 1 │ + └───────┬───────────────┬───────────────┬──────────────────┘ + ▼ full stream ▼ full stream ▼ full stream + slot 0 (sub#0/N) slot 1 (sub#1/N) slot N-1 + own cursor own cursor own cursor + filter h%N=0 filter h%N=1 filter h%N=N-1 +``` + + +**Filtering without touching the engine.** Each slot's receive reuses the +admin-only `pgque.get_batch_cursor(…, i_extra_where)` hook (`pgque.sql:2229`, +the 4-arg overload), injecting `and (hashtextextended(ev_extra1,0) % N + N) % N = +k`, assembled only from the validated integers `N`,`k` (§8). `batch_event_sql`, +`next_batch`, rotation are not modified; the filter re-wrap preserves G1. + +**SECURITY DEFINER ownership (round 3 — corrected).** `get_batch_cursor`'s +`extra_where` is a trusted-SQL sink, revoked from `public/pgque_reader/ +pgque_writer`, admin-only (`pgque.sql:2221`, `:4852`). `receive_partitioned` and +`subscribe_slot` reach it **because they are owned by the same role that owns +`get_batch_cursor` (the install owner) — a function owner may execute its own +functions regardless of grants.** This is *not* the `receive`/`nack` pattern +(those never call `get_batch_cursor`; they call reader-granted internals), and it +does *not* depend on the owner holding `pgque_admin`. **Invariant:** the +partition functions MUST be created by the same role that ran `\i pgque.sql`. On +managed Postgres the installer is a non-superuser admin role; co-ownership (not a +grant) is what makes the wrapper work — state and test this explicitly. + +**Read amplification.** Every event is scanned by all `N` slot cursors (filter +applied after the engine materializes the window, `pgque.sql:2277` — reduces +returned rows, not scan work). ≈ N× steady; up to ~2N× during rotation overlap; +a stalled slot scans an ever-widening window. Documented; single-reader/dispatch +optimization is future (R6). + +## 7. Decisions + +| ID | Decision | Choice (v0.4) | Notes | +|----|----------|---------------|-------| +| D1 | Key location | `ev_extra1`, `send()`-sourced queues only | Triggers use `ev_extra1` for table name. | +| D2 | Failure policy | `skip` default (Phase 1); `pause` is Phase 2 (§11) | `pause` has open mechanics. | +| D3 | N | Fixed, persisted in `pgque.partition_consumer(queue, consumer, n)` (written inside SECURITY DEFINER `subscribe_slot`; table revoked from app roles); changed `n` rejected | Enforced invariant, not convention. | +| D4 | Assignment | `(hashtextextended(key,0) % N + N) % N` | Stable, sign-safe. | +| D5 | State budget | **Phase 1 / happy / `skip`: no state, no per-event writes.** **Phase 2 `pause`:** durable `pgque.partition_block(sub_id, partition_key, head_ev_id)` marker (FK `sub_id → subscription on delete cascade`; index `(sub_id, partition_key)`). Blocked keys additionally incur defer churn (§11 O1) — so "no per-event churn" is a Phase-1/non-blocked-key claim only. | Round 3 corrected the churn framing. | +| D6 | Producer signature | `send(queue, type, payload, partition_key => text)` | Avoids `send(queue,type,payload)` collision. | +| D7 | Slot & single-owner | slot = consumer `"#k/N"`; G2 via per-subscription receive lock; functions SECURITY DEFINER co-owned with `get_batch_cursor` (§6) | Reader-callable, owner-reachable. | + +## 8. Implementation details + +- **Producer:** `send(queue, type, payload, partition_key text default null)` → + `insert_event(…, ev_extra1 => partition_key, …)`. SECURITY DEFINER, pinned + search_path; revoke public, grant `pgque_writer`. +- **Tables (created in Phase 1, `if not exists`):** `partition_consumer` + (N persistence) and `partition_block` (Phase-2 marker, empty in Phase 1 so + test assertions are well-formed). Both revoked from app roles (the + `dead_letter` pattern, `dlq.sql:236`); written only inside SECURITY DEFINER + functions. +- **`subscribe_slot(queue, consumer, k int, n int)`:** validate `n>=1 and + 0<=k#k/n"`. Idempotent for the same `(k,n)`. +- **`receive_partitioned(queue, consumer, k int, n int, …)`:** after casting + `k,n` to int, `next_batch` + `get_batch_cursor(…, i_extra_where => + format('and (hashtextextended(ev_extra1,0) %% %s + %s) %% %s = %s', n,n,n,k))`. + SECURITY DEFINER (§6); granted `pgque_reader`. +- **`pause` (Phase 2):** on nack of `K#i`, upsert `partition_block(sub_id, K, + head_ev_id => ev_id)`. A later event of a blocked key (open marker with + `head_ev_id < ev_id`) is **deferred** (see §11 O1 for the missing primitive), + not server-side-dropped (dropping + cursor-advance would lose it). Clear the + marker when `K#i` is acked, **or** when it is dead-lettered — DLQ-unblock + predicate: a `dead_letter` row exists for `ev_id = K#i` and `dl_consumer_id` + equal to this slot's `co_id`, where the slot's `co_id` is obtained by joining + `subscription` (`partition_block.sub_id → subscription.sub_consumer = + dead_letter.dl_consumer_id`) — `sub_id` and `co_id` are different ID spaces + (`dlq.sql:24,75-85`, `pgque.sql:170-183`); do not compare them directly. +- **Teardown:** `unsubscribe_slot` removes the slot subscription (the + `partition_block` FK cascades). Note `unregister_consumer` cascades + `dead_letter` (`dlq.sql:24`), so dropping a slot drops its DLQ audit — + documented. +- **Grants:** producer → `pgque_writer`; `subscribe_slot`/`unsubscribe_slot`/ + `receive_partitioned` → `pgque_reader`; `partition_consumer`/`partition_block` + revoked from all app roles; `get_batch_cursor` stays admin-only. Deny-by-default + re-applied. + +## 9. Tests plan (red/green TDD), CI PG 14–18 + +**Phase 1 (must pass to ship):** +- **T-G1a:** literal `(hashtextextended(K,0)%N+N)%N` on every CI version, pinning + one concrete `(K, expected)` pair. *(red first)* +- **T-G1b:** interleave A,B,A,A,B → each key in `ev_id` order across batches, no + key on two slots. (No existing test guards intra-batch `ev_id` order.) +- **T-retry-affinity:** nack a keyed event; `maint_retry_events()` + + `force_next_tick` + `ticker()`; assert redelivery to the **same** slot only. +- **T-G2-block / T-G2-parallel:** same slot → second worker blocks (mirror + `two_session_receive_lock.sh`); different slots → neither blocks. +- **T-no-drop:** keys across all slots in one window; all N slots; union = all + events, zero loss. +- **T-security:** run against an install whose owner is a **non-superuser, + non-`pgque_admin` role** — a bare `pgque_reader` can call + `receive_partitioned`/`subscribe_slot` end-to-end, and **cannot** call + `get_batch_cursor` directly (`42501`, mirror `test_security_get_batch_cursor.sql`); + non-integer/out-of-range `n`,`k` rejected. +- **T-N-invariant:** `subscribe_slot(…,k,n)` idempotent; `(…,k,n2≠n)` raises. +- **T-no-bloat (happy path):** all-ack of M events → zero `retry_queue`/ + `dead_letter`/`partition_block` rows (guard the `partition_block` clause with + `to_regclass('pgque.partition_block') is not null`) and no per-event + UPDATE/DELETE. +- **T-engine-untouched:** `pg_get_functiondef` of `batch_event_sql`, + `next_batch_custom`, and `get_batch_cursor/4` (pin the 4-arg overload) + byte-identical to baseline. +- **T-idempotent-install:** re-running `pgque.sql` re-creates functions + the two + new tables (`create table/unique index if not exists`) cleanly. + +**Phase 2 (`pause`) — write when O1/O2 (§11) resolve:** +- **T-G3-pause:** A#2 nacked; drive maint+tick; A#3 withheld until A#2 + acked-or-DLQ'd; **and after unblock A#2 then A#3 deliver in `ev_id` order, + exactly once**; B unaffected. +- **T-DLQ-unblock:** A#2 exhausts retries → `dead_letter`; assert the + `partition_block` row for A drops to 0 **via the DLQ branch (no ack ever + occurred)** and A#3 then proceeds. +- **T-slot-crash:** worker holding A#2 dies; assert the `partition_block` row is + present after the crash and before the new worker's first receive (durability); + drive maint+tick; A#2 redelivered before A#3, only to slot k. Crash in the + post-`maint` window (retry_queue row already gone). +- **T-hot-blocked-key:** hot `K` blocked under `pause` → other slots unaffected; + defer cost is bounded by `K`'s backlog until DLQ, not total throughput. + +## 10. Risks + +- **R2 — read amplification:** N× steady, ~2N× rotation overlap, widening for a + stalled slot. Benchmark the stalled case. +- **R3 — hot partitions:** documented only. +- **R4 — changing N:** enforced invariant (D3); rebalancing is future. +- **R5 — `ev_extra1`:** send-sourced queues only. +- **R6 — single-reader/dispatch** to remove read amplification; future. +- **R7 — rotation pressure:** rotation waits for `min(sub_last_tick)` over ALL + subscriptions (`pgque.sql:910`); N slots lower the floor to the slowest slot. A + `pause`-blocked slot does *not* pin rotation (deferred events go to retry, not + the held cursor), **but** a hot blocked key keeps that slot perpetually lagging, + so a per-slot staleness alert cannot distinguish "wedged" from "hot key under + pause" — documented, not auto-mitigated. + +## 11. Open design items — Phase 2 `pause` (why it's a follow-up) + +- **O1 — defer-without-retry-increment primitive (blocking for `pause`).** + `finish_batch` acks the whole batch, so withholding `K#i+1` requires removing it + from the batch. A **server-side filter that lets the cursor advance would lose + it** (round-2 data-loss). `event_retry` preserves it but **increments + `ev_retry`**, so a long-blocked key's deferred events would falsely march toward + `max_retries`/DLQ. `pause` therefore needs a new "re-queue without counting as a + retry" path (or a hold-cursor design that doesn't wedge rotation). Undecided. +- **O2 — hot-blocked-key cost.** Until O1 is settled, a hot blocked key makes its + slot re-defer a growing backlog each poll (per-event churn for that key, + bounded by the head's time-to-DLQ). Acceptable for rare failures (the migration + ICP); needs documentation + T-hot-blocked-key before `pause` ships. + +## 12. Relationship to producer idempotency (deferred sibling) + +Producer dedup is a TTL window (SQS/NATS), append-only, GC'd by rotation — a +separate spec. Rationale: `blueprints/idempotency/DESIGN.md`. + +## 13. Review panel + +- **Lead:** drafts/revises. +- **Reviewer A — ops/security** · **Reviewer B — QA/testability.** Rounds 1–3 + applied. Round 3 verdict: Phase 1 converged; Phase 2 (`pause`) has open items + O1/O2 → split out as follow-up. + +## 14. Sprint plan + +1. **S1 — producer + key plumbing** (+ the two tables, empty). T-G1a, + T-no-bloat(happy), T-idempotent-install. +2. **S2 — slot consumers (`skip`), SECURITY DEFINER + co-ownership, persisted N.** + T-G1b, T-retry-affinity, T-G2-block/parallel, T-no-drop, T-security, + T-N-invariant, T-engine-untouched. **← Phase 1 ships here.** +3. **S3 — `pause` (Phase 2), gated on O1/O2.** T-G3-pause, T-DLQ-unblock, + T-slot-crash, T-hot-blocked-key. +4. **S4 — docs + benchmark** (read-amp: steady/rotation/stalled; per-tenant order). + +## 15. Changelog + +- **v0.4 (draft):** review round 3. **Phase 1 declared converged / + implementation-ready; `pause` split into Phase 2** with explicit open items + (§11 O1 defer-without-retry-increment, O2 hot-blocked-key). Corrected the + SECURITY DEFINER justification to the **co-ownership** invariant (not + `pgque_admin`; not "like receive/nack") + non-superuser-owner security test + (round 3 B1). Fixed the DLQ-unblock `sub_id`↔`co_id` join; added + `partition_block` FK-cascade + index + revoked-from-roles; tables created empty + in Phase 1; `T-no-bloat` guarded with `to_regclass`; `T-engine-untouched` pins + the `/4` overload; `T-G3-pause` now asserts in-order-exactly-once after unblock; + `T-DLQ-unblock` asserts marker-clear-via-DLQ; `T-slot-crash` asserts marker + durability; added `T-hot-blocked-key`. Round-3 detail in `decisions.md`. +- **v0.3:** round 2 — confirmed G1 ordering + G2 lock real; SECURITY DEFINER + wiring; durable `partition_block`; modulo sign fix; R7. +- **v0.2:** round 1 — N independent slot subscriptions; G1/G2/G3; `skip` default. +- **v0.1:** initial SamoSpec-format draft. diff --git a/blueprints/partition-keys/decisions.md b/blueprints/partition-keys/decisions.md new file mode 100644 index 00000000..5b2d2270 --- /dev/null +++ b/blueprints/partition-keys/decisions.md @@ -0,0 +1,145 @@ +# Partition Keys — decisions log + +Accepted / rejected / deferred choices, tracked across review rounds. + +## Review round 1 (Reviewer A ops/security · Reviewer B QA/testability) + +### Accepted (changed the spec) + +- **A1 — Drop the cooperative-consumer distribution model.** Both reviewers + proved coop hands each member a *disjoint tick window*, not a hash-filtered + shared batch; a filter overlay would drop other slots' events on cursor + advance. → v0.2 uses **N independent slot subscriptions**, each filtering the + full stream via `get_batch_cursor` `extra_where`. (SPEC §6) +- **A2 — Correct the retry rationale.** `event_retry` preserves `ev_id` and + changes `ev_txid` (event reappears in a later window); the original "later + ev_id" claim was wrong. → guarantee restated as G1/G2/G3 with an explicit + engine note. (SPEC §2) +- **A3 — Resolve D2-vs-"no state".** `pause` derives its blocked-key set from the + engine's existing `retry_queue`/`dead_letter`, scoped per slot by `sub_id` + (each slot is its own subscription); no new mutable table. (SPEC §7 D5, §8) +- **A4 — DLQ must unblock.** A paused key releases when its head event is acked + *or* dead-lettered, so a poison event cannot wedge a tenant past `max_retries`. + (SPEC §8; test T-DLQ-unblock) +- **A5 — `send` signature.** Use a new 4-arg `send(queue, type, payload, + partition_key =>)`; a 3-arg `send(queue, key, payload)` collides with the + existing `send(queue, type, payload)`. (SPEC §7 D6) +- **A6 — `hashtextextended(key, 0)`** instead of `hashtext()` (unstable across PG + majors → affinity would break on upgrade). (SPEC §7 D4) +- **A7 — `ev_extra1` restricted to `send()`-sourced queues** (triggers store the + table name there). (SPEC §7 D1) +- **A8 — Fixed N as an enforced invariant**: persisted per (queue, consumer); a + mismatched-N worker is rejected, not silently misrouting. (SPEC §7 D3) +- **A9 — Define "slot" and single-owner**: slot = named consumer `"#k/N"`; + G2 enforced by the existing per-consumer receive lock. (SPEC §7 D7) +- **A10 — Test corrections**: `T-engine-untouched` asserts `pg_get_functiondef` + (not generated SQL); `T-no-bloat` scoped to the happy path; added T-no-drop, + order-after-retry, DLQ-unblock, slot-crash, empty/hot-key, cross-version + affinity. (SPEC §9) + +### Deferred + +- **`pause` policy implementation** ships after `skip` (sound + simple first); + `skip` is the v0.1 default. `pause` is fully specified. (SPEC §4, §7 D2) +- **Read-amplification optimization** (single-reader/dispatch) — R6; adds a + hop/state, out of v0.1's no-state budget. +- **Trigger-sourced queues**, **dynamic N / rebalancing**, **hot-partition + mitigation** — out of scope, documented. + +### Rejected + +- **Lease table / advisory-lock-per-event** for serialization — reintroduces the + per-event churn PgQue exists to avoid (carried over from the superseded + `IDEMPOTENCY_AND_PARTITIONS.md`, now removed). +- **Modifying `batch_event_sql`** to push partitioning into the engine — violates + the sacred-engine rule; the `extra_where` hook achieves filtering without it. + +## Review round 2 (both personas, verified against the engine) + +### Confirmed sound (no change needed) +- **G1 `ev_id` ordering is true.** `batch_event_sql` emits `order by 1` + (`pgque.sql:440`); `get_batch_cursor` re-wraps the filtered stream with + `order by 1` (`pgque.sql:2277`) → per-key order survives the filter, no + consumer sort. (Reviewer B headline.) +- **G2 single-owner lock is real and tested** — `next_batch_custom … for update + of s` (`pgque.sql:5761`), guarded by `two_session_receive_lock.sh`. +- **Retry affinity holds** — `ev_extra1` preserved through `event_retry` / + `maint_retry_events`, so a retried event re-routes to the same slot. +- **Coop genuinely hands disjoint windows** (`for update skip locked`, + `pgque.sql:6262`) — confirms round-1 A1. + +### Accepted (changed the spec → v0.3) +- **B-R2-2 — `pause` blocked-set must be durable.** `retry_queue` is transient + (`maint_retry_events` deletes the row on re-injection, `pgque.sql:863`), + leaving a crash hole that violates G3. → durable + `partition_block(sub_id, partition_key, head_ev_id)` marker, O(failing keys), + cleared on ack-or-DLQ. Honestly reopens "no new table," scoped to `pause`. + (D5, §8, R1) +- **B-R2-1 — security trust boundary.** `get_batch_cursor.extra_where` is an + admin-only trusted-SQL sink (`pgque.sql:2221`, `:4852`). → `receive_partitioned` + / `subscribe_slot` are `SECURITY DEFINER` installer-owned; integers `n,k` + validated + cast; no caller string interpolated. Reframed the "injection-safe" + prose. (§6, §8, D7; test T-security) +- **Negative modulo bug** — `hashtextextended` returns `bigint`; bare `% N` can + be negative → `(h % N + N) % N`. (D4, §6, §8) +- **R7 rotation wedge** — N slots lower the rotation floor to the slowest slot; a + wedged `pause` slot could pin rotation for the whole queue. → `pause` must not + hold the batch open; cursor advances past non-blocked keys. (R7, §8) +- **N persistence + teardown** — N persisted in `partition_consumer`; + `unsubscribe_slot` + DLQ-cascade caveat. (D3, §8) +- **Tests** — added T-retry-affinity, T-security, T-N-invariant; split T-G2 into + block/parallel; added `get_batch_cursor` to T-engine-untouched; pinned a + concrete hash pair. (§9) + +### Round-1 closure scorecard (both reviewers) +- B1 (coop model) ✅ closed · B2 (retry rationale) ✅ closed · B3 (crash-derive + blocked set) ♻️ reopened in v0.2, now ✅ closed via durable marker (D5) · + B4 (D2-vs-state / send sig) ✅ closed · B5 (DLQ-unblock / slot definition) + ✅ closed; spawned B-R2-1 (now fixed) · B6 ✅ closed. + +## Review round 3 (convergence; both personas verified against the engine) + +### Verdict +**Phase 1 (`skip`-default partition consumption) CONVERGED / implementation-ready. +Phase 2 (`pause`) NOT converged — split out as a follow-up** with open items O1/O2. +Both reviewers agreed the round-2 engine-anchor and security *posture* are solid; +the remaining gaps are all in the new `pause`/DLQ surface. + +### Accepted → v0.4 +- **B1 (security ownership, affects Phase 1).** The "SECURITY DEFINER owned like + receive/nack" justification was wrong: `receive`/`nack` never call + `get_batch_cursor`. The real mechanism is **co-ownership** — a function owner + may execute its own functions regardless of grants, so `receive_partitioned` + reaches the admin-only `get_batch_cursor` only because the install owner owns + both. Not `pgque_admin` membership. Invariant: partition functions created by + the `\i pgque.sql` owner. Test under a non-superuser owner. (§6, D7, T-security) +- **The `pause` withhold mechanic is genuinely unsolved (O1).** Combining the two + reviewers: a server-side filter that advances the cursor **loses** the withheld + event (data loss); `event_retry` preserves it but **increments `ev_retry`**, so + deferred events of a long-blocked key march toward false DLQ. `pause` needs a + *defer-without-retry-increment* primitive that does not exist. → `pause` is + Phase 2; O1 is its blocking open item. (§11 O1) +- **Hot-blocked-key cost (O2).** Until O1, a hot blocked key re-defers a growing + backlog per poll (bounded by head's time-to-DLQ). Document + T-hot-blocked-key. +- **DLQ-unblock ID-space join.** Marker keyed on `sub_id`; `dead_letter.dl_consumer_id` + is `co_id`. Must join `subscription` to map; do not compare directly. (§8) +- **`partition_block` hygiene:** FK `sub_id → subscription on delete cascade` + (no orphans), index `(sub_id, partition_key)`, revoked from app roles, created + empty in Phase 1 so `T-no-bloat` is well-formed (guard with `to_regclass`). (§8, D5) +- **Test tightening:** `T-engine-untouched` pins the `get_batch_cursor/4` overload; + `T-G3-pause` asserts in-order-exactly-once after unblock; `T-DLQ-unblock` + asserts marker-clear-via-DLQ-branch (no ack); `T-slot-crash` asserts marker + durability; `T-security` runs under a non-superuser owner. (§9) +- **N persistence writer/grants:** `partition_consumer` written inside + SECURITY DEFINER `subscribe_slot`; table revoked from app roles. (D3, §8) + +### Round closure +B-R2-1 security: posture closed, ownership prose corrected (B1). B-R2-2 durable +marker: direction correct; hygiene (FK/index/grants) + the withhold mechanic (O1) +now specified/flagged. All round-1 + round-2 *Phase-1* items closed. + +## Still open (Phase 2 `pause`, before it can be built) +- **O1** — choose the defer-without-retry-increment mechanism (new primitive vs + hold-cursor-without-wedging-rotation). +- **O2** — bound + document the hot-blocked-key degradation. +- Read-amplification bench numbers (R2) to decide if R6 is needed. diff --git a/web/public/briefs/partition-keys.html b/web/public/briefs/partition-keys.html new file mode 100644 index 00000000..e35ff604 --- /dev/null +++ b/web/public/briefs/partition-keys.html @@ -0,0 +1,240 @@ + + + + + +PgQue Brief · Partition Keys + + + + +
+ +
+

PgQue · Design Brief

+

Partition KeysOrdered, parallel consumption — the log-native way

+
+ slug partition-keys + version v0.4 (draft) + Phase 1 converged + engine untouched +
+
+ +

+ Within one queue, events that share a partition key are consumed + in order by a single worker at a time; events with different keys are + consumed in parallel. Order within a key, parallelism + across keys — Kafka's partition model, achieved by routing, with + no per-event locks and no new mutable state. +

+ +

01The problem

+

+ PgQue is an ordered, immutable log, not a job queue. The motivating + workload (a multi-tenant storage service evaluating PgQue to replace pg-boss) emits + millions of file-lifecycle events — FileCreated, + FileDeleted, FileOverwritten. They must be processed + in order per tenant, but order across tenants does not matter. + One in-order consumer can't keep up; naive multi-worker consumption breaks per-tenant order. +

+ +

02The guarantee

+
+
G1

Per-key affinity + FIFO. Every event of key K maps to one slot hashtextextended(K,0) % N; within it, non-retried events of K arrive in ev_id order, never to another slot.

+
G2

Single processor per key. At most one worker holds an unacked event for K at any instant.

+
G3

Failure boundary. pause: no later K event until the failed one is acked or dead-lettered. skip (v0.1 default): at-least-once, order not held after a failure.

+
+ +

03Architecture

+
+ + + + + + + + producers + send(queue, type, payload, partition_key => K) → ev_extra1 + + + + engine · sacred — UNCHANGED + append-only tables · global ev_id / ev_txid order + next_batch · get_batch_cursor(extra_where) · rotation — no edits to batch_event_sql + + + + + full stream + full stream + full stream + + + + slot 0 · sub#0/N + own cursor + extra_where: h%N==0 + keys in ev_id order + one worker + + + + slot 1 · sub#1/N + own cursor + extra_where: h%N==1 + keys in ev_id order + one worker + + + + slot N-1 · sub#k/N + own cursor + extra_where: h%N==N-1 + keys in ev_id order + one worker + + + each slot = independent subscription · own cursor → no data loss + one key → one slot → per-key order · distinct keys → parallel + cost: N× read amplification (each slot scans the full stream, server-side hash-filtered) + +
Slots are independent subscriptions filtering via the existing extra_where hook. The PgQ engine is not modified.
+
+ +

04Scope

+
+
+

Phase 1 · converged, build now

+
    +
  • Partition key on send()-sourced events
  • +
  • N independent slot consumers, stable affinity
  • +
  • Per-key order + single processor (G1, G2)
  • +
  • skip failure policy (default)
  • +
+
+
+

Phase 2 · follow-up / out

+
    +
  • pause strict order — open mechanic (defer w/o retry-count)
  • +
  • Producer idempotency / dedup window
  • +
  • Trigger-sourced queues · dynamic N
  • +
  • Hot-partition mitigation · read-amp optimization
  • +
+
+
+ +

05Key decisions

+ + + + + + + + + + + +
IDDecisionChoice (v0.2)
D1Where the key livesev_extra1, send()-sourced queues only
D2Failure policyskip default (Phase 1); pause = Phase 2
D3Slot count NFixed, persisted & enforced per (queue, consumer)
D4Assignment function(hashtextextended(key,0) % N + N) % N (stable, sign-safe)
D5State budgetHappy path & skip: none. pause: a compact blocked-key marker (per failing key, not per event)
D6Producer signaturesend(queue, type, payload, partition_key =>)
D7Slot & single-ownerslot = consumer "<c>#k/N"; receive lock
+

Three review rounds against the real engine. Round 2 verified G1's ev_id ordering and the G2 single-owner lock are real. Round 3 declared Phase 1 (skip) converged / implementation-ready and split pause out as a follow-up: withholding a blocked key's later event has an unsolved mechanic — a server-side drop would lose the event, and event_retry would wrongly count it toward DLQ — so strict pause needs a defer-without-retry-increment primitive that doesn't exist yet. It also corrected the security model: the filter hook is admin-only, so the consumer wrappers reach it by co-ownership with the installer, not by a role grant.

+ +

06Sprint plan

+
+ S1  producer + key plumbing + S2  slot consumers (skip) + S3  pause policy (v0.2) + S4  docs + read-amp benchmark +
+

+ Phase 1 (S1–S2) is the converged, build-now slice. Phase 2 (S3, pause) + is gated on two open items: a defer-without-retry-increment primitive (O1) and + bounding the hot-blocked-key cost (O2). Phase 1 alone covers the per-tenant + ordered, parallel consumption the motivating workload needs. +

+ + + +
+ +