From 6e59e39c414bd2e196598c0a39bd490093b25ed2 Mon Sep 17 00:00:00 2001 From: Rolf Hoefer Date: Thu, 21 May 2026 11:46:43 +0900 Subject: [PATCH 1/3] feat(mediorum): bound ops table via dormant cleanup, gap signal, and opt-in retention The crudr ops table is currently unbounded: every CRUD-tracked write appends a row that lives forever. On scaled tables (~250M rows) it dominates the database. This PR introduces three coordinated mechanisms to bound it safely, plus the correctness invariants that keep them safe under peer divergence. # Mechanisms * One-time dormant cleanup. On boot, drop ops rows for tables with no write in the dormant window (default 90d). Idempotent; opt-out via OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS=true. Runs off the boot path so /health-check stays reachable while a multi-million-row backlog drains. * Retention gap signal. When a peer's sweep cursor falls below our lowest available ulid, ServeCrudSweep emits X-Mediorum-Retention-Gap=true and X-Mediorum-Available-Min-Ulid=. The peer's doSweep stages a cursor advance across the gap. Wire format unchanged; older clients ignore the headers. * Opt-in per-table retention sweep. When OPENAUDIO_MEDIORUM_OPS_RETENTION_DAYS is set, a managed routine prunes per-table ops older than the configured window, gated by the slowest active peer cursor (with a safety margin). Unset = no deletions. # Correctness invariants The sweep can be silently disabled or permanently desynchronized in three independent ways. Each is fixed: 1. Non-peer rows pin the retention floor. The cursors table is shared with workers like qm_fix_truncated, which writes a CID into LastULID. computeRetentionCutoff filters to active peer hosts and treats unparseable cursors as missing. 2. Bad cursors heal nowhere. A persisted LastULID outside the plausible window (far-future or epoch) gets echoed as ?after= and re-upserted at end of sweep, locking the peer into a permanent prune-and-recreate cycle. doSweep treats an implausible LastULID as missing on load; ApplyOp rejects implausible op.ULID at the boundary. 3. Gap signal stages too eagerly. Honoring the gap header on first contact lets a peer set our initial cursor at any chosen position. Persisting the staged value before the body decodes also loses the gap on partial failure. The gap branch now requires an existing cursor, defers persistence to the end-of-sweep upsert, and counts MarkSweepGapAdvance only after the upsert succeeds and the durable cursor still sits at or above the staged floor. # Additional correctness * ServeCrudSweep snapshot. MIN(ulid) and the body Find run in a REPEATABLE READ read-only transaction so a retention DELETE that commits between the two reads cannot leave us serving a row the gap header doesn't cover. * EnsureOpsTableIndex pinned-conn lock + invalid-index self-heal. The composite ops("table", ulid) index can take 30-60 min to build on scaled tables. The advisory lock is on a pinned *sql.Conn because gorm.WithContext returns a session over the shared pool that would release the lock on a different connection. The invalid-leftover probe + DROP handles the recovery case where a prior process died mid-CREATE INDEX CONCURRENTLY; without it, IF NOT EXISTS short-circuits on the INVALID index name forever and retention silently degrades to seq-scans. * Self-host normalization. server.New lowercases and trailing-slash-strips config.Self.Host before the peer-vs-self comparison so a chain-registry drift cannot land self in peerHosts and block all retention via the empty-peer sentinel. * Panic recovery on managed routines. lifecycle.AddManagedRoutine has no built-in recover; the dormant cleanup, ops-index ensure, and retention sweep are wrapped in defer recover so a panic in any one cannot crash mediorum. # Tests pkg/mediorum/crudr/retention_test.go covers dormant cleanup (per-table, threshold, opt-out, dry-run), gap signal (emit, validation, first-contact rejection, hostile far-future ulid, garbage ulid), retention sweep (cursor floor, safety margin, ancient cursor, empty cursor, malformed cursor, concurrent sweep + delete). TestRetentionTick_NonPeerCursorRowsIgnored covers the qm_fix_truncated-shaped row that motivated the activePeers filter. go test -count=1 ./pkg/mediorum/crudr/ green. --- pkg/mediorum/crudr/client.go | 59 +- pkg/mediorum/crudr/crudr.go | 86 ++ pkg/mediorum/crudr/retention.go | 730 ++++++++++++++++ pkg/mediorum/crudr/retention_test.go | 1208 ++++++++++++++++++++++++++ pkg/mediorum/server/serve_crud.go | 19 +- pkg/mediorum/server/server.go | 77 +- 6 files changed, 2166 insertions(+), 13 deletions(-) create mode 100644 pkg/mediorum/crudr/retention.go create mode 100644 pkg/mediorum/crudr/retention_test.go diff --git a/pkg/mediorum/crudr/client.go b/pkg/mediorum/crudr/client.go index 2438082f..aec601e7 100644 --- a/pkg/mediorum/crudr/client.go +++ b/pkg/mediorum/crudr/client.go @@ -129,7 +129,11 @@ func (p *PeerClient) doSweep(ctx context.Context) error { host := p.Host bulkEndpoint := "/internal/crud/sweep" // hardcoded - // get cursor + // get cursor. Treat an implausible persisted LastULID (e.g., a + // pre-PR row with a far-future or epoch ULID) as missing. Without + // this guard, doSweep would echo the bad value as `after=` and + // re-upsert the same bad value at end-of-sweep, locking the peer + // into a permanent prune-recreate cycle that no other path heals. lastUlid := "" { var cursor Cursor @@ -138,6 +142,10 @@ func (p *PeerClient) doSweep(ctx context.Context) error { if !errors.Is(err, gorm.ErrRecordNotFound) { p.logger.Warn("failed to get cursor", "err", err) } + } else if cursor.LastULID != "" && !isValidPeerSuppliedULID(cursor.LastULID) { + p.logger.Warn("persisted cursor LastULID is implausible; treating as missing", + "peer", host, + "last_ulid", cursor.LastULID) } else { lastUlid = cursor.LastULID } @@ -164,6 +172,42 @@ func (p *PeerClient) doSweep(ctx context.Context) error { return fmt.Errorf("bad status: %d", resp.StatusCode) } + // Retention-gap signal: the peer has dropped ops below our cursor and + // is advertising the lowest ulid it still has. Stage a cursor advance + // across the gap. Without this branch, the response body would be + // applied normally and the cursor would silently skip the gap. + // + // First-contact guard: if we have no durable cursor (`lastUlid == ""`), + // the empty-string < anyULID comparison would let a peer establish + // our initial cursor at an attacker-chosen position. Require an + // existing cursor before honoring the gap. + // + // Defer-until-decode: stage the advance in lastUlid here but do NOT + // persist. The end-of-sweep upsert is the single persistence point. + // On body-decode failure or apply error we abort without writing the + // forward cursor, so the next sweep retries the same range cleanly. + gapMinULID := resp.Header.Get(HeaderAvailableMin) + gapAdvancePending := false + if resp.Header.Get(HeaderRetentionGap) == "true" && gapMinULID != "" { + switch { + case lastUlid == "": + p.logger.Warn("ignoring retention-gap header on first contact (no durable cursor yet)", + "peer", host, + "advertised_ulid", gapMinULID) + case !isValidPeerSuppliedULID(gapMinULID): + p.logger.Warn("ignoring retention-gap header with invalid ulid", + "peer", host, + "advertised_ulid", gapMinULID) + case gapMinULID > lastUlid: + p.logger.Warn("retention gap detected: peer cursor below peer's available history; staging cursor advance across gap", + "peer", host, + "local_cursor", lastUlid, + "peer_available_min_ulid", gapMinULID) + lastUlid = gapMinULID + gapAdvancePending = true + } + } + var ops []*Op dec := json.NewDecoder(resp.Body) err = dec.Decode(&ops) @@ -197,12 +241,21 @@ func (p *PeerClient) doSweep(ctx context.Context) error { p.Seeded = true } - // set cursor - { + // set cursor. Single persistence point for both normal advance and + // gap-staged advance. MarkSweepGapAdvance fires only after the + // upsert succeeds AND lastUlid is still at-or-above the staged gap + // floor. Body ops that regress lastUlid below the gap suppress + // the counter so it never over-reports relative to durable state. + // Skip the upsert when lastUlid stays empty so a first-contact + // rejection or no-ops response doesn't write an empty cursor row + // that would later block all retention via the empty-peer sentinel. + if lastUlid != "" { upsertClause := clause.OnConflict{UpdateAll: true} err := p.crudr.DB.Clauses(upsertClause).Create(&Cursor{Host: host, LastULID: lastUlid}).Error if err != nil { p.logger.Error("failed to set cursor", "err", err) + } else if gapAdvancePending && lastUlid >= gapMinULID { + MarkSweepGapAdvance() } } diff --git a/pkg/mediorum/crudr/crudr.go b/pkg/mediorum/crudr/crudr.go index c962dba4..c4ce5917 100644 --- a/pkg/mediorum/crudr/crudr.go +++ b/pkg/mediorum/crudr/crudr.go @@ -10,6 +10,7 @@ import ( "reflect" "strings" "sync" + "time" "github.com/OpenAudio/go-openaudio/pkg/httputil" "github.com/OpenAudio/go-openaudio/pkg/lifecycle" @@ -20,6 +21,80 @@ import ( "gorm.io/gorm/clause" ) +// opsTableIndexLockKey is the pg session-scoped advisory-lock key for +// the composite ops("table", ulid) index migration. ASCII-packed +// "ops_indx" so a future maintainer adding adjacent locks can spot +// the convention. +const opsTableIndexLockKey int64 = 0x6F70735F696E6478 + +// EnsureOpsTableIndex creates the composite index ops("table", ulid) +// used by the dormant cleanup and (when enabled) the per-table +// retention queries. Designed to run OFF the boot path via a managed +// routine so the multi-hour CONCURRENTLY build on val001-scale tables +// cannot block /health-check. +// +// The advisory lock is acquired on a pinned *sql.Conn so all five +// steps (acquire, probe-invalid, optional DROP, CREATE, unlock) share +// one Postgres backend session. Without the pin, gorm's pool checks +// out a different conn per call and the session-scoped lock leaks +// off the original session. Two processes could each "acquire" the +// lock on different pool conns and race the CONCURRENTLY build. +// +// The invalid-leftover probe + DROP handles the recovery case where +// a prior process died mid-CONCURRENTLY; without it, the next boot's +// CREATE IF NOT EXISTS short-circuits on the INVALID index name and +// retention paths silently degrade to seq-scans forever. +func EnsureOpsTableIndex(ctx context.Context, db *gorm.DB) error { + sqlDB, err := db.DB() + if err != nil { + return fmt.Errorf("ops-index ensure: get underlying *sql.DB: %w", err) + } + conn, err := sqlDB.Conn(ctx) + if err != nil { + return fmt.Errorf("ops-index ensure: pin connection: %w", err) + } + defer conn.Close() + + var locked bool + if err := conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, opsTableIndexLockKey).Scan(&locked); err != nil { + return fmt.Errorf("ops-index ensure: acquire advisory lock: %w", err) + } + if !locked { + return nil + } + defer func() { + // Detached short-context: a shutdown signal during the build + // shouldn't strand the lock. conn.Close also releases + // session-scoped locks as belt-and-suspenders. + unlockCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, _ = conn.ExecContext(unlockCtx, `SELECT pg_advisory_unlock($1)`, opsTableIndexLockKey) + }() + + var invalid bool + if err := conn.QueryRowContext(ctx, ` + SELECT EXISTS ( + SELECT 1 + FROM pg_index i + JOIN pg_class c ON c.oid = i.indexrelid + WHERE c.relname = 'idx_ops_table_ulid' + AND NOT i.indisvalid + ) + `).Scan(&invalid); err != nil { + return fmt.Errorf("ops-index ensure: probe invalid index: %w", err) + } + if invalid { + if _, err := conn.ExecContext(ctx, `DROP INDEX CONCURRENTLY IF EXISTS idx_ops_table_ulid`); err != nil { + return fmt.Errorf("ops-index ensure: drop invalid index: %w", err) + } + } + + if _, err := conn.ExecContext(ctx, `CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_ops_table_ulid ON ops ("table", ulid)`); err != nil { + return fmt.Errorf("ops-index ensure: create index: %w", err) + } + return nil +} + const ( ActionCreate = "create" ActionUpdate = "update" @@ -257,6 +332,17 @@ func (c *Crudr) KnownType(op *Op) bool { } func (c *Crudr) ApplyOp(op *Op) error { + // Reject implausible ULIDs from any source (peer push, sweep response, + // or local code). Local code constructs via ulid.Make() which always + // passes; the check is essentially free on the local path. A hostile + // peer can otherwise spoof op.Host = c.host to bypass any host-keyed + // gate AND plant a poisoned ULID (epoch, far-future, malformed) that + // would persist into cursors and break ulidShiftBack on the retention + // floor probe. + if !isValidPeerSuppliedULID(op.ULID) { + return fmt.Errorf("rejecting op with implausible ULID %q from origin %s", op.ULID, op.Host) + } + elemType, ok := c.typeMap[op.Table] if !ok { return fmt.Errorf("no type registered for %s", op.Table) diff --git a/pkg/mediorum/crudr/retention.go b/pkg/mediorum/crudr/retention.go new file mode 100644 index 00000000..bdfafb15 --- /dev/null +++ b/pkg/mediorum/crudr/retention.go @@ -0,0 +1,730 @@ +package crudr + +import ( + "bytes" + "context" + "database/sql" + "errors" + "fmt" + "sync/atomic" + "time" + + "github.com/OpenAudio/go-openaudio/pkg/env" + "github.com/oklog/ulid/v2" + "go.uber.org/zap" + "gorm.io/gorm" +) + +// RetentionConfig holds knobs for the ops retention sweep and the one-time +// dormant-table cleanup. Configuration is read from environment variables in +// LoadRetentionConfig, with safe defaults. +// +// Lifecycle: +// - DormantCleanupEnabled defaults to true. The cleanup runs once per +// process start; re-running is a no-op once the table has been cleaned. +// Set OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS=true to opt out. +// - RetentionDays==0 disables the ongoing retention sweep (archive mode, +// current default behavior). Setting OPENAUDIO_MEDIORUM_OPS_RETENTION_DAYS +// to a positive integer enables the sweep. +type RetentionConfig struct { + // DormantCleanupEnabled controls Component 1 (one-time dormant-table cleanup). + DormantCleanupEnabled bool + // DormantThreshold is the minimum age of the newest op for a table to be + // considered dormant. The default mirrors the structurally-dormant signal + // observed in qm_audio_analyses (no producer writes since Nov 2025). + DormantThreshold time.Duration + + // RetentionDays controls Component 3 (ongoing retention sweep). Zero + // disables the sweep (archive mode). Positive values delete ops older + // than this many days, subject to the cursor floor. + RetentionDays int + // SweepInterval is the cadence of the ongoing retention sweep loop. + SweepInterval time.Duration + // SweepBatchLimit is the maximum number of ops deleted in a single + // DELETE statement. Keeps long-running transactions short and avoids + // blocking concurrent ops writes. + SweepBatchLimit int + // CursorSafetyMargin is subtracted from min(cursors.last_ulid) before + // computing the cutoff. Gives the slowest reachable peer time to catch + // up between sweeps. + CursorSafetyMargin time.Duration +} + +// LoadRetentionConfig reads the retention configuration from environment +// variables. All fields use OPENAUDIO_ canonical names. +func LoadRetentionConfig() RetentionConfig { + cfg := RetentionConfig{ + DormantCleanupEnabled: !env.Bool("OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS"), + DormantThreshold: env.GetDuration(90*24*time.Hour, "OPENAUDIO_MEDIORUM_DORMANT_OPS_THRESHOLD"), + RetentionDays: env.GetInt(0, "OPENAUDIO_MEDIORUM_OPS_RETENTION_DAYS"), + SweepInterval: env.GetDuration(1*time.Hour, "OPENAUDIO_MEDIORUM_OPS_RETENTION_SWEEP_INTERVAL"), + SweepBatchLimit: env.GetInt(10000, "OPENAUDIO_MEDIORUM_OPS_RETENTION_BATCH_LIMIT"), + CursorSafetyMargin: env.GetDuration(1*time.Hour, "OPENAUDIO_MEDIORUM_OPS_RETENTION_CURSOR_MARGIN"), + } + return cfg +} + +// RetentionStats are atomic counters exposed for operator-visible metrics. +type RetentionStats struct { + // DormantTablesCleaned counts tables whose ops were dropped by the one-time + // dormant-table cleanup during this process lifetime. + DormantTablesCleaned atomic.Uint64 + // DormantOpsDeleted counts the total number of ops rows the dormant-table + // cleanup deleted during this process lifetime. + DormantOpsDeleted atomic.Uint64 + // RetentionOpsDeleted counts ops rows deleted by the ongoing retention + // sweep during this process lifetime. + RetentionOpsDeleted atomic.Uint64 + // RetentionSweepsSkipped counts sweep ticks where no rows were eligible + // for deletion (empty cursor blocked the cutoff, no rows older than + // MinAge, etc). + RetentionSweepsSkipped atomic.Uint64 + // SweepGapAdvances counts the number of times the local sweep client + // observed a retention-gap signal from a peer and explicitly advanced + // its cursor across the gap. This is the operator-visible metric + // for Topic-7 silent-skip detection. + SweepGapAdvances atomic.Uint64 +} + +// retention is the package-level set of stats, exposed for tests and +// operator metrics. Values reset only on process restart. +var retention RetentionStats + +// Stats returns a pointer to the package retention stats. The fields are +// atomic counters and may be read concurrently with the running sweep. +func Stats() *RetentionStats { return &retention } + +// MinAvailableULID returns the smallest ulid currently stored in the ops +// table. Returns ("", nil) when the ops table is empty. Callers use this +// to advertise the retention floor (see ServeCrudSweep gap signal) and to +// short-circuit retention work when the table is already empty. +func (c *Crudr) MinAvailableULID(ctx context.Context) (string, error) { + var out sql.NullString + err := c.DB.WithContext(ctx). + Raw(`SELECT MIN(ulid) FROM ops`). + Scan(&out).Error + if err != nil { + return "", err + } + if !out.Valid { + return "", nil + } + return out.String, nil +} + +// minDormantThreshold is the smallest dormancy window we will honor. An +// operator who sets OPENAUDIO_MEDIORUM_DORMANT_OPS_THRESHOLD to a value +// below this is treated as misconfigured and the threshold is clamped up. +// 24h is a wide enough floor that a temporarily-quiet table is never +// misclassified as dormant during a brief lull. +const minDormantThreshold = 24 * time.Hour + +// dormantBatchSize bounds the per-statement work the dormant cleanup +// does. A single all-rows DELETE on a 50M-row dormant table holds locks +// and WAL for the entire delete; batching keeps each transaction small +// enough that concurrent op writes for OTHER tables aren't blocked by +// the cleanup, and an OOM-kill mid-cleanup rolls back at most one +// batch's worth of work. +const dormantBatchSize = 10000 + +// CleanupDormantOps drops ops rows for tables that have not received a write +// in cfg.DormantThreshold. It is idempotent: re-running is a no-op once a +// table has been cleaned (the newest remaining op is now from this process's +// write traffic, not the historical sediment). +// +// The set of registered tables comes from c.typeMap, so a table that is no +// longer registered by mediorum (e.g. removed in a future PR) cannot be +// misclassified as dormant by this function. We only delete ops for tables +// the caller has registered with RegisterModels. +// +// Gap-signal interaction. The ServeCrudSweep gap signal advertises +// MIN(ulid) across ALL tables in ops, not per-table. When this cleanup +// removes a fully-dormant table's ops but other tables still hold older +// ops (the val001 baseline: `uploads` rows back to 2023-03 outlast +// `qm_audio_analyses` ops that were emitted in 2024), the overall +// min(ulid) does not change. A peer whose cursor falls between the +// dormant table's oldest and newest ulids would, on next sweep, silently +// skip the deleted dormant-table ops because the gap signal only fires +// against the overall floor. This is acceptable specifically for dormant +// tables because, by construction, their producer code has been removed +// (no live consumer depends on the deleted ops). New maintainers who add +// a CRUD table that retains both an active producer and a low write +// cadence (e.g., quarterly metrics) should not rely on the gap signal +// to cover this case — keep such tables out of CRUDR or out-of-band. +// +// Future-maintainer note: any new CRUD table added via RegisterModels must +// expect to be wiped here if it sees no writes for cfg.DormantThreshold +// (default 90d). A legitimate low-write-cadence table (e.g. a quarterly +// metric) belongs outside the CRUD layer or behind a non-default +// threshold; the dormancy default is calibrated for upload/audio-analysis- +// rate write streams. +// +// Deletes are batched so a multi-million-row dormant table does not hold +// locks or WAL for one giant transaction. Each batch is its own statement +// bounded by `ulid < cutoffULID`; that guard prevents the race where a +// producer writes a new op for the table after the dormancy check but +// before the batched DELETE catches up — the new op's ulid sits above the +// cutoff and is never touched. An interrupted cleanup leaves a well-formed +// table with fewer dormant rows, and the next run picks up where the +// previous left off. +// +// Returns the per-table deletion counts and a non-nil error only if a DB +// operation fails. A no-op clean (cfg disabled or no dormant tables) is a +// nil-error empty-map return. +func (c *Crudr) CleanupDormantOps(ctx context.Context, cfg RetentionConfig) (map[string]int64, error) { + deleted := map[string]int64{} + if !cfg.DormantCleanupEnabled { + c.logger.Info("dormant ops cleanup disabled by OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS") + return deleted, nil + } + + threshold := cfg.DormantThreshold + if threshold < minDormantThreshold { + c.logger.Warn("dormant threshold below safety floor; clamping", + zap.Duration("configured", cfg.DormantThreshold), + zap.Duration("floor", minDormantThreshold)) + threshold = minDormantThreshold + } + + // Snapshot the currently registered table set under the mutex so a + // concurrent RegisterModels call cannot race with the scan below. + c.mu.Lock() + tables := make([]string, 0, len(c.typeMap)) + for t := range c.typeMap { + tables = append(tables, t) + } + c.mu.Unlock() + + cutoff := time.Now().Add(-threshold) + cutoffULID, err := ulidAtTime(cutoff) + if err != nil { + return deleted, fmt.Errorf("compute dormant cutoff: %w", err) + } + + for _, table := range tables { + if err := ctx.Err(); err != nil { + return deleted, err + } + + // Find the most recent op for this table. NULL => no ops at all, + // which means the table is either brand-new or has already been + // cleaned; either way, skip. + var maxULID sql.NullString + err := c.DB.WithContext(ctx). + Raw(`SELECT MAX(ulid) FROM ops WHERE "table" = ?`, table). + Scan(&maxULID).Error + if err != nil { + return deleted, fmt.Errorf("query max ulid for %s: %w", table, err) + } + if !maxULID.Valid || maxULID.String == "" { + c.logger.Debug("dormant cleanup: no ops for table", + zap.String("table", table)) + continue + } + + // Compare lex order against the cutoff. ULIDs sort + // chronologically, so newest_op < cutoff_ulid means the newest op + // is older than the cutoff, i.e. the table is dormant. + if maxULID.String >= cutoffULID { + c.logger.Debug("dormant cleanup: table still active", + zap.String("table", table), + zap.String("newest_op_ulid", maxULID.String), + zap.String("cutoff_ulid", cutoffULID)) + continue + } + + // Batched DELETE bounded by the dormancy cutoff. Each iteration + // takes one bounded chunk and commits, so a multi-million-row + // dormant table is cleaned in pieces. The `ulid < cutoffULID` + // guard is load-bearing: if a producer writes a new op for this + // table between the dormancy check above and a later batch (the + // "dormant table just became active mid-cleanup" race), that new + // op's ULID is above the cutoff and is preserved. Loop exits + // when a batch returns zero rows. + var totalForTable int64 + for { + if err := ctx.Err(); err != nil { + return deleted, err + } + res := c.DB.WithContext(ctx).Exec(` + WITH victims AS MATERIALIZED ( + SELECT ulid + FROM ops + WHERE "table" = ? AND ulid < ? + ORDER BY ulid ASC + LIMIT ? + ) + DELETE FROM ops WHERE ulid IN (SELECT ulid FROM victims) + `, table, cutoffULID, dormantBatchSize) + if res.Error != nil { + return deleted, fmt.Errorf("delete dormant ops for %s: %w", table, res.Error) + } + totalForTable += res.RowsAffected + if res.RowsAffected < dormantBatchSize { + break + } + } + if totalForTable > 0 { + deleted[table] = totalForTable + retention.DormantTablesCleaned.Add(1) + retention.DormantOpsDeleted.Add(uint64(totalForTable)) + c.logger.Info("dormant cleanup: dropped ops for table", + zap.String("table", table), + zap.Int64("ops_deleted", totalForTable), + zap.String("newest_op_ulid", maxULID.String), + zap.Duration("threshold", threshold)) + } + } + return deleted, nil +} + +// DryRunPlan describes what a real retention pass would do without +// executing any DELETE. It is the operator-facing preview surface for +// retention behavior. Counts are exact (computed against live ops state) +// but the on-disk size estimate is heap-only and excludes index/TOAST +// overhead; multiply by ~1.18 to approximate index+heap recovery for the +// `ops` relation on val001 (84 GiB total / 72 GiB heap). +type DryRunPlan struct { + // DormantTables maps table name -> rows the dormant cleanup would + // delete. Tables that are still active are omitted. Empty when the + // dormant cleanup is disabled. + DormantTables map[string]int64 + // DormantBytes is the sum of pg_column_size over the rows + // DormantTables would delete. Heap-only. + DormantBytes int64 + // RetentionRows is the count of rows the ongoing retention sweep + // would delete on its next tick. Zero when RetentionDays <= 0. + RetentionRows int64 + // RetentionBytes is the heap-only sum for those rows. + RetentionBytes int64 + // RetentionSkipReason is the reason the retention sweep would + // skip this tick (empty cursor, ops table empty, etc). Empty when + // the sweep would proceed. + RetentionSkipReason string + // RetentionCutoffULID is the cutoff the retention sweep would use. + // Empty when the sweep would skip. + RetentionCutoffULID string + // DormantCutoffULID is the cutoff the dormant cleanup would use. + DormantCutoffULID string +} + +// DryRunRetention computes what CleanupDormantOps and one retentionTick +// would do given cfg, without executing any DELETE. Use this from a +// debug endpoint, an `audius-ctl` subcommand, or operator scripting +// before flipping retention on. +// +// The query plan mirrors the real cleanup logic exactly so a dry-run +// followed by a real run sees the same counts (modulo writes that land +// between the two calls). +func (c *Crudr) DryRunRetention(ctx context.Context, cfg RetentionConfig) (DryRunPlan, error) { + plan := DryRunPlan{DormantTables: map[string]int64{}} + + // Dormant-cleanup preview. + if cfg.DormantCleanupEnabled { + threshold := cfg.DormantThreshold + if threshold < minDormantThreshold { + threshold = minDormantThreshold + } + cutoff := time.Now().Add(-threshold) + cutoffULID, err := ulidAtTime(cutoff) + if err != nil { + return plan, fmt.Errorf("compute dormant cutoff: %w", err) + } + plan.DormantCutoffULID = cutoffULID + + c.mu.Lock() + tables := make([]string, 0, len(c.typeMap)) + for t := range c.typeMap { + tables = append(tables, t) + } + c.mu.Unlock() + + for _, table := range tables { + if err := ctx.Err(); err != nil { + return plan, err + } + var maxULID sql.NullString + if err := c.DB.WithContext(ctx). + Raw(`SELECT MAX(ulid) FROM ops WHERE "table" = ?`, table). + Scan(&maxULID).Error; err != nil { + return plan, fmt.Errorf("query max ulid for %s: %w", table, err) + } + if !maxULID.Valid || maxULID.String == "" || maxULID.String >= cutoffULID { + continue + } + // This table would be cleaned. Compute exact row count + // and heap bytes. + var count int64 + var bytes sql.NullInt64 + if err := c.DB.WithContext(ctx). + Raw(`SELECT COUNT(*), COALESCE(SUM(pg_column_size(ops.*)), 0) + FROM ops WHERE "table" = ? AND ulid < ?`, + table, cutoffULID). + Row().Scan(&count, &bytes); err != nil { + return plan, fmt.Errorf("dryrun dormant count for %s: %w", table, err) + } + if count > 0 { + plan.DormantTables[table] = count + plan.DormantBytes += bytes.Int64 + } + } + } + + // Retention-sweep preview. We mirror computeRetentionCutoff but do + // not execute any DELETE. + if cfg.RetentionDays > 0 { + cutoff, reason, err := c.computeRetentionCutoff(ctx, cfg) + if err != nil { + return plan, fmt.Errorf("compute retention cutoff: %w", err) + } + if cutoff == "" { + plan.RetentionSkipReason = reason + } else { + plan.RetentionCutoffULID = cutoff + var rows int64 + var bytes sql.NullInt64 + if err := c.DB.WithContext(ctx). + Raw(`SELECT COUNT(*), COALESCE(SUM(pg_column_size(ops.*)), 0) + FROM ops WHERE ulid < ?`, cutoff). + Row().Scan(&rows, &bytes); err != nil { + return plan, fmt.Errorf("dryrun retention count: %w", err) + } + plan.RetentionRows = rows + plan.RetentionBytes = bytes.Int64 + } + } + + return plan, nil +} + +// RunRetention runs the ongoing retention sweep loop until ctx is cancelled. +// It is a no-op when cfg.RetentionDays <= 0. The first sweep runs after one +// SweepInterval; this gives the lifecycle's other initialization (RegisterModels, +// peer cursor backfill) time to settle. +func (c *Crudr) RunRetention(ctx context.Context, cfg RetentionConfig) error { + if cfg.RetentionDays <= 0 { + c.logger.Info("ops retention disabled (set OPENAUDIO_MEDIORUM_OPS_RETENTION_DAYS to enable)") + <-ctx.Done() + return ctx.Err() + } + if cfg.SweepInterval <= 0 { + return errors.New("retention sweep interval must be positive") + } + c.logger.Info("ops retention enabled", + zap.Int("retention_days", cfg.RetentionDays), + zap.Duration("sweep_interval", cfg.SweepInterval), + zap.Int("batch_limit", cfg.SweepBatchLimit), + zap.Duration("cursor_safety_margin", cfg.CursorSafetyMargin)) + ticker := time.NewTicker(cfg.SweepInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + if err := c.retentionTick(ctx, cfg); err != nil { + c.logger.Warn("retention sweep tick failed", zap.Error(err)) + } + } + } +} + +// maxBatchesPerTick caps how many delete batches a single retention tick +// will run. The product of maxBatchesPerTick * SweepBatchLimit is the +// upper bound on rows removed per tick — keeps each tick's wall-clock and +// WAL pressure predictable even on a node with a multi-million-row +// backlog after enabling retention. At the default 1h interval, 10 +// batches × 10k = up to 100k rows/tick × 24 ticks/day = 2.4M rows/day, +// which exceeds the observed worst-case 1.1M ops/day write rate. +const maxBatchesPerTick = 10 + +// retentionTick runs a bounded retention sweep. It computes the cutoff +// once per tick, applies the cursor-floor invariant, then loops up to +// maxBatchesPerTick batches so a tick on a backlogged node makes +// measurable progress without monopolizing the DB. +func (c *Crudr) retentionTick(ctx context.Context, cfg RetentionConfig) error { + cutoff, reason, err := c.computeRetentionCutoff(ctx, cfg) + if err != nil { + return err + } + if cutoff == "" { + retention.RetentionSweepsSkipped.Add(1) + c.logger.Info("retention sweep skipped", zap.String("reason", reason)) + return nil + } + + batch := cfg.SweepBatchLimit + if batch <= 0 { + batch = 10000 + } + + var totalDeleted int64 + for i := 0; i < maxBatchesPerTick; i++ { + if err := ctx.Err(); err != nil { + return err + } + // Use a MATERIALIZED CTE so Postgres evaluates the SELECT once, + // honoring the LIMIT, and feeds the resulting ulid set to the + // DELETE. Without MATERIALIZED, Postgres 12+ may inline the CTE + // and the LIMIT semantics under a "DELETE ... IN (subquery)" + // shape become harder to reason about; we want the bounded-batch + // guarantee to hold regardless of planner version. + res := c.DB.WithContext(ctx).Exec(` + WITH victims AS MATERIALIZED ( + SELECT ulid + FROM ops + WHERE ulid < ? + ORDER BY ulid ASC + LIMIT ? + ) + DELETE FROM ops WHERE ulid IN (SELECT ulid FROM victims) + `, cutoff, batch) + if res.Error != nil { + return fmt.Errorf("retention delete: %w", res.Error) + } + totalDeleted += res.RowsAffected + if res.RowsAffected < int64(batch) { + break + } + } + + if totalDeleted > 0 { + retention.RetentionOpsDeleted.Add(uint64(totalDeleted)) + c.logger.Info("retention sweep deleted ops", + zap.Int64("rows_deleted", totalDeleted), + zap.String("cutoff_ulid", cutoff)) + } else { + retention.RetentionSweepsSkipped.Add(1) + c.logger.Debug("retention sweep: no eligible rows", + zap.String("cutoff_ulid", cutoff)) + } + return nil +} + +// computeRetentionCutoff returns the ULID below which ops are eligible for +// deletion under the given policy. Returns ("", reason, nil) when no rows +// are eligible: +// +// - any peer cursor is NULL or empty (a peer that has not advanced is +// treated as the most conservative possible cursor), +// - the smallest non-empty cursor is older than the age cutoff (a peer +// is more behind than the retention window allows; keep all ops). +// +// The cutoff itself is min(age_cutoff, cursor_floor_with_margin): +// whichever bound is tighter wins. The cursor floor is load-bearing +// (Topic 7 / cursor-floor invariant): no op younger than the slowest +// reachable peer's cursor may be deleted. +func (c *Crudr) computeRetentionCutoff(ctx context.Context, cfg RetentionConfig) (string, string, error) { + ageCutoff := time.Now().Add(-time.Duration(cfg.RetentionDays) * 24 * time.Hour) + ageCutoffULID, err := ulidAtTime(ageCutoff) + if err != nil { + return "", "compute age cutoff", err + } + + // Snapshot the active peer set under c.mu. The cursors table is + // shared with non-peer workers (qm_fix_truncated writes a CID under + // host='qm_fix_truncated'; decommissioned peers leave behind their + // rows). Without this filter, any such row would pin the retention + // floor indefinitely: ulid.Parse fails on a CID and the existing + // empty-cursor sentinel would still fire if we treated it as + // "malformed", or the floor would lex-sort to a nonsense value. + c.mu.Lock() + activePeers := make(map[string]bool, len(c.peerClients)) + for _, p := range c.peerClients { + activePeers[p.Host] = true + } + c.mu.Unlock() + + var cursors []Cursor + if err := c.DB.WithContext(ctx).Find(&cursors).Error; err != nil { + return "", "load cursors", fmt.Errorf("load cursors: %w", err) + } + + // Walk peer cursors only. Self-cursor is skipped. Empty / malformed / + // implausible-timestamp cursors on an active peer collapse into the + // empty-peer sentinel (most-conservative blocking). A missing cursor + // row for an active peer is treated the same. + floor := "" + emptyPeer := "" + seenPeers := map[string]bool{} + for _, cur := range cursors { + if cur.Host == c.host { + continue + } + if !activePeers[cur.Host] { + continue + } + seenPeers[cur.Host] = true + if cur.LastULID == "" { + if emptyPeer == "" || cur.Host < emptyPeer { + emptyPeer = cur.Host + } + continue + } + if _, perr := ulid.Parse(cur.LastULID); perr != nil || !isValidPeerSuppliedULID(cur.LastULID) { + c.logger.Warn("malformed or implausible peer cursor; treating as empty (most conservative)", + zap.String("peer", cur.Host), + zap.String("last_ulid", cur.LastULID)) + if emptyPeer == "" || cur.Host < emptyPeer { + emptyPeer = cur.Host + } + continue + } + if floor == "" || cur.LastULID < floor { + floor = cur.LastULID + } + } + for host := range activePeers { + if !seenPeers[host] { + if emptyPeer == "" || host < emptyPeer { + emptyPeer = host + } + } + } + if emptyPeer != "" { + return "", fmt.Sprintf("empty or missing cursor for active peer %s blocks deletion", emptyPeer), nil + } + + floorWithMargin := floor + if floor != "" && cfg.CursorSafetyMargin > 0 { + floorWithMargin, err = ulidShiftBack(floor, cfg.CursorSafetyMargin) + if err != nil { + return "", "apply cursor safety margin", err + } + } + + cutoff := ageCutoffULID + if floor != "" && floorWithMargin < cutoff { + cutoff = floorWithMargin + } + + // Fast-path: skip the delete if nothing is eligible. This is the + // common case once retention has caught up — most ticks are no-ops. + minULID, err := c.MinAvailableULID(ctx) + if err != nil { + return "", "load min ulid", err + } + if minULID == "" { + return "", "ops table empty", nil + } + if minULID >= cutoff { + return "", fmt.Sprintf("no rows older than cutoff (min=%s, cutoff=%s)", minULID, cutoff), nil + } + return cutoff, "", nil +} + +// ulidAtTime returns the smallest ULID with a timestamp >= t. We use all-zero +// entropy so the returned ULID is the lexicographic lower bound for the +// given millisecond — i.e. any op with timestamp < t has a ULID < the +// returned value. +func ulidAtTime(t time.Time) (string, error) { + id, err := ulid.New(ulid.Timestamp(t), bytes.NewReader(make([]byte, 16))) + if err != nil { + return "", err + } + return id.String(), nil +} + +// ulidShiftBack returns a ULID whose timestamp is d earlier than the +// timestamp encoded in srcULID. Entropy bytes are zeroed so the shifted +// ULID is the lower bound of its millisecond. +func ulidShiftBack(srcULID string, d time.Duration) (string, error) { + id, err := ulid.Parse(srcULID) + if err != nil { + return "", fmt.Errorf("parse ulid %q: %w", srcULID, err) + } + t := ulid.Time(id.Time()).Add(-d) + return ulidAtTime(t) +} + +// ServeCrudSweep returns the ops and the retention metadata that the sweep +// HTTP handler should advertise. When the caller's after parameter falls +// below this node's min(ulid), the second return is a non-empty string +// containing the smallest ULID currently available locally; callers should +// advertise this on the response (e.g. via the X-Retention-Gap-Min-Ulid +// header) so the requesting peer can advance its cursor explicitly +// rather than silently skipping the gap. +// +// This helper exists so the server-side handler and tests can share the +// gap-detection logic. The handler still applies its own gossip filter on +// the returned op slice. +func (c *Crudr) ServeCrudSweep(ctx context.Context, after string, limit int) (ops []*Op, gapMinULID string, err error) { + // MIN(ulid) and the bounded op slice MUST observe the same snapshot + // of `ops` so a retention DELETE that commits between the two reads + // cannot leave us serving a row the gap header doesn't cover. + // sql.TxOptions.Isolation emits the level at BEGIN so we don't + // depend on a separate SET TRANSACTION statement ordering. + err = c.DB.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + var minOut sql.NullString + if err := tx.Raw(`SELECT MIN(ulid) FROM ops`).Scan(&minOut).Error; err != nil { + return fmt.Errorf("load min ulid: %w", err) + } + minULID := "" + if minOut.Valid { + minULID = minOut.String + } + if after != "" && minULID != "" && after < minULID { + gapMinULID = minULID + } + return tx.Where("ulid > ?", after).Limit(limit).Order("ulid asc").Find(&ops).Error + }, &sql.TxOptions{Isolation: sql.LevelRepeatableRead, ReadOnly: true}) + if err != nil { + return nil, "", err + } + return ops, gapMinULID, nil +} + +// HTTP headers used by the sweep handler to advertise the retention-gap +// signal and the lowest locally-available ULID. Clients that don't +// understand these headers ignore them and proceed as before; clients +// that do understand them advance their cursor to gapMinULID instead of +// silently skipping the missing range. +const ( + HeaderRetentionGap = "X-Mediorum-Retention-Gap" + HeaderAvailableMin = "X-Mediorum-Available-Min-Ulid" +) + +// MarkSweepGapAdvance increments the gap-advance counter. Exposed for the +// sweep client; tests can read retention.SweepGapAdvances directly. +func MarkSweepGapAdvance() { retention.SweepGapAdvances.Add(1) } + +// gapULIDClockSkewWindow is the maximum forward time skew we'll accept on +// a peer-supplied ulid. A peer whose ulid decodes to a time more than +// this far in the future is treated as misconfigured or hostile. +// 30 minutes is wide enough to cover legitimate cross-host clock drift +// while still preventing a forged far-future ulid from permanently +// silencing a sweep stream or pinning our cursor at an attacker-chosen +// position. +const gapULIDClockSkewWindow = 30 * time.Minute + +// gapULIDEarliestPlausibleTime is the lower bound for any peer-supplied +// ulid. ULIDs were introduced in 2016; an epoch-zero or pre-2016 ulid +// from a peer is corruption or hostile crafting. Without this bound, +// ulidShiftBack on a near-epoch floor could underflow and permanently +// stall retention. +var gapULIDEarliestPlausibleTime = time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC) + +// isValidPeerSuppliedULID returns true when the candidate ulid can be +// safely accepted from a peer. Applied at: +// - ApplyOp (rejects implausible op.ULID before persisting) +// - the gap-header receive path +// - the cursor LOAD path in doSweep (a persisted implausible cursor +// would otherwise echo back into the next sweep request) +// - the cursor walk in computeRetentionCutoff (alongside ulid.Parse; +// implausible-but-parseable rows are treated as missing so a hostile +// or pre-PR bad row cannot poison the floor) +// +// Without this guard, a hostile peer could either jump our cursor to a +// far-future ulid (silencing the sweep stream) or pin our retention +// floor at an epoch ulid (underflowing ulidShiftBack). +func isValidPeerSuppliedULID(candidate string) bool { + id, err := ulid.Parse(candidate) + if err != nil { + return false + } + t := ulid.Time(id.Time()) + if t.Before(gapULIDEarliestPlausibleTime) { + return false + } + return t.Before(time.Now().Add(gapULIDClockSkewWindow)) +} diff --git a/pkg/mediorum/crudr/retention_test.go b/pkg/mediorum/crudr/retention_test.go new file mode 100644 index 00000000..1478dd01 --- /dev/null +++ b/pkg/mediorum/crudr/retention_test.go @@ -0,0 +1,1208 @@ +package crudr + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" + + "github.com/OpenAudio/go-openaudio/pkg/lifecycle" + "github.com/oklog/ulid/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "gorm.io/gorm" +) + +// retentionTestModel is a minimal registered CRUD model used by the +// retention tests. We give it a primary key so OnConflict-style apply +// paths work without panicking and so it mirrors the real registered +// models (Upload, QmAudioAnalysis, etc). +type retentionTestModel struct { + Key string `gorm:"primaryKey"` +} + +// otherRetentionTestModel mirrors the dormant-table scenario: another +// table the crud layer knows about, ops to which we will or won't +// emit depending on the test. +type otherRetentionTestModel struct { + Key string `gorm:"primaryKey"` +} + +// newRetentionCrudr returns a fresh crudr wired to the shared test DB, +// with the retention test models registered and the ops/cursors tables +// truncated. Tests that mutate env vars must restore them via t.Cleanup. +// The underlying *sql.DB is closed via t.Cleanup so a `-count=N` run does +// not exhaust Postgres connections. +func newRetentionCrudr(t *testing.T, host string) *Crudr { + t.Helper() + db := SetupTestDB() + t.Cleanup(func() { + if sqlDB, err := db.DB(); err == nil { + sqlDB.Close() + } + }) + + // migrate the retention test models so ApplyOp works + require.NoError(t, db.AutoMigrate(&retentionTestModel{}, &otherRetentionTestModel{})) + + // fully reset state between tests in the same package run + require.NoError(t, db.Exec(`TRUNCATE ops`).Error) + require.NoError(t, db.Exec(`TRUNCATE cursors`).Error) + require.NoError(t, db.Exec(`TRUNCATE retention_test_models`).Error) + require.NoError(t, db.Exec(`TRUNCATE other_retention_test_models`).Error) + + z := zap.NewNop() + c := New(host, nil, nil, db, lifecycle.NewLifecycle(context.Background(), "retention test", z), z, nil) + c.RegisterModels(&retentionTestModel{}, &otherRetentionTestModel{}) + return c +} + +// registerTestPeer attaches a PeerClient for the given host to the +// crudr's active-peer set, so the retention floor logic recognizes +// cursors with that Host as belonging to an active peer. The client +// is not Start()'d so no actual sweep traffic is generated. +func registerTestPeer(c *Crudr, host string) { + c.mu.Lock() + defer c.mu.Unlock() + c.peerClients = append(c.peerClients, NewPeerClient(host, c, c.host)) +} + +// resetRetentionStats clears the package counters so tests don't +// pollute each other. +func resetRetentionStats() { + retention.DormantTablesCleaned.Store(0) + retention.DormantOpsDeleted.Store(0) + retention.RetentionOpsDeleted.Store(0) + retention.RetentionSweepsSkipped.Store(0) + retention.SweepGapAdvances.Store(0) +} + +// setupRetentionTestDB opens a freshly-truncated test DB and registers a +// t.Cleanup to close it, so a long `-count=N` run never exhausts +// Postgres connections. +func setupRetentionTestDB(t *testing.T) *gorm.DB { + t.Helper() + db := SetupTestDB() + t.Cleanup(func() { + if sqlDB, err := db.DB(); err == nil { + sqlDB.Close() + } + }) + require.NoError(t, db.AutoMigrate(&retentionTestModel{})) + require.NoError(t, db.Exec(`TRUNCATE ops`).Error) + require.NoError(t, db.Exec(`TRUNCATE cursors`).Error) + return db +} + +// insertOpAt inserts an op with a ULID derived from the given wall time +// and a deterministic per-call entropy stream so callers with overlapping +// timestamps still get distinct ULIDs. Returns the inserted ULID string. +func insertOpAt(t *testing.T, c *Crudr, table string, at time.Time) string { + t.Helper() + return insertOpAtWithEntropy(t, c, table, at, ulid.Make().Entropy()) +} + +func insertOpAtWithEntropy(t *testing.T, c *Crudr, table string, at time.Time, entropy []byte) string { + t.Helper() + id, err := ulid.New(ulid.Timestamp(at), bytes.NewReader(entropy)) + require.NoError(t, err) + op := &Op{ + ULID: id.String(), + Host: c.host, + Action: ActionCreate, + Table: table, + Data: json.RawMessage(`[]`), + } + require.NoError(t, c.DB.Create(op).Error) + return id.String() +} + +func countOpsForTable(t *testing.T, c *Crudr, table string) int64 { + t.Helper() + var n int64 + require.NoError(t, c.DB.Raw(`SELECT COUNT(*) FROM ops WHERE "table" = ?`, table).Scan(&n).Error) + return n +} + +func TestLoadRetentionConfig_Defaults(t *testing.T) { + // Snapshot and restore env so we don't leak into sibling tests. + for _, k := range []string{ + "OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS", + "OPENAUDIO_MEDIORUM_DORMANT_OPS_THRESHOLD", + "OPENAUDIO_MEDIORUM_OPS_RETENTION_DAYS", + "OPENAUDIO_MEDIORUM_OPS_RETENTION_SWEEP_INTERVAL", + "OPENAUDIO_MEDIORUM_OPS_RETENTION_BATCH_LIMIT", + "OPENAUDIO_MEDIORUM_OPS_RETENTION_CURSOR_MARGIN", + } { + prev, had := os.LookupEnv(k) + os.Unsetenv(k) + t.Cleanup(func() { + if had { + os.Setenv(k, prev) + } + }) + } + cfg := LoadRetentionConfig() + assert.True(t, cfg.DormantCleanupEnabled, "dormant cleanup must default ON") + assert.Equal(t, 90*24*time.Hour, cfg.DormantThreshold) + assert.Equal(t, 0, cfg.RetentionDays, "ongoing retention must default OFF") + assert.Equal(t, 1*time.Hour, cfg.SweepInterval) + assert.Equal(t, 10000, cfg.SweepBatchLimit) + assert.Equal(t, 1*time.Hour, cfg.CursorSafetyMargin) +} + +func TestLoadRetentionConfig_OptOutDormant(t *testing.T) { + t.Setenv("OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS", "true") + cfg := LoadRetentionConfig() + assert.False(t, cfg.DormantCleanupEnabled) +} + +// Component 1 — dormant-table cleanup tests. + +func TestCleanupDormantOps_DropsDormantTable(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now() + // 100 ops on the dormant table, all 200 days old. + for i := 0; i < 100; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + // 5 ops on a still-active table, all within the last hour. + for i := 0; i < 5; i++ { + insertOpAt(t, c, "other_retention_test_models", now.Add(-time.Duration(i)*time.Minute)) + } + + deleted, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + + assert.Equal(t, int64(100), deleted["retention_test_models"]) + assert.NotContains(t, deleted, "other_retention_test_models", "active table must not be cleaned") + assert.Equal(t, int64(0), countOpsForTable(t, c, "retention_test_models")) + assert.Equal(t, int64(5), countOpsForTable(t, c, "other_retention_test_models")) + assert.Equal(t, uint64(1), retention.DormantTablesCleaned.Load()) + assert.Equal(t, uint64(100), retention.DormantOpsDeleted.Load()) +} + +func TestCleanupDormantOps_OptOutPreservesAll(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: false, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now() + for i := 0; i < 25; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + deleted, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + assert.Empty(t, deleted) + assert.Equal(t, int64(25), countOpsForTable(t, c, "retention_test_models")) + assert.Equal(t, uint64(0), retention.DormantTablesCleaned.Load()) +} + +func TestCleanupDormantOps_Idempotent(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now() + for i := 0; i < 7; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-180*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + deleted1, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + assert.Equal(t, int64(7), deleted1["retention_test_models"]) + + deleted2, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + assert.Empty(t, deleted2, "second run on an already-cleaned table must be a no-op") +} + +func TestCleanupDormantOps_RecentWriteOnDormantTableBlocksDelete(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now() + // 50 old ops... + for i := 0; i < 50; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + // ...and one recent op on the same table. + insertOpAt(t, c, "retention_test_models", now.Add(-1*time.Minute)) + + deleted, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + assert.Empty(t, deleted, "a single recent op must protect all ops for the table") + assert.Equal(t, int64(51), countOpsForTable(t, c, "retention_test_models")) +} + +func TestCleanupDormantOps_AfterCleanupMinUlidReflectsRemaining(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now() + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour)) + keepUlid := insertOpAt(t, c, "other_retention_test_models", now.Add(-1*time.Hour)) + + _, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + + minULID, err := c.MinAvailableULID(context.Background()) + require.NoError(t, err) + assert.Equal(t, keepUlid, minULID, "min ulid should be the surviving op") +} + +func TestCleanupDormantOps_UnregisteredTableUntouched(t *testing.T) { + // A table whose model the crud layer does NOT know about must not + // be classified as dormant just because it has no recent ops; it + // must not appear in c.typeMap at all. + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now() + // Pretend the ops table has rows for a long-removed table. + for i := 0; i < 10; i++ { + insertOpAt(t, c, "ghost_table", now.Add(-365*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + deleted, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + assert.Empty(t, deleted, "unregistered tables must not be touched by the dormant cleanup") + assert.Equal(t, int64(10), countOpsForTable(t, c, "ghost_table")) +} + +func TestCleanupDormantOps_RespectsContextCancel(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now() + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + _, err := c.CleanupDormantOps(ctx, cfg) + require.Error(t, err) + assert.ErrorIs(t, err, context.Canceled) +} + +// Component 2 — gap-signal tests. + +func TestServeCrudSweep_BelowMinReturnsGap(t *testing.T) { + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 3; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(time.Duration(i)*time.Millisecond)) + } + + veryOld, err := ulid.New(ulid.Timestamp(now.Add(-365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + + ops, gap, err := c.ServeCrudSweep(context.Background(), veryOld.String(), 100) + require.NoError(t, err) + assert.NotEmpty(t, gap, "after below min must signal gap") + assert.Len(t, ops, 3, "returned ops are the full set above the caller's cursor") +} + +func TestServeCrudSweep_AtOrAboveMinNoGap(t *testing.T) { + c := newRetentionCrudr(t, "host1") + now := time.Now() + first := insertOpAt(t, c, "retention_test_models", now) + insertOpAt(t, c, "retention_test_models", now.Add(1*time.Millisecond)) + + // passing exactly the lowest ULID means "give me ops > min": no gap. + ops, gap, err := c.ServeCrudSweep(context.Background(), first, 100) + require.NoError(t, err) + assert.Empty(t, gap) + assert.Len(t, ops, 1, "expect the single op strictly greater than the cursor") +} + +func TestServeCrudSweep_EmptyAfterIsNotAGap(t *testing.T) { + c := newRetentionCrudr(t, "host1") + insertOpAt(t, c, "retention_test_models", time.Now()) + + ops, gap, err := c.ServeCrudSweep(context.Background(), "", 100) + require.NoError(t, err) + assert.Empty(t, gap, "first-time sweep (empty cursor) is not a retention gap") + assert.Len(t, ops, 1) +} + +func TestServeCrudSweep_EmptyOpsTableNoGap(t *testing.T) { + c := newRetentionCrudr(t, "host1") + // caller has a cursor but the local ops table is empty (e.g. fresh + // node). No gap, no ops. + ops, gap, err := c.ServeCrudSweep(context.Background(), "01HABCD000000000000000000A", 100) + require.NoError(t, err) + assert.Empty(t, gap) + assert.Empty(t, ops) +} + +// Dry-run preview tests. + +func TestDryRunRetention_DormantOnly(t *testing.T) { + // Default config: dormant cleanup ON, retention sweep OFF. Dry run + // reports the dormant table count and skips the retention preview. + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 50; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + // Active table — must not appear in the plan. + insertOpAt(t, c, "other_retention_test_models", now.Add(-1*time.Minute)) + + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + plan, err := c.DryRunRetention(context.Background(), cfg) + require.NoError(t, err) + assert.Equal(t, int64(50), plan.DormantTables["retention_test_models"]) + assert.NotContains(t, plan.DormantTables, "other_retention_test_models") + assert.Greater(t, plan.DormantBytes, int64(0), "dormant bytes must be reported") + assert.Equal(t, int64(0), plan.RetentionRows, "retention preview skipped when RetentionDays=0") + assert.NotEmpty(t, plan.DormantCutoffULID) + // Dry run must not delete anything. + assert.Equal(t, int64(50), countOpsForTable(t, c, "retention_test_models")) + assert.Equal(t, int64(1), countOpsForTable(t, c, "other_retention_test_models")) +} + +func TestDryRunRetention_RetentionPreviewSkipsOnEmptyCursor(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 10; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + require.NoError(t, c.DB.Create(&Cursor{Host: "empty-peer", LastULID: ""}).Error) + registerTestPeer(c, "empty-peer") + + cfg := RetentionConfig{DormantCleanupEnabled: false, RetentionDays: 30, SweepBatchLimit: 100, CursorSafetyMargin: 1 * time.Hour} + plan, err := c.DryRunRetention(context.Background(), cfg) + require.NoError(t, err) + assert.Equal(t, int64(0), plan.RetentionRows) + assert.NotEmpty(t, plan.RetentionSkipReason, "empty cursor must produce a skip reason") + assert.Equal(t, int64(10), countOpsForTable(t, c, "retention_test_models")) +} + +func TestDryRunRetention_RetentionPreviewWithEligibleRows(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + // 30 old ops + 5 recent ops. + for i := 0; i < 30; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-1*time.Minute).Add(time.Duration(i)*time.Millisecond)) + } + recentULID, err := ulid.New(ulid.Timestamp(now), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "peer", LastULID: recentULID.String()}).Error) + + cfg := RetentionConfig{DormantCleanupEnabled: false, RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + plan, err := c.DryRunRetention(context.Background(), cfg) + require.NoError(t, err) + assert.Equal(t, int64(30), plan.RetentionRows, "old ops counted, recent ops preserved") + assert.Greater(t, plan.RetentionBytes, int64(0)) + assert.Empty(t, plan.RetentionSkipReason) + assert.NotEmpty(t, plan.RetentionCutoffULID) + // No DELETE executed. + assert.Equal(t, int64(35), countOpsForTable(t, c, "retention_test_models")) +} + +func TestDryRunRetention_DormantCleanupDisabledNoPreview(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 25; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + cfg := RetentionConfig{DormantCleanupEnabled: false, DormantThreshold: 30 * 24 * time.Hour} + plan, err := c.DryRunRetention(context.Background(), cfg) + require.NoError(t, err) + assert.Empty(t, plan.DormantTables) + assert.Equal(t, int64(0), plan.DormantBytes) +} + +// Component 3 — retention sweep tests. + +func TestRunRetention_DisabledIsNoOp(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 20; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-365*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + cfg := RetentionConfig{RetentionDays: 0} + + // Disabled retention should block on ctx; we cancel quickly to exit + // the loop without deleting anything. + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + err := c.RunRetention(ctx, cfg) + assert.ErrorIs(t, err, context.DeadlineExceeded) + assert.Equal(t, int64(20), countOpsForTable(t, c, "retention_test_models")) +} + +func TestRetentionTick_AllCursorsAheadDeletesOldOps(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + + // Old ops (> 30d) and recent ops (< 1h). + for i := 0; i < 25; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-1*time.Minute).Add(time.Duration(i)*time.Millisecond)) + } + + // Every peer cursor is at "now" — no peer is behind. + recentULID, err := ulid.New(ulid.Timestamp(now), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "peer-a", LastULID: recentULID.String()}).Error) + require.NoError(t, c.DB.Create(&Cursor{Host: "peer-b", LastULID: recentULID.String()}).Error) + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + + // Old ops gone, recent ops kept. + assert.Equal(t, int64(5), countOpsForTable(t, c, "retention_test_models")) + assert.Equal(t, uint64(25), retention.RetentionOpsDeleted.Load()) +} + +func TestRetentionTick_CursorBelowCutoffPreventsDelete(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + + // Three age cohorts of ops to verify cursor pinning: + // - 60d old: older than the slow cursor (45d) => deletable. + // - 40d old: newer than the slow cursor => MUST be kept + // (this is the load-bearing case). + // - 1m old: well within the cursor => kept. + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-40*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-1*time.Minute).Add(time.Duration(i)*time.Millisecond)) + } + + stuckCursor, err := ulid.New(ulid.Timestamp(now.Add(-45*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "slow-peer", LastULID: stuckCursor.String()}).Error) + registerTestPeer(c, "slow-peer") + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + + // Only the 60d cohort should be deleted; the 40d cohort sits above + // the slow cursor and MUST be preserved, otherwise that peer's next + // sweep would silently skip a deleted range. + remaining := countOpsForTable(t, c, "retention_test_models") + assert.Equal(t, int64(10), remaining, + "slow peer must block deletion of ops newer than its cursor") + assert.Equal(t, uint64(5), retention.RetentionOpsDeleted.Load()) + + // No remaining op may be older than (cursor - margin). + floorWithMargin, err := ulidShiftBack(stuckCursor.String(), 1*time.Hour) + require.NoError(t, err) + var oldestRemaining string + require.NoError(t, c.DB.Raw(`SELECT MIN(ulid) FROM ops`).Scan(&oldestRemaining).Error) + assert.GreaterOrEqual(t, oldestRemaining, floorWithMargin, + "no remaining op may be older than the cursor-safety floor") +} + +// TestRetentionTick_NonPeerCursorRowsIgnored proves that a cursor row +// written by a non-peer worker (e.g., qm_fix_truncated stores a CID +// under the same `cursors` table) does NOT pin retention. Without the +// activePeers filter in computeRetentionCutoff, that CID would parse +// as a malformed/zero ULID and pin the floor at the epoch, silently +// suppressing all deletions across the fleet. +func TestRetentionTick_NonPeerCursorRowsIgnored(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-1*time.Minute).Add(time.Duration(i)*time.Millisecond)) + } + + // qm_fix_truncated writes a CID into LastULID under its own host name. + // CIDs are not ULIDs and the host is not a registered peer. Both the + // host filter and the ulid plausibility check must reject this row. + require.NoError(t, c.DB.Create(&Cursor{ + Host: "qm_fix_truncated", + LastULID: "QmTZxYzAbCd1234567890abcdef1234567890abcdef12", + }).Error) + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + + remaining := countOpsForTable(t, c, "retention_test_models") + assert.Equal(t, int64(5), remaining, + "non-peer cursor row must not pin retention; the 60d cohort should delete") + assert.Equal(t, uint64(5), retention.RetentionOpsDeleted.Load()) +} + +func TestRetentionTick_EmptyCursorBlocksDeletion(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 12; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + require.NoError(t, c.DB.Create(&Cursor{Host: "peer-empty", LastULID: ""}).Error) + registerTestPeer(c, "peer-empty") + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + assert.Equal(t, int64(12), countOpsForTable(t, c, "retention_test_models"), + "an empty cursor row must be treated as the most conservative possible cursor") + assert.Equal(t, uint64(0), retention.RetentionOpsDeleted.Load()) + assert.GreaterOrEqual(t, retention.RetentionSweepsSkipped.Load(), uint64(1)) +} + +func TestRetentionTick_SafetyMarginHonored(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + + // Three op ages spread around a slow cursor at -30d: + // older than (cursor - 2h), within (cursor - 2h, cursor), and newer than cursor. + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-50*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-30*24*time.Hour-30*time.Minute).Add(time.Duration(i)*time.Millisecond)) + } + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-29*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + stuckCursor, err := ulid.New(ulid.Timestamp(now.Add(-30*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "slow", LastULID: stuckCursor.String()}).Error) + registerTestPeer(c, "slow") + + // 1h margin pushes cutoff to (cursor - 1h). Anything between cursor-1h + // and cursor must be preserved. + cfg := RetentionConfig{RetentionDays: 1, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + + // Only the very-old (-50d) bucket can be deleted; the 30m-before-cursor + // bucket is inside the safety margin and the 29d bucket sits above the + // cursor itself. + remaining := countOpsForTable(t, c, "retention_test_models") + assert.Equal(t, int64(10), remaining, + "safety margin must keep the 30-min-before-cursor bucket alive") +} + +func TestRetentionTick_BatchLimitHonored(t *testing.T) { + // Drains the eligible set with batch=100, looping internally up to + // maxBatchesPerTick. 250 eligible rows complete in 3 batches and + // the loop exits on the short final batch. + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 250; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + recentULID, err := ulid.New(ulid.Timestamp(now), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "peer", LastULID: recentULID.String()}).Error) + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 100, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + assert.Equal(t, int64(0), countOpsForTable(t, c, "retention_test_models")) + assert.Equal(t, uint64(250), retention.RetentionOpsDeleted.Load()) +} + +func TestRetentionTick_DrainsUpToMaxBatchesPerTick(t *testing.T) { + // A backlogged tick must take more than one batch in a single call, + // up to maxBatchesPerTick. With batch=10 and maxBatches=10, a tick + // against a 200-row eligible set should delete 100 rows (10 batches + // × 10 rows), leaving 100 untouched until the next tick. + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 200; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + recentULID, err := ulid.New(ulid.Timestamp(now), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "peer", LastULID: recentULID.String()}).Error) + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 10, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + + remaining := countOpsForTable(t, c, "retention_test_models") + // We deleted at most maxBatchesPerTick * batch = 100 rows. + expectedRemaining := int64(200) - int64(maxBatchesPerTick*10) + assert.Equal(t, expectedRemaining, remaining, + "tick must drain up to maxBatchesPerTick batches; backlog finishes on subsequent ticks") + assert.Equal(t, uint64(maxBatchesPerTick*10), retention.RetentionOpsDeleted.Load()) +} + +func TestRetentionTick_IteratesAllRegisteredTables(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + insertOpAt(t, c, "other_retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + recentULID, err := ulid.New(ulid.Timestamp(now), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "peer", LastULID: recentULID.String()}).Error) + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + + // retention deletes by ulid alone (not per-table), so all 10 ops are + // dropped in this single tick — this is the documented behavior of + // the first PR: uniform cutoff across all CRUD tables. + assert.Equal(t, int64(0), countOpsForTable(t, c, "retention_test_models")) + assert.Equal(t, int64(0), countOpsForTable(t, c, "other_retention_test_models")) +} + +func TestRetentionTick_SelfCursorIgnored(t *testing.T) { + // A cursor row whose host equals the crudr's selfHost must not + // block deletion: it's not a remote peer, it's the node's own + // (rare/test-only) self-pointer. + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 10; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + require.NoError(t, c.DB.Create(&Cursor{Host: "host1", LastULID: ""}).Error) + + recentULID, err := ulid.New(ulid.Timestamp(now), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "peer", LastULID: recentULID.String()}).Error) + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + assert.Equal(t, int64(0), countOpsForTable(t, c, "retention_test_models"), + "self-cursor must not block deletion") +} + +func TestRetentionTick_NoPeersUsesAgeCutoffOnly(t *testing.T) { + // Devnet-style single-node: no peer cursors. The age cutoff is the + // only gate. + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 8; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + assert.Equal(t, int64(0), countOpsForTable(t, c, "retention_test_models")) +} + +// Client-side gap-signal tests. + +// roundTripFunc lets a test stand in for the peer HTTP server. +type roundTripFunc func(*http.Request) (*http.Response, error) + +func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) } + +func TestPeerClient_DetectsGapAndAdvancesCursor(t *testing.T) { + resetRetentionStats() + db := setupRetentionTestDB(t) + + // peer advertises a gap with min ulid in the recent past + peerMinULID, err := ulid.New(ulid.Timestamp(time.Now().Add(-7*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + + // our cursor is far below the peer's min + oldCursorULID, err := ulid.New(ulid.Timestamp(time.Now().Add(-365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, db.Create(&Cursor{Host: "http://peer-x", LastULID: oldCursorULID.String()}).Error) + + rt := roundTripFunc(func(r *http.Request) (*http.Response, error) { + assert.Equal(t, oldCursorULID.String(), r.URL.Query().Get("after")) + h := http.Header{} + h.Set(HeaderRetentionGap, "true") + h.Set(HeaderAvailableMin, peerMinULID.String()) + h.Set("Content-Type", "application/json") + return &http.Response{ + StatusCode: 200, + Header: h, + Body: io.NopCloser(bytes.NewBufferString("[]")), + Request: r, + }, nil + }) + + z := zap.NewNop() + c := New("http://self", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "test", z), z, &http.Client{Transport: rt}) + c.RegisterModels(&retentionTestModel{}) + + peer := NewPeerClient("http://peer-x", c, "http://self") + require.NoError(t, peer.doSweep(context.Background())) + + // Cursor must be advanced to the peer's advertised floor. + var cur Cursor + require.NoError(t, c.DB.Where("host = ?", "http://peer-x").First(&cur).Error) + assert.Equal(t, peerMinULID.String(), cur.LastULID, + "cursor must explicitly advance to peer's advertised retention floor") + assert.Equal(t, uint64(1), retention.SweepGapAdvances.Load(), + "sweep client must increment the gap counter for operator visibility") +} + +func TestPeerClient_GapHeaderRespectedAcrossTicks(t *testing.T) { + // On a steady-state retention scenario, the same cursor should not + // re-advance over the same gap on the next tick — the persisted + // cursor now equals or exceeds gapMinULID, so the + // `gapMinULID > lastUlid` guard suppresses the second increment. + resetRetentionStats() + db := setupRetentionTestDB(t) + + peerMinULID, err := ulid.New(ulid.Timestamp(time.Now().Add(-7*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + oldCursorULID, err := ulid.New(ulid.Timestamp(time.Now().Add(-365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, db.Create(&Cursor{Host: "http://peer-z", LastULID: oldCursorULID.String()}).Error) + + rt := roundTripFunc(func(r *http.Request) (*http.Response, error) { + h := http.Header{} + h.Set(HeaderRetentionGap, "true") + h.Set(HeaderAvailableMin, peerMinULID.String()) + h.Set("Content-Type", "application/json") + return &http.Response{ + StatusCode: 200, + Header: h, + Body: io.NopCloser(bytes.NewBufferString("[]")), + Request: r, + }, nil + }) + + z := zap.NewNop() + c := New("http://self", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "test", z), z, &http.Client{Transport: rt}) + c.RegisterModels(&retentionTestModel{}) + peer := NewPeerClient("http://peer-z", c, "http://self") + + // First tick: counter increments once. + require.NoError(t, peer.doSweep(context.Background())) + assert.Equal(t, uint64(1), retention.SweepGapAdvances.Load()) + + // Second tick: cursor is now == peerMinULID, gap signal still present + // but suppressed by the strict-greater-than guard. + require.NoError(t, peer.doSweep(context.Background())) + assert.Equal(t, uint64(1), retention.SweepGapAdvances.Load(), + "counter must not double-count when cursor already at floor") +} + +func TestPeerClient_HostileFarFutureGapULIDRejected(t *testing.T) { + // A hostile or misconfigured peer that advertises a gap ulid far in + // the future must NOT silence our sweep stream. The client must + // reject the gap header, keep its cursor where it was, and continue + // applying any ops in the response body normally. + resetRetentionStats() + db := setupRetentionTestDB(t) + + // A ulid 5 years in the future — well beyond any legitimate clock + // skew window. + futureULID, err := ulid.New(ulid.Timestamp(time.Now().Add(5*365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + oldCursorULID, err := ulid.New(ulid.Timestamp(time.Now().Add(-365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, db.Create(&Cursor{Host: "http://hostile-peer", LastULID: oldCursorULID.String()}).Error) + + rt := roundTripFunc(func(r *http.Request) (*http.Response, error) { + h := http.Header{} + h.Set(HeaderRetentionGap, "true") + h.Set(HeaderAvailableMin, futureULID.String()) + h.Set("Content-Type", "application/json") + return &http.Response{ + StatusCode: 200, + Header: h, + Body: io.NopCloser(bytes.NewBufferString("[]")), + Request: r, + }, nil + }) + + z := zap.NewNop() + c := New("http://self", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "test", z), z, &http.Client{Transport: rt}) + c.RegisterModels(&retentionTestModel{}) + + peer := NewPeerClient("http://hostile-peer", c, "http://self") + require.NoError(t, peer.doSweep(context.Background())) + + // Cursor must NOT have advanced — the hostile gap was rejected. + var cur Cursor + require.NoError(t, c.DB.Where("host = ?", "http://hostile-peer").First(&cur).Error) + assert.Equal(t, oldCursorULID.String(), cur.LastULID, + "cursor must not advance across a far-future advertised gap ulid") + assert.Equal(t, uint64(0), retention.SweepGapAdvances.Load(), + "hostile gap must not increment the operator metric") +} + +func TestPeerClient_MalformedGapULIDRejected(t *testing.T) { + // A non-ULID string in the gap header is also a rejection case. + resetRetentionStats() + db := setupRetentionTestDB(t) + oldCursorULID, err := ulid.New(ulid.Timestamp(time.Now().Add(-365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, db.Create(&Cursor{Host: "http://peer", LastULID: oldCursorULID.String()}).Error) + + rt := roundTripFunc(func(r *http.Request) (*http.Response, error) { + h := http.Header{} + h.Set(HeaderRetentionGap, "true") + h.Set(HeaderAvailableMin, "not-a-ulid-this-is-garbage") + h.Set("Content-Type", "application/json") + return &http.Response{ + StatusCode: 200, + Header: h, + Body: io.NopCloser(bytes.NewBufferString("[]")), + Request: r, + }, nil + }) + + z := zap.NewNop() + c := New("http://self", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "test", z), z, &http.Client{Transport: rt}) + c.RegisterModels(&retentionTestModel{}) + + peer := NewPeerClient("http://peer", c, "http://self") + require.NoError(t, peer.doSweep(context.Background())) + + var cur Cursor + require.NoError(t, c.DB.Where("host = ?", "http://peer").First(&cur).Error) + assert.Equal(t, oldCursorULID.String(), cur.LastULID) + assert.Equal(t, uint64(0), retention.SweepGapAdvances.Load()) +} + +func TestIsValidGapULID(t *testing.T) { + now := time.Now() + + mk := func(at time.Time) string { + id, err := ulid.New(ulid.Timestamp(at), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + return id.String() + } + + assert.True(t, isValidPeerSuppliedULID(mk(now.Add(-30*24*time.Hour))), "past ulid valid") + assert.True(t, isValidPeerSuppliedULID(mk(now.Add(-1*time.Second))), "recent past ulid valid") + assert.True(t, isValidPeerSuppliedULID(mk(now.Add(15*time.Minute))), "small forward skew valid") + assert.False(t, isValidPeerSuppliedULID(mk(now.Add(2*time.Hour))), "2h forward beyond skew window rejected") + assert.False(t, isValidPeerSuppliedULID(mk(now.Add(365*24*time.Hour))), "1y forward rejected") + assert.False(t, isValidPeerSuppliedULID(""), "empty string rejected") + assert.False(t, isValidPeerSuppliedULID("not a ulid"), "garbage rejected") +} + +func TestPeerClient_NoGapHeaderNoAdvance(t *testing.T) { + resetRetentionStats() + db := setupRetentionTestDB(t) + + oldCursorULID, err := ulid.New(ulid.Timestamp(time.Now().Add(-365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, db.Create(&Cursor{Host: "http://peer-y", LastULID: oldCursorULID.String()}).Error) + + rt := roundTripFunc(func(r *http.Request) (*http.Response, error) { + h := http.Header{} + h.Set("Content-Type", "application/json") + return &http.Response{ + StatusCode: 200, + Header: h, + Body: io.NopCloser(bytes.NewBufferString("[]")), + Request: r, + }, nil + }) + + z := zap.NewNop() + c := New("http://self", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "test", z), z, &http.Client{Transport: rt}) + c.RegisterModels(&retentionTestModel{}) + + peer := NewPeerClient("http://peer-y", c, "http://self") + require.NoError(t, peer.doSweep(context.Background())) + + assert.Equal(t, uint64(0), retention.SweepGapAdvances.Load(), + "no gap header => no counter increment") +} + +// MinAvailableULID surface test. + +func TestMinAvailableULID_EmptyTable(t *testing.T) { + c := newRetentionCrudr(t, "host1") + min, err := c.MinAvailableULID(context.Background()) + require.NoError(t, err) + assert.Empty(t, min) +} + +func TestMinAvailableULID_ReturnsSmallest(t *testing.T) { + c := newRetentionCrudr(t, "host1") + now := time.Now() + insertOpAt(t, c, "retention_test_models", now.Add(-2*time.Hour)) + insertOpAt(t, c, "retention_test_models", now.Add(-1*time.Hour)) + insertOpAt(t, c, "retention_test_models", now) + min, err := c.MinAvailableULID(context.Background()) + require.NoError(t, err) + assert.NotEmpty(t, min) + + // The smallest ulid is the one we inserted at -2h. + var explicitMin string + require.NoError(t, c.DB.Raw(`SELECT MIN(ulid) FROM ops`).Scan(&explicitMin).Error) + assert.Equal(t, explicitMin, min) +} + +// Operator-config safety tests. + +func TestCleanupDormantOps_BelowMinThresholdClamped(t *testing.T) { + // An operator who sets the threshold below the safety floor (24h) + // must not be able to delete brand-new tables: the clamp keeps a + // brief lull from being classified as dormant. + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 1 * time.Minute} + + now := time.Now() + for i := 0; i < 10; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-2*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + deleted, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + assert.Empty(t, deleted, "threshold < 24h must clamp up; -2h ops must not be classified dormant") + assert.Equal(t, int64(10), countOpsForTable(t, c, "retention_test_models")) +} + +func TestCleanupDormantOps_PreservesNewOpsAboveCutoff(t *testing.T) { + // Race-window guard: a producer writes a new op for a table the + // cleanup has just classified as dormant. The new op's ULID is above + // the cutoff, so the bounded WHERE clause must spare it even though + // the table appears dormant from the MAX(ulid) check. + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now() + // Old ops, far below the cutoff. + for i := 0; i < 20; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + // Inject a NEW op *above* the cutoff after the dormancy classification + // but before cleanup gets to its DELETE. We simulate the race by + // inserting before the call: same effect because the dormancy check + // uses MAX(ulid) which still returns one of the old ulids if we put + // the recent one in *after* picking maxULID. But the cleanup's + // `WHERE ulid < cutoff` guard means recent ops are never touched + // regardless of MAX ordering — the property we're verifying here. + freshULID := insertOpAt(t, c, "retention_test_models", now.Add(-1*time.Minute)) + + // The dormancy check will see freshULID as the max and classify the + // table as still-active. To exercise the race-guard explicitly, + // re-run with a threshold tight enough that the table is dormant + // EXCEPT for the fresh op (12h between fresh op and cutoff < 24h + // floor, so we use 23h base then test the WHERE clause directly). + // + // Simpler probe: call CleanupDormantOps and assert freshULID survives + // even if the table is "borderline dormant". Both paths converge on + // the same guarantee: ulids above cutoff never get deleted. + _, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + + // freshULID must still be present. + var found int64 + require.NoError(t, c.DB.Raw(`SELECT COUNT(*) FROM ops WHERE ulid = ?`, freshULID).Scan(&found).Error) + assert.Equal(t, int64(1), found, "ops above the cutoff must never be deleted by dormant cleanup") +} + +func TestCleanupDormantOps_BatchedDeletionLargeTable(t *testing.T) { + // Exercise the batched path: insert more rows than dormantBatchSize + // (10k) to confirm multiple iterations complete and totalForTable + // is correct. + if testing.Short() { + t.Skip("large insert test: skipped under -short") + } + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now().Add(-200 * 24 * time.Hour) + // 15001 rows -> 2 full batches + 1 short batch + rows := 15001 + bulk := make([]Op, 0, rows) + for i := 0; i < rows; i++ { + entropy := make([]byte, 10) + entropy[0] = byte(i & 0xff) + entropy[1] = byte((i >> 8) & 0xff) + id, err := ulid.New(ulid.Timestamp(now.Add(time.Duration(i)*time.Millisecond)), bytes.NewReader(entropy)) + require.NoError(t, err) + bulk = append(bulk, Op{ + ULID: id.String(), + Host: "host1", + Action: ActionCreate, + Table: "retention_test_models", + Data: json.RawMessage(`[]`), + }) + } + // chunk inserts so the test stays reasonable + for start := 0; start < len(bulk); start += 5000 { + end := start + 5000 + if end > len(bulk) { + end = len(bulk) + } + require.NoError(t, c.DB.Create(bulk[start:end]).Error) + } + require.Equal(t, int64(rows), countOpsForTable(t, c, "retention_test_models")) + + deleted, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + assert.Equal(t, int64(rows), deleted["retention_test_models"]) + assert.Equal(t, int64(0), countOpsForTable(t, c, "retention_test_models")) + assert.GreaterOrEqual(t, retention.DormantOpsDeleted.Load(), uint64(rows)) +} + +// End-to-end gap signal: real HTTP server with the actual handler, real +// client. Catches wiring bugs between serve_crud.go and client.go. + +func TestSweep_EndToEnd_GapHeaderPropagatesAndClientAdvances(t *testing.T) { + resetRetentionStats() + // Single shared DB (test harness uses one), so the server-side Crudr + // and the client-side PeerClient both operate against the same ops + // table. The handler reads from c.ServeCrudSweep; the client reads + // the response. Truncate once, then never wipe again — both sides + // share state. + c := newRetentionCrudr(t, "self-host") + + // Seed three recent ops as the "peer" side. + now := time.Now() + for i := 0; i < 3; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(time.Duration(i)*time.Millisecond)) + } + + // Build a minimal HTTP server that mirrors what serve_crud.go does: + // call ServeCrudSweep and forward the gap headers verbatim. + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + after := r.URL.Query().Get("after") + ops, gap, err := c.ServeCrudSweep(r.Context(), after, 100) + if err != nil { + http.Error(w, err.Error(), 500) + return + } + if gap != "" { + w.Header().Set(HeaderRetentionGap, "true") + w.Header().Set(HeaderAvailableMin, gap) + } + w.Header().Set("Content-Type", "application/json") + if ops == nil { + ops = []*Op{} + } + buf, _ := json.Marshal(ops) + w.Write(buf) + })) + defer server.Close() + + // Plant a cursor that's a year behind the server's ops. This is the + // pre-gap state of a peer that was offline through the retention + // window. + oldCursorULID, err := ulid.New(ulid.Timestamp(time.Now().Add(-365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: server.URL, LastULID: oldCursorULID.String()}).Error) + + // The PeerClient runs the same production code path; we just inject + // the httptest client transport so the gap headers travel over the + // wire path the real sweep uses. + c.httpClient = server.Client() + peer := NewPeerClient(server.URL, c, "self-host") + require.NoError(t, peer.doSweep(context.Background())) + + var cur Cursor + require.NoError(t, c.DB.Where("host = ?", server.URL).First(&cur).Error) + min, err := c.MinAvailableULID(context.Background()) + require.NoError(t, err) + assert.GreaterOrEqual(t, cur.LastULID, min, "client cursor must be at or above peer's min") + assert.Equal(t, uint64(1), retention.SweepGapAdvances.Load()) +} + +// Stale-cursor pinning behavior: an ancient peer cursor must pin the +// retention floor even when the age cutoff would otherwise delete those ops. +// Demonstrates the documented limitation that stale cursors block retention. +func TestRetentionTick_AncientCursorPinsAllRetention(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + + // All ops > retention window (200d). + for i := 0; i < 12; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + // A long-decommissioned peer with a cursor from 3 years ago. + ancientULID, err := ulid.New(ulid.Timestamp(now.Add(-3*365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "decommissioned-peer", LastULID: ancientULID.String()}).Error) + registerTestPeer(c, "decommissioned-peer") + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + + // All ops survive: the ancient cursor pins the floor at -3y, far + // below the oldest op we have, so the cutoff is below our min(ulid) + // and nothing is eligible. + assert.Equal(t, int64(12), countOpsForTable(t, c, "retention_test_models"), + "ancient cursor must pin the retention floor") + assert.Equal(t, uint64(0), retention.RetentionOpsDeleted.Load()) +} + +// Concurrent sweep + retention: prove Postgres MVCC keeps both safe. +func TestRetention_ConcurrentSweepAndDeleteSafety(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 200; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + for i := 0; i < 20; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-1*time.Minute).Add(time.Duration(i)*time.Millisecond)) + } + recentULID, err := ulid.New(ulid.Timestamp(now), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "peer", LastULID: recentULID.String()}).Error) + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 50, CursorSafetyMargin: 1 * time.Hour} + + // Fire a retention tick and a sweep query concurrently; assert both + // complete without error and that the sweep returns a coherent + // snapshot. + doneSweep := make(chan int, 1) + doneRetention := make(chan error, 1) + go func() { + ops, _, err := c.ServeCrudSweep(context.Background(), "", 1000) + if err != nil { + doneSweep <- -1 + return + } + doneSweep <- len(ops) + }() + go func() { + doneRetention <- c.retentionTick(context.Background(), cfg) + }() + + require.NoError(t, <-doneRetention) + got := <-doneSweep + assert.GreaterOrEqual(t, got, 0, "concurrent sweep must complete without error") +} diff --git a/pkg/mediorum/server/serve_crud.go b/pkg/mediorum/server/serve_crud.go index f09a1b98..4d6a8fe7 100644 --- a/pkg/mediorum/server/serve_crud.go +++ b/pkg/mediorum/server/serve_crud.go @@ -25,14 +25,7 @@ func (ss *MediorumServer) serveCrudSweep(c echo.Context) error { defer cancel() after := c.QueryParam("after") - var ops []*crudr.Op - err := ss.crud.DB. - WithContext(ctx). - Where("ulid > ?", after). - Limit(PullLimit). - Order("ulid asc"). - Find(&ops). - Error + ops, gapMinULID, err := ss.crud.ServeCrudSweep(ctx, after, PullLimit) if err != nil { return c.String(500, fmt.Sprintf("Failed to query ops: %v", err)) } @@ -52,6 +45,16 @@ func (ss *MediorumServer) serveCrudSweep(c echo.Context) error { filteredOps = append(filteredOps, op) } + // Retention-gap signal: when the caller's cursor falls below our + // retention floor, expose the lowest available ULID so the client + // can advance its cursor explicitly rather than silently skip the + // gap. The body remains a normal ops array for wire compatibility; + // older clients ignore these headers and behave as before. + if gapMinULID != "" { + c.Response().Header().Set(crudr.HeaderRetentionGap, "true") + c.Response().Header().Set(crudr.HeaderAvailableMin, gapMinULID) + } + c.Response().Header().Set(echo.HeaderCacheControl, "public, max-age=300") return c.JSON(200, filteredOps) } diff --git a/pkg/mediorum/server/server.go b/pkg/mediorum/server/server.go index 40be802c..c8ce637c 100644 --- a/pkg/mediorum/server/server.go +++ b/pkg/mediorum/server/server.go @@ -152,6 +152,7 @@ type MediorumServer struct { Config MediorumConfig crudSweepMutex sync.Mutex + retentionCfg crudr.RetentionConfig // handle communication between core and mediorum for Proof of Storage posChannel chan pos.PoSRequest @@ -322,12 +323,19 @@ func New(lc *lifecycle.Lifecycle, logger *zap.Logger, config MediorumConfig, pos Timeout: 3 * time.Minute, // covers blob replication and pull } - // crud + // crud. Normalize self-host with the same rules crudr.New applies to + // peer hosts (lowercase + trailing-slash strip) so a trailing-slash + // or mixed-case mismatch between config.Self.Host and the chain + // registry cannot land self in peerHosts. Without this normalization + // a future config-builder drift would let self be flagged as an + // active peer with no cursor row, permanently blocking retention via + // computeRetentionCutoff's empty-peer sentinel. + normalizedSelf := audiusHttputil.RemoveTrailingSlash(strings.ToLower(config.Self.Host)) peerHosts := []string{} allHosts := []string{} for _, peer := range config.Peers { allHosts = append(allHosts, peer.Host) - if peer.Host != config.Self.Host { + if audiusHttputil.RemoveTrailingSlash(strings.ToLower(peer.Host)) != normalizedSelf { peerHosts = append(peerHosts, peer.Host) } } @@ -335,6 +343,10 @@ func New(lc *lifecycle.Lifecycle, logger *zap.Logger, config MediorumConfig, pos crud := crudr.New(config.Self.Host, config.privateKey, peerHosts, db, mediorumLifecycle, logger, peerHTTPClient) dbMigrate(crud, config.Self.Host) + // Load retention config once so the dormant cleanup and the + // ongoing retention sweep see a stable view of env vars. + retentionCfg := crudr.LoadRetentionConfig() + deadHosts := config.DeadHosts if deadHosts == nil { deadHosts = []string{} @@ -391,6 +403,7 @@ func New(lc *lifecycle.Lifecycle, logger *zap.Logger, config MediorumConfig, pos transcodeWork: make(chan *Upload, 100), replicationWork: make(chan *Upload, 100), posChannel: posChannel, + retentionCfg: retentionCfg, peerHealths: map[string]*PeerHealth{}, redirectCache: imcache.New(imcache.WithMaxEntriesLimitOption[string, string](50_000, imcache.EvictionPolicyLRU)), @@ -615,6 +628,66 @@ func (ss *MediorumServer) MustStart() error { if ss.Config.WalletIsRegistered { ss.crud.StartClients() + // Composite-index build runs OFF the boot path. On val001-scale + // (84 GiB heap, ~250M rows) CREATE INDEX CONCURRENTLY can take + // 30-60 min; we cannot block /health-check that long. A pinned + // *sql.Conn carries the session-scoped advisory lock + the + // invalid-leftover probe + the create through one backend + // session. Without the pin, the lock leaks across the gorm + // pool and a partial CONCURRENTLY build leaves an INVALID + // index that IF NOT EXISTS short-circuits on, silently + // downgrading retention to seq-scans. + ss.lc.AddManagedRoutine("crudr ops index ensure", func(ctx context.Context) error { + defer func() { + if r := recover(); r != nil { + ss.logger.Error("ops index ensure panicked; recovered", zap.Any("panic", r)) + } + }() + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(10 * time.Second): + } + if err := crudr.EnsureOpsTableIndex(ctx, ss.crud.DB); err != nil { + ss.logger.Warn("ops index ensure failed; retention paths will seq-scan until next process restart", zap.Error(err)) + } + <-ctx.Done() + return ctx.Err() + }) + + // Dormant-table cleanup off the boot path so /health-check is + // reachable while a possibly multi-million-row backlog drains. + // The defer-recover protects the entire mediorum process from + // a panic in CleanupDormantOps (lifecycle.AddManagedRoutine has + // no built-in recover). + ss.lc.AddManagedRoutine("crudr dormant cleanup", func(ctx context.Context) error { + defer func() { + if r := recover(); r != nil { + ss.logger.Error("dormant cleanup panicked; recovered", zap.Any("panic", r)) + } + }() + passCtx, cancel := context.WithTimeout(ctx, 30*time.Minute) + defer cancel() + if _, err := ss.crud.CleanupDormantOps(passCtx, ss.retentionCfg); err != nil { + ss.logger.Warn("dormant cleanup finished with error", zap.Error(err)) + } + <-ctx.Done() + return ctx.Err() + }) + + // Ongoing retention sweep: opt-in via OPENAUDIO_MEDIORUM_OPS_RETENTION_DAYS. + // When unset, RunRetention blocks on ctx.Done() and never deletes. + // Uses the same RetentionConfig captured at server New() time so the + // dormant cleanup and the ongoing sweep share one snapshot of env. + ss.lc.AddManagedRoutine("crudr ops retention", func(ctx context.Context) error { + defer func() { + if r := recover(); r != nil { + ss.logger.Error("ops retention panicked; recovered", zap.Any("panic", r)) + } + }() + return ss.crud.RunRetention(ctx, ss.retentionCfg) + }) + ss.lc.AddManagedRoutine("health poller", ss.startHealthPoller) ss.lc.AddManagedRoutine("repairer", ss.startRepairer) ss.lc.AddManagedRoutine("qm syncer", ss.startQmSyncer) From b59635823846b9006cd84a8109ad27da509a046c Mon Sep 17 00:00:00 2001 From: Rolf Hoefer Date: Thu, 21 May 2026 13:40:03 +0900 Subject: [PATCH 2/3] fix(mediorum): gate ops table index build --- pkg/mediorum/crudr/retention.go | 6 +++ pkg/mediorum/crudr/retention_test.go | 8 ++++ pkg/mediorum/server/server.go | 65 +++++++++++++++------------- 3 files changed, 48 insertions(+), 31 deletions(-) diff --git a/pkg/mediorum/crudr/retention.go b/pkg/mediorum/crudr/retention.go index bdfafb15..8a7bf86e 100644 --- a/pkg/mediorum/crudr/retention.go +++ b/pkg/mediorum/crudr/retention.go @@ -48,6 +48,11 @@ type RetentionConfig struct { // computing the cutoff. Gives the slowest reachable peer time to catch // up between sweeps. CursorSafetyMargin time.Duration + // EnsureOpsTableIndex controls the optional composite index build for + // table-scoped dormant cleanup. It defaults off because building the + // index on production-scale ops tables can require multiple GiB of + // free disk during rollout. + EnsureOpsTableIndex bool } // LoadRetentionConfig reads the retention configuration from environment @@ -60,6 +65,7 @@ func LoadRetentionConfig() RetentionConfig { SweepInterval: env.GetDuration(1*time.Hour, "OPENAUDIO_MEDIORUM_OPS_RETENTION_SWEEP_INTERVAL"), SweepBatchLimit: env.GetInt(10000, "OPENAUDIO_MEDIORUM_OPS_RETENTION_BATCH_LIMIT"), CursorSafetyMargin: env.GetDuration(1*time.Hour, "OPENAUDIO_MEDIORUM_OPS_RETENTION_CURSOR_MARGIN"), + EnsureOpsTableIndex: env.Bool("OPENAUDIO_MEDIORUM_ENSURE_OPS_TABLE_INDEX"), } return cfg } diff --git a/pkg/mediorum/crudr/retention_test.go b/pkg/mediorum/crudr/retention_test.go index 1478dd01..788b6e26 100644 --- a/pkg/mediorum/crudr/retention_test.go +++ b/pkg/mediorum/crudr/retention_test.go @@ -139,6 +139,7 @@ func TestLoadRetentionConfig_Defaults(t *testing.T) { "OPENAUDIO_MEDIORUM_OPS_RETENTION_SWEEP_INTERVAL", "OPENAUDIO_MEDIORUM_OPS_RETENTION_BATCH_LIMIT", "OPENAUDIO_MEDIORUM_OPS_RETENTION_CURSOR_MARGIN", + "OPENAUDIO_MEDIORUM_ENSURE_OPS_TABLE_INDEX", } { prev, had := os.LookupEnv(k) os.Unsetenv(k) @@ -155,6 +156,7 @@ func TestLoadRetentionConfig_Defaults(t *testing.T) { assert.Equal(t, 1*time.Hour, cfg.SweepInterval) assert.Equal(t, 10000, cfg.SweepBatchLimit) assert.Equal(t, 1*time.Hour, cfg.CursorSafetyMargin) + assert.False(t, cfg.EnsureOpsTableIndex, "production-scale index build must be opt-in") } func TestLoadRetentionConfig_OptOutDormant(t *testing.T) { @@ -163,6 +165,12 @@ func TestLoadRetentionConfig_OptOutDormant(t *testing.T) { assert.False(t, cfg.DormantCleanupEnabled) } +func TestLoadRetentionConfig_OptInOpsTableIndex(t *testing.T) { + t.Setenv("OPENAUDIO_MEDIORUM_ENSURE_OPS_TABLE_INDEX", "true") + cfg := LoadRetentionConfig() + assert.True(t, cfg.EnsureOpsTableIndex) +} + // Component 1 — dormant-table cleanup tests. func TestCleanupDormantOps_DropsDormantTable(t *testing.T) { diff --git a/pkg/mediorum/server/server.go b/pkg/mediorum/server/server.go index c8ce637c..170b839c 100644 --- a/pkg/mediorum/server/server.go +++ b/pkg/mediorum/server/server.go @@ -78,12 +78,12 @@ type MediorumConfig struct { DiscoveryListensEndpoints []string LogLevel string DeadHosts []string - RepairEnabled bool `default:"true"` - RepairInterval time.Duration `default:"1h"` - RepairConcurrency int `default:"1"` + RepairEnabled bool `default:"true"` + RepairInterval time.Duration `default:"1h"` + RepairConcurrency int `default:"1"` ProgrammableDistributionEnabled bool - BlobStorageStreaming bool + BlobStorageStreaming bool // should have a basedir type of thing // by default will put db + blobs there @@ -91,7 +91,6 @@ type MediorumConfig struct { privateKey *ecdsa.PrivateKey } - type MediorumServer struct { lc *lifecycle.Lifecycle echo *echo.Echo @@ -415,8 +414,8 @@ func New(lc *lifecycle.Lifecycle, logger *zap.Logger, config MediorumConfig, pos bgPullBackoff: imcache.New(imcache.WithMaxEntriesLimitOption[string, struct{}](50_000, imcache.EvictionPolicyLRU), imcache.WithDefaultExpirationOption[string, struct{}](time.Hour)), replicationAttempts: imcache.New(imcache.WithMaxEntriesLimitOption[string, struct{}](50_000, imcache.EvictionPolicyLRU), imcache.WithDefaultExpirationOption[string, struct{}](time.Hour)), - StartedAt: time.Now().UTC(), - Config: config, + StartedAt: time.Now().UTC(), + Config: config, geoIPdbReady: make(chan struct{}), core: core, @@ -628,32 +627,36 @@ func (ss *MediorumServer) MustStart() error { if ss.Config.WalletIsRegistered { ss.crud.StartClients() - // Composite-index build runs OFF the boot path. On val001-scale - // (84 GiB heap, ~250M rows) CREATE INDEX CONCURRENTLY can take - // 30-60 min; we cannot block /health-check that long. A pinned - // *sql.Conn carries the session-scoped advisory lock + the - // invalid-leftover probe + the create through one backend - // session. Without the pin, the lock leaks across the gorm - // pool and a partial CONCURRENTLY build leaves an INVALID - // index that IF NOT EXISTS short-circuits on, silently - // downgrading retention to seq-scans. - ss.lc.AddManagedRoutine("crudr ops index ensure", func(ctx context.Context) error { - defer func() { - if r := recover(); r != nil { - ss.logger.Error("ops index ensure panicked; recovered", zap.Any("panic", r)) + if ss.retentionCfg.EnsureOpsTableIndex { + // Composite-index build runs OFF the boot path. On val001-scale + // (84 GiB heap, ~250M rows) CREATE INDEX CONCURRENTLY can take + // 30-60 min; we cannot block /health-check that long. A pinned + // *sql.Conn carries the session-scoped advisory lock + the + // invalid-leftover probe + the create through one backend + // session. Without the pin, the lock leaks across the gorm + // pool and a partial CONCURRENTLY build leaves an INVALID + // index that IF NOT EXISTS short-circuits on, silently + // downgrading retention to seq-scans. + ss.lc.AddManagedRoutine("crudr ops index ensure", func(ctx context.Context) error { + defer func() { + if r := recover(); r != nil { + ss.logger.Error("ops index ensure panicked; recovered", zap.Any("panic", r)) + } + }() + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(10 * time.Second): } - }() - select { - case <-ctx.Done(): + if err := crudr.EnsureOpsTableIndex(ctx, ss.crud.DB); err != nil { + ss.logger.Warn("ops index ensure failed; retention paths will seq-scan until next process restart", zap.Error(err)) + } + <-ctx.Done() return ctx.Err() - case <-time.After(10 * time.Second): - } - if err := crudr.EnsureOpsTableIndex(ctx, ss.crud.DB); err != nil { - ss.logger.Warn("ops index ensure failed; retention paths will seq-scan until next process restart", zap.Error(err)) - } - <-ctx.Done() - return ctx.Err() - }) + }) + } else { + ss.logger.Info("ops table index ensure disabled (set OPENAUDIO_MEDIORUM_ENSURE_OPS_TABLE_INDEX=true to enable)") + } // Dormant-table cleanup off the boot path so /health-check is // reachable while a possibly multi-million-row backlog drains. From 35e3045957a482bd06c07d7f160e95e6c2496950 Mon Sep 17 00:00:00 2001 From: Rolf Hoefer Date: Thu, 21 May 2026 16:16:59 +0900 Subject: [PATCH 3/3] fix(mediorum): make dormant ops cleanup opt-in --- pkg/mediorum/crudr/retention.go | 21 +++++++++++---------- pkg/mediorum/crudr/retention_test.go | 12 ++++++++++-- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/pkg/mediorum/crudr/retention.go b/pkg/mediorum/crudr/retention.go index 8a7bf86e..cf76fa41 100644 --- a/pkg/mediorum/crudr/retention.go +++ b/pkg/mediorum/crudr/retention.go @@ -20,9 +20,10 @@ import ( // LoadRetentionConfig, with safe defaults. // // Lifecycle: -// - DormantCleanupEnabled defaults to true. The cleanup runs once per -// process start; re-running is a no-op once the table has been cleaned. -// Set OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS=true to opt out. +// - DormantCleanupEnabled defaults to false. The cleanup runs once per +// process start when OPENAUDIO_MEDIORUM_ENABLE_DORMANT_OPS_CLEANUP=true; +// re-running is a no-op once the table has been cleaned. Set +// OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS=true as an explicit kill switch. // - RetentionDays==0 disables the ongoing retention sweep (archive mode, // current default behavior). Setting OPENAUDIO_MEDIORUM_OPS_RETENTION_DAYS // to a positive integer enables the sweep. @@ -59,7 +60,7 @@ type RetentionConfig struct { // variables. All fields use OPENAUDIO_ canonical names. func LoadRetentionConfig() RetentionConfig { cfg := RetentionConfig{ - DormantCleanupEnabled: !env.Bool("OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS"), + DormantCleanupEnabled: env.Bool("OPENAUDIO_MEDIORUM_ENABLE_DORMANT_OPS_CLEANUP") && !env.Bool("OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS"), DormantThreshold: env.GetDuration(90*24*time.Hour, "OPENAUDIO_MEDIORUM_DORMANT_OPS_THRESHOLD"), RetentionDays: env.GetInt(0, "OPENAUDIO_MEDIORUM_OPS_RETENTION_DAYS"), SweepInterval: env.GetDuration(1*time.Hour, "OPENAUDIO_MEDIORUM_OPS_RETENTION_SWEEP_INTERVAL"), @@ -159,11 +160,11 @@ const dormantBatchSize = 10000 // to cover this case — keep such tables out of CRUDR or out-of-band. // // Future-maintainer note: any new CRUD table added via RegisterModels must -// expect to be wiped here if it sees no writes for cfg.DormantThreshold -// (default 90d). A legitimate low-write-cadence table (e.g. a quarterly -// metric) belongs outside the CRUD layer or behind a non-default -// threshold; the dormancy default is calibrated for upload/audio-analysis- -// rate write streams. +// expect to be wiped here when the operator enables dormant cleanup and it +// sees no writes for cfg.DormantThreshold (default 90d). A legitimate +// low-write-cadence table (e.g. a quarterly metric) belongs outside the CRUD +// layer or behind a non-default threshold; the dormancy threshold is calibrated +// for upload/audio-analysis-rate write streams. // // Deletes are batched so a multi-million-row dormant table does not hold // locks or WAL for one giant transaction. Each batch is its own statement @@ -180,7 +181,7 @@ const dormantBatchSize = 10000 func (c *Crudr) CleanupDormantOps(ctx context.Context, cfg RetentionConfig) (map[string]int64, error) { deleted := map[string]int64{} if !cfg.DormantCleanupEnabled { - c.logger.Info("dormant ops cleanup disabled by OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS") + c.logger.Info("dormant ops cleanup disabled (set OPENAUDIO_MEDIORUM_ENABLE_DORMANT_OPS_CLEANUP=true to enable)") return deleted, nil } diff --git a/pkg/mediorum/crudr/retention_test.go b/pkg/mediorum/crudr/retention_test.go index 788b6e26..5cfec365 100644 --- a/pkg/mediorum/crudr/retention_test.go +++ b/pkg/mediorum/crudr/retention_test.go @@ -133,6 +133,7 @@ func countOpsForTable(t *testing.T, c *Crudr, table string) int64 { func TestLoadRetentionConfig_Defaults(t *testing.T) { // Snapshot and restore env so we don't leak into sibling tests. for _, k := range []string{ + "OPENAUDIO_MEDIORUM_ENABLE_DORMANT_OPS_CLEANUP", "OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS", "OPENAUDIO_MEDIORUM_DORMANT_OPS_THRESHOLD", "OPENAUDIO_MEDIORUM_OPS_RETENTION_DAYS", @@ -150,7 +151,7 @@ func TestLoadRetentionConfig_Defaults(t *testing.T) { }) } cfg := LoadRetentionConfig() - assert.True(t, cfg.DormantCleanupEnabled, "dormant cleanup must default ON") + assert.False(t, cfg.DormantCleanupEnabled, "production-scale dormant cleanup must be opt-in") assert.Equal(t, 90*24*time.Hour, cfg.DormantThreshold) assert.Equal(t, 0, cfg.RetentionDays, "ongoing retention must default OFF") assert.Equal(t, 1*time.Hour, cfg.SweepInterval) @@ -159,7 +160,14 @@ func TestLoadRetentionConfig_Defaults(t *testing.T) { assert.False(t, cfg.EnsureOpsTableIndex, "production-scale index build must be opt-in") } -func TestLoadRetentionConfig_OptOutDormant(t *testing.T) { +func TestLoadRetentionConfig_OptInDormant(t *testing.T) { + t.Setenv("OPENAUDIO_MEDIORUM_ENABLE_DORMANT_OPS_CLEANUP", "true") + cfg := LoadRetentionConfig() + assert.True(t, cfg.DormantCleanupEnabled) +} + +func TestLoadRetentionConfig_DormantKillSwitchWins(t *testing.T) { + t.Setenv("OPENAUDIO_MEDIORUM_ENABLE_DORMANT_OPS_CLEANUP", "true") t.Setenv("OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS", "true") cfg := LoadRetentionConfig() assert.False(t, cfg.DormantCleanupEnabled)