diff --git a/pkg/mediorum/crudr/client.go b/pkg/mediorum/crudr/client.go index 2438082f..aec601e7 100644 --- a/pkg/mediorum/crudr/client.go +++ b/pkg/mediorum/crudr/client.go @@ -129,7 +129,11 @@ func (p *PeerClient) doSweep(ctx context.Context) error { host := p.Host bulkEndpoint := "/internal/crud/sweep" // hardcoded - // get cursor + // get cursor. Treat an implausible persisted LastULID (e.g., a + // pre-PR row with a far-future or epoch ULID) as missing. Without + // this guard, doSweep would echo the bad value as `after=` and + // re-upsert the same bad value at end-of-sweep, locking the peer + // into a permanent prune-recreate cycle that no other path heals. lastUlid := "" { var cursor Cursor @@ -138,6 +142,10 @@ func (p *PeerClient) doSweep(ctx context.Context) error { if !errors.Is(err, gorm.ErrRecordNotFound) { p.logger.Warn("failed to get cursor", "err", err) } + } else if cursor.LastULID != "" && !isValidPeerSuppliedULID(cursor.LastULID) { + p.logger.Warn("persisted cursor LastULID is implausible; treating as missing", + "peer", host, + "last_ulid", cursor.LastULID) } else { lastUlid = cursor.LastULID } @@ -164,6 +172,42 @@ func (p *PeerClient) doSweep(ctx context.Context) error { return fmt.Errorf("bad status: %d", resp.StatusCode) } + // Retention-gap signal: the peer has dropped ops below our cursor and + // is advertising the lowest ulid it still has. Stage a cursor advance + // across the gap. Without this branch, the response body would be + // applied normally and the cursor would silently skip the gap. + // + // First-contact guard: if we have no durable cursor (`lastUlid == ""`), + // the empty-string < anyULID comparison would let a peer establish + // our initial cursor at an attacker-chosen position. Require an + // existing cursor before honoring the gap. + // + // Defer-until-decode: stage the advance in lastUlid here but do NOT + // persist. The end-of-sweep upsert is the single persistence point. + // On body-decode failure or apply error we abort without writing the + // forward cursor, so the next sweep retries the same range cleanly. + gapMinULID := resp.Header.Get(HeaderAvailableMin) + gapAdvancePending := false + if resp.Header.Get(HeaderRetentionGap) == "true" && gapMinULID != "" { + switch { + case lastUlid == "": + p.logger.Warn("ignoring retention-gap header on first contact (no durable cursor yet)", + "peer", host, + "advertised_ulid", gapMinULID) + case !isValidPeerSuppliedULID(gapMinULID): + p.logger.Warn("ignoring retention-gap header with invalid ulid", + "peer", host, + "advertised_ulid", gapMinULID) + case gapMinULID > lastUlid: + p.logger.Warn("retention gap detected: peer cursor below peer's available history; staging cursor advance across gap", + "peer", host, + "local_cursor", lastUlid, + "peer_available_min_ulid", gapMinULID) + lastUlid = gapMinULID + gapAdvancePending = true + } + } + var ops []*Op dec := json.NewDecoder(resp.Body) err = dec.Decode(&ops) @@ -197,12 +241,21 @@ func (p *PeerClient) doSweep(ctx context.Context) error { p.Seeded = true } - // set cursor - { + // set cursor. Single persistence point for both normal advance and + // gap-staged advance. MarkSweepGapAdvance fires only after the + // upsert succeeds AND lastUlid is still at-or-above the staged gap + // floor. Body ops that regress lastUlid below the gap suppress + // the counter so it never over-reports relative to durable state. + // Skip the upsert when lastUlid stays empty so a first-contact + // rejection or no-ops response doesn't write an empty cursor row + // that would later block all retention via the empty-peer sentinel. + if lastUlid != "" { upsertClause := clause.OnConflict{UpdateAll: true} err := p.crudr.DB.Clauses(upsertClause).Create(&Cursor{Host: host, LastULID: lastUlid}).Error if err != nil { p.logger.Error("failed to set cursor", "err", err) + } else if gapAdvancePending && lastUlid >= gapMinULID { + MarkSweepGapAdvance() } } diff --git a/pkg/mediorum/crudr/crudr.go b/pkg/mediorum/crudr/crudr.go index c962dba4..c4ce5917 100644 --- a/pkg/mediorum/crudr/crudr.go +++ b/pkg/mediorum/crudr/crudr.go @@ -10,6 +10,7 @@ import ( "reflect" "strings" "sync" + "time" "github.com/OpenAudio/go-openaudio/pkg/httputil" "github.com/OpenAudio/go-openaudio/pkg/lifecycle" @@ -20,6 +21,80 @@ import ( "gorm.io/gorm/clause" ) +// opsTableIndexLockKey is the pg session-scoped advisory-lock key for +// the composite ops("table", ulid) index migration. ASCII-packed +// "ops_indx" so a future maintainer adding adjacent locks can spot +// the convention. +const opsTableIndexLockKey int64 = 0x6F70735F696E6478 + +// EnsureOpsTableIndex creates the composite index ops("table", ulid) +// used by the dormant cleanup and (when enabled) the per-table +// retention queries. Designed to run OFF the boot path via a managed +// routine so the multi-hour CONCURRENTLY build on val001-scale tables +// cannot block /health-check. +// +// The advisory lock is acquired on a pinned *sql.Conn so all five +// steps (acquire, probe-invalid, optional DROP, CREATE, unlock) share +// one Postgres backend session. Without the pin, gorm's pool checks +// out a different conn per call and the session-scoped lock leaks +// off the original session. Two processes could each "acquire" the +// lock on different pool conns and race the CONCURRENTLY build. +// +// The invalid-leftover probe + DROP handles the recovery case where +// a prior process died mid-CONCURRENTLY; without it, the next boot's +// CREATE IF NOT EXISTS short-circuits on the INVALID index name and +// retention paths silently degrade to seq-scans forever. +func EnsureOpsTableIndex(ctx context.Context, db *gorm.DB) error { + sqlDB, err := db.DB() + if err != nil { + return fmt.Errorf("ops-index ensure: get underlying *sql.DB: %w", err) + } + conn, err := sqlDB.Conn(ctx) + if err != nil { + return fmt.Errorf("ops-index ensure: pin connection: %w", err) + } + defer conn.Close() + + var locked bool + if err := conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, opsTableIndexLockKey).Scan(&locked); err != nil { + return fmt.Errorf("ops-index ensure: acquire advisory lock: %w", err) + } + if !locked { + return nil + } + defer func() { + // Detached short-context: a shutdown signal during the build + // shouldn't strand the lock. conn.Close also releases + // session-scoped locks as belt-and-suspenders. + unlockCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, _ = conn.ExecContext(unlockCtx, `SELECT pg_advisory_unlock($1)`, opsTableIndexLockKey) + }() + + var invalid bool + if err := conn.QueryRowContext(ctx, ` + SELECT EXISTS ( + SELECT 1 + FROM pg_index i + JOIN pg_class c ON c.oid = i.indexrelid + WHERE c.relname = 'idx_ops_table_ulid' + AND NOT i.indisvalid + ) + `).Scan(&invalid); err != nil { + return fmt.Errorf("ops-index ensure: probe invalid index: %w", err) + } + if invalid { + if _, err := conn.ExecContext(ctx, `DROP INDEX CONCURRENTLY IF EXISTS idx_ops_table_ulid`); err != nil { + return fmt.Errorf("ops-index ensure: drop invalid index: %w", err) + } + } + + if _, err := conn.ExecContext(ctx, `CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_ops_table_ulid ON ops ("table", ulid)`); err != nil { + return fmt.Errorf("ops-index ensure: create index: %w", err) + } + return nil +} + const ( ActionCreate = "create" ActionUpdate = "update" @@ -257,6 +332,17 @@ func (c *Crudr) KnownType(op *Op) bool { } func (c *Crudr) ApplyOp(op *Op) error { + // Reject implausible ULIDs from any source (peer push, sweep response, + // or local code). Local code constructs via ulid.Make() which always + // passes; the check is essentially free on the local path. A hostile + // peer can otherwise spoof op.Host = c.host to bypass any host-keyed + // gate AND plant a poisoned ULID (epoch, far-future, malformed) that + // would persist into cursors and break ulidShiftBack on the retention + // floor probe. + if !isValidPeerSuppliedULID(op.ULID) { + return fmt.Errorf("rejecting op with implausible ULID %q from origin %s", op.ULID, op.Host) + } + elemType, ok := c.typeMap[op.Table] if !ok { return fmt.Errorf("no type registered for %s", op.Table) diff --git a/pkg/mediorum/crudr/retention.go b/pkg/mediorum/crudr/retention.go new file mode 100644 index 00000000..cf76fa41 --- /dev/null +++ b/pkg/mediorum/crudr/retention.go @@ -0,0 +1,737 @@ +package crudr + +import ( + "bytes" + "context" + "database/sql" + "errors" + "fmt" + "sync/atomic" + "time" + + "github.com/OpenAudio/go-openaudio/pkg/env" + "github.com/oklog/ulid/v2" + "go.uber.org/zap" + "gorm.io/gorm" +) + +// RetentionConfig holds knobs for the ops retention sweep and the one-time +// dormant-table cleanup. Configuration is read from environment variables in +// LoadRetentionConfig, with safe defaults. +// +// Lifecycle: +// - DormantCleanupEnabled defaults to false. The cleanup runs once per +// process start when OPENAUDIO_MEDIORUM_ENABLE_DORMANT_OPS_CLEANUP=true; +// re-running is a no-op once the table has been cleaned. Set +// OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS=true as an explicit kill switch. +// - RetentionDays==0 disables the ongoing retention sweep (archive mode, +// current default behavior). Setting OPENAUDIO_MEDIORUM_OPS_RETENTION_DAYS +// to a positive integer enables the sweep. +type RetentionConfig struct { + // DormantCleanupEnabled controls Component 1 (one-time dormant-table cleanup). + DormantCleanupEnabled bool + // DormantThreshold is the minimum age of the newest op for a table to be + // considered dormant. The default mirrors the structurally-dormant signal + // observed in qm_audio_analyses (no producer writes since Nov 2025). + DormantThreshold time.Duration + + // RetentionDays controls Component 3 (ongoing retention sweep). Zero + // disables the sweep (archive mode). Positive values delete ops older + // than this many days, subject to the cursor floor. + RetentionDays int + // SweepInterval is the cadence of the ongoing retention sweep loop. + SweepInterval time.Duration + // SweepBatchLimit is the maximum number of ops deleted in a single + // DELETE statement. Keeps long-running transactions short and avoids + // blocking concurrent ops writes. + SweepBatchLimit int + // CursorSafetyMargin is subtracted from min(cursors.last_ulid) before + // computing the cutoff. Gives the slowest reachable peer time to catch + // up between sweeps. + CursorSafetyMargin time.Duration + // EnsureOpsTableIndex controls the optional composite index build for + // table-scoped dormant cleanup. It defaults off because building the + // index on production-scale ops tables can require multiple GiB of + // free disk during rollout. + EnsureOpsTableIndex bool +} + +// LoadRetentionConfig reads the retention configuration from environment +// variables. All fields use OPENAUDIO_ canonical names. +func LoadRetentionConfig() RetentionConfig { + cfg := RetentionConfig{ + DormantCleanupEnabled: env.Bool("OPENAUDIO_MEDIORUM_ENABLE_DORMANT_OPS_CLEANUP") && !env.Bool("OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS"), + DormantThreshold: env.GetDuration(90*24*time.Hour, "OPENAUDIO_MEDIORUM_DORMANT_OPS_THRESHOLD"), + RetentionDays: env.GetInt(0, "OPENAUDIO_MEDIORUM_OPS_RETENTION_DAYS"), + SweepInterval: env.GetDuration(1*time.Hour, "OPENAUDIO_MEDIORUM_OPS_RETENTION_SWEEP_INTERVAL"), + SweepBatchLimit: env.GetInt(10000, "OPENAUDIO_MEDIORUM_OPS_RETENTION_BATCH_LIMIT"), + CursorSafetyMargin: env.GetDuration(1*time.Hour, "OPENAUDIO_MEDIORUM_OPS_RETENTION_CURSOR_MARGIN"), + EnsureOpsTableIndex: env.Bool("OPENAUDIO_MEDIORUM_ENSURE_OPS_TABLE_INDEX"), + } + return cfg +} + +// RetentionStats are atomic counters exposed for operator-visible metrics. +type RetentionStats struct { + // DormantTablesCleaned counts tables whose ops were dropped by the one-time + // dormant-table cleanup during this process lifetime. + DormantTablesCleaned atomic.Uint64 + // DormantOpsDeleted counts the total number of ops rows the dormant-table + // cleanup deleted during this process lifetime. + DormantOpsDeleted atomic.Uint64 + // RetentionOpsDeleted counts ops rows deleted by the ongoing retention + // sweep during this process lifetime. + RetentionOpsDeleted atomic.Uint64 + // RetentionSweepsSkipped counts sweep ticks where no rows were eligible + // for deletion (empty cursor blocked the cutoff, no rows older than + // MinAge, etc). + RetentionSweepsSkipped atomic.Uint64 + // SweepGapAdvances counts the number of times the local sweep client + // observed a retention-gap signal from a peer and explicitly advanced + // its cursor across the gap. This is the operator-visible metric + // for Topic-7 silent-skip detection. + SweepGapAdvances atomic.Uint64 +} + +// retention is the package-level set of stats, exposed for tests and +// operator metrics. Values reset only on process restart. +var retention RetentionStats + +// Stats returns a pointer to the package retention stats. The fields are +// atomic counters and may be read concurrently with the running sweep. +func Stats() *RetentionStats { return &retention } + +// MinAvailableULID returns the smallest ulid currently stored in the ops +// table. Returns ("", nil) when the ops table is empty. Callers use this +// to advertise the retention floor (see ServeCrudSweep gap signal) and to +// short-circuit retention work when the table is already empty. +func (c *Crudr) MinAvailableULID(ctx context.Context) (string, error) { + var out sql.NullString + err := c.DB.WithContext(ctx). + Raw(`SELECT MIN(ulid) FROM ops`). + Scan(&out).Error + if err != nil { + return "", err + } + if !out.Valid { + return "", nil + } + return out.String, nil +} + +// minDormantThreshold is the smallest dormancy window we will honor. An +// operator who sets OPENAUDIO_MEDIORUM_DORMANT_OPS_THRESHOLD to a value +// below this is treated as misconfigured and the threshold is clamped up. +// 24h is a wide enough floor that a temporarily-quiet table is never +// misclassified as dormant during a brief lull. +const minDormantThreshold = 24 * time.Hour + +// dormantBatchSize bounds the per-statement work the dormant cleanup +// does. A single all-rows DELETE on a 50M-row dormant table holds locks +// and WAL for the entire delete; batching keeps each transaction small +// enough that concurrent op writes for OTHER tables aren't blocked by +// the cleanup, and an OOM-kill mid-cleanup rolls back at most one +// batch's worth of work. +const dormantBatchSize = 10000 + +// CleanupDormantOps drops ops rows for tables that have not received a write +// in cfg.DormantThreshold. It is idempotent: re-running is a no-op once a +// table has been cleaned (the newest remaining op is now from this process's +// write traffic, not the historical sediment). +// +// The set of registered tables comes from c.typeMap, so a table that is no +// longer registered by mediorum (e.g. removed in a future PR) cannot be +// misclassified as dormant by this function. We only delete ops for tables +// the caller has registered with RegisterModels. +// +// Gap-signal interaction. The ServeCrudSweep gap signal advertises +// MIN(ulid) across ALL tables in ops, not per-table. When this cleanup +// removes a fully-dormant table's ops but other tables still hold older +// ops (the val001 baseline: `uploads` rows back to 2023-03 outlast +// `qm_audio_analyses` ops that were emitted in 2024), the overall +// min(ulid) does not change. A peer whose cursor falls between the +// dormant table's oldest and newest ulids would, on next sweep, silently +// skip the deleted dormant-table ops because the gap signal only fires +// against the overall floor. This is acceptable specifically for dormant +// tables because, by construction, their producer code has been removed +// (no live consumer depends on the deleted ops). New maintainers who add +// a CRUD table that retains both an active producer and a low write +// cadence (e.g., quarterly metrics) should not rely on the gap signal +// to cover this case — keep such tables out of CRUDR or out-of-band. +// +// Future-maintainer note: any new CRUD table added via RegisterModels must +// expect to be wiped here when the operator enables dormant cleanup and it +// sees no writes for cfg.DormantThreshold (default 90d). A legitimate +// low-write-cadence table (e.g. a quarterly metric) belongs outside the CRUD +// layer or behind a non-default threshold; the dormancy threshold is calibrated +// for upload/audio-analysis-rate write streams. +// +// Deletes are batched so a multi-million-row dormant table does not hold +// locks or WAL for one giant transaction. Each batch is its own statement +// bounded by `ulid < cutoffULID`; that guard prevents the race where a +// producer writes a new op for the table after the dormancy check but +// before the batched DELETE catches up — the new op's ulid sits above the +// cutoff and is never touched. An interrupted cleanup leaves a well-formed +// table with fewer dormant rows, and the next run picks up where the +// previous left off. +// +// Returns the per-table deletion counts and a non-nil error only if a DB +// operation fails. A no-op clean (cfg disabled or no dormant tables) is a +// nil-error empty-map return. +func (c *Crudr) CleanupDormantOps(ctx context.Context, cfg RetentionConfig) (map[string]int64, error) { + deleted := map[string]int64{} + if !cfg.DormantCleanupEnabled { + c.logger.Info("dormant ops cleanup disabled (set OPENAUDIO_MEDIORUM_ENABLE_DORMANT_OPS_CLEANUP=true to enable)") + return deleted, nil + } + + threshold := cfg.DormantThreshold + if threshold < minDormantThreshold { + c.logger.Warn("dormant threshold below safety floor; clamping", + zap.Duration("configured", cfg.DormantThreshold), + zap.Duration("floor", minDormantThreshold)) + threshold = minDormantThreshold + } + + // Snapshot the currently registered table set under the mutex so a + // concurrent RegisterModels call cannot race with the scan below. + c.mu.Lock() + tables := make([]string, 0, len(c.typeMap)) + for t := range c.typeMap { + tables = append(tables, t) + } + c.mu.Unlock() + + cutoff := time.Now().Add(-threshold) + cutoffULID, err := ulidAtTime(cutoff) + if err != nil { + return deleted, fmt.Errorf("compute dormant cutoff: %w", err) + } + + for _, table := range tables { + if err := ctx.Err(); err != nil { + return deleted, err + } + + // Find the most recent op for this table. NULL => no ops at all, + // which means the table is either brand-new or has already been + // cleaned; either way, skip. + var maxULID sql.NullString + err := c.DB.WithContext(ctx). + Raw(`SELECT MAX(ulid) FROM ops WHERE "table" = ?`, table). + Scan(&maxULID).Error + if err != nil { + return deleted, fmt.Errorf("query max ulid for %s: %w", table, err) + } + if !maxULID.Valid || maxULID.String == "" { + c.logger.Debug("dormant cleanup: no ops for table", + zap.String("table", table)) + continue + } + + // Compare lex order against the cutoff. ULIDs sort + // chronologically, so newest_op < cutoff_ulid means the newest op + // is older than the cutoff, i.e. the table is dormant. + if maxULID.String >= cutoffULID { + c.logger.Debug("dormant cleanup: table still active", + zap.String("table", table), + zap.String("newest_op_ulid", maxULID.String), + zap.String("cutoff_ulid", cutoffULID)) + continue + } + + // Batched DELETE bounded by the dormancy cutoff. Each iteration + // takes one bounded chunk and commits, so a multi-million-row + // dormant table is cleaned in pieces. The `ulid < cutoffULID` + // guard is load-bearing: if a producer writes a new op for this + // table between the dormancy check above and a later batch (the + // "dormant table just became active mid-cleanup" race), that new + // op's ULID is above the cutoff and is preserved. Loop exits + // when a batch returns zero rows. + var totalForTable int64 + for { + if err := ctx.Err(); err != nil { + return deleted, err + } + res := c.DB.WithContext(ctx).Exec(` + WITH victims AS MATERIALIZED ( + SELECT ulid + FROM ops + WHERE "table" = ? AND ulid < ? + ORDER BY ulid ASC + LIMIT ? + ) + DELETE FROM ops WHERE ulid IN (SELECT ulid FROM victims) + `, table, cutoffULID, dormantBatchSize) + if res.Error != nil { + return deleted, fmt.Errorf("delete dormant ops for %s: %w", table, res.Error) + } + totalForTable += res.RowsAffected + if res.RowsAffected < dormantBatchSize { + break + } + } + if totalForTable > 0 { + deleted[table] = totalForTable + retention.DormantTablesCleaned.Add(1) + retention.DormantOpsDeleted.Add(uint64(totalForTable)) + c.logger.Info("dormant cleanup: dropped ops for table", + zap.String("table", table), + zap.Int64("ops_deleted", totalForTable), + zap.String("newest_op_ulid", maxULID.String), + zap.Duration("threshold", threshold)) + } + } + return deleted, nil +} + +// DryRunPlan describes what a real retention pass would do without +// executing any DELETE. It is the operator-facing preview surface for +// retention behavior. Counts are exact (computed against live ops state) +// but the on-disk size estimate is heap-only and excludes index/TOAST +// overhead; multiply by ~1.18 to approximate index+heap recovery for the +// `ops` relation on val001 (84 GiB total / 72 GiB heap). +type DryRunPlan struct { + // DormantTables maps table name -> rows the dormant cleanup would + // delete. Tables that are still active are omitted. Empty when the + // dormant cleanup is disabled. + DormantTables map[string]int64 + // DormantBytes is the sum of pg_column_size over the rows + // DormantTables would delete. Heap-only. + DormantBytes int64 + // RetentionRows is the count of rows the ongoing retention sweep + // would delete on its next tick. Zero when RetentionDays <= 0. + RetentionRows int64 + // RetentionBytes is the heap-only sum for those rows. + RetentionBytes int64 + // RetentionSkipReason is the reason the retention sweep would + // skip this tick (empty cursor, ops table empty, etc). Empty when + // the sweep would proceed. + RetentionSkipReason string + // RetentionCutoffULID is the cutoff the retention sweep would use. + // Empty when the sweep would skip. + RetentionCutoffULID string + // DormantCutoffULID is the cutoff the dormant cleanup would use. + DormantCutoffULID string +} + +// DryRunRetention computes what CleanupDormantOps and one retentionTick +// would do given cfg, without executing any DELETE. Use this from a +// debug endpoint, an `audius-ctl` subcommand, or operator scripting +// before flipping retention on. +// +// The query plan mirrors the real cleanup logic exactly so a dry-run +// followed by a real run sees the same counts (modulo writes that land +// between the two calls). +func (c *Crudr) DryRunRetention(ctx context.Context, cfg RetentionConfig) (DryRunPlan, error) { + plan := DryRunPlan{DormantTables: map[string]int64{}} + + // Dormant-cleanup preview. + if cfg.DormantCleanupEnabled { + threshold := cfg.DormantThreshold + if threshold < minDormantThreshold { + threshold = minDormantThreshold + } + cutoff := time.Now().Add(-threshold) + cutoffULID, err := ulidAtTime(cutoff) + if err != nil { + return plan, fmt.Errorf("compute dormant cutoff: %w", err) + } + plan.DormantCutoffULID = cutoffULID + + c.mu.Lock() + tables := make([]string, 0, len(c.typeMap)) + for t := range c.typeMap { + tables = append(tables, t) + } + c.mu.Unlock() + + for _, table := range tables { + if err := ctx.Err(); err != nil { + return plan, err + } + var maxULID sql.NullString + if err := c.DB.WithContext(ctx). + Raw(`SELECT MAX(ulid) FROM ops WHERE "table" = ?`, table). + Scan(&maxULID).Error; err != nil { + return plan, fmt.Errorf("query max ulid for %s: %w", table, err) + } + if !maxULID.Valid || maxULID.String == "" || maxULID.String >= cutoffULID { + continue + } + // This table would be cleaned. Compute exact row count + // and heap bytes. + var count int64 + var bytes sql.NullInt64 + if err := c.DB.WithContext(ctx). + Raw(`SELECT COUNT(*), COALESCE(SUM(pg_column_size(ops.*)), 0) + FROM ops WHERE "table" = ? AND ulid < ?`, + table, cutoffULID). + Row().Scan(&count, &bytes); err != nil { + return plan, fmt.Errorf("dryrun dormant count for %s: %w", table, err) + } + if count > 0 { + plan.DormantTables[table] = count + plan.DormantBytes += bytes.Int64 + } + } + } + + // Retention-sweep preview. We mirror computeRetentionCutoff but do + // not execute any DELETE. + if cfg.RetentionDays > 0 { + cutoff, reason, err := c.computeRetentionCutoff(ctx, cfg) + if err != nil { + return plan, fmt.Errorf("compute retention cutoff: %w", err) + } + if cutoff == "" { + plan.RetentionSkipReason = reason + } else { + plan.RetentionCutoffULID = cutoff + var rows int64 + var bytes sql.NullInt64 + if err := c.DB.WithContext(ctx). + Raw(`SELECT COUNT(*), COALESCE(SUM(pg_column_size(ops.*)), 0) + FROM ops WHERE ulid < ?`, cutoff). + Row().Scan(&rows, &bytes); err != nil { + return plan, fmt.Errorf("dryrun retention count: %w", err) + } + plan.RetentionRows = rows + plan.RetentionBytes = bytes.Int64 + } + } + + return plan, nil +} + +// RunRetention runs the ongoing retention sweep loop until ctx is cancelled. +// It is a no-op when cfg.RetentionDays <= 0. The first sweep runs after one +// SweepInterval; this gives the lifecycle's other initialization (RegisterModels, +// peer cursor backfill) time to settle. +func (c *Crudr) RunRetention(ctx context.Context, cfg RetentionConfig) error { + if cfg.RetentionDays <= 0 { + c.logger.Info("ops retention disabled (set OPENAUDIO_MEDIORUM_OPS_RETENTION_DAYS to enable)") + <-ctx.Done() + return ctx.Err() + } + if cfg.SweepInterval <= 0 { + return errors.New("retention sweep interval must be positive") + } + c.logger.Info("ops retention enabled", + zap.Int("retention_days", cfg.RetentionDays), + zap.Duration("sweep_interval", cfg.SweepInterval), + zap.Int("batch_limit", cfg.SweepBatchLimit), + zap.Duration("cursor_safety_margin", cfg.CursorSafetyMargin)) + ticker := time.NewTicker(cfg.SweepInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + if err := c.retentionTick(ctx, cfg); err != nil { + c.logger.Warn("retention sweep tick failed", zap.Error(err)) + } + } + } +} + +// maxBatchesPerTick caps how many delete batches a single retention tick +// will run. The product of maxBatchesPerTick * SweepBatchLimit is the +// upper bound on rows removed per tick — keeps each tick's wall-clock and +// WAL pressure predictable even on a node with a multi-million-row +// backlog after enabling retention. At the default 1h interval, 10 +// batches × 10k = up to 100k rows/tick × 24 ticks/day = 2.4M rows/day, +// which exceeds the observed worst-case 1.1M ops/day write rate. +const maxBatchesPerTick = 10 + +// retentionTick runs a bounded retention sweep. It computes the cutoff +// once per tick, applies the cursor-floor invariant, then loops up to +// maxBatchesPerTick batches so a tick on a backlogged node makes +// measurable progress without monopolizing the DB. +func (c *Crudr) retentionTick(ctx context.Context, cfg RetentionConfig) error { + cutoff, reason, err := c.computeRetentionCutoff(ctx, cfg) + if err != nil { + return err + } + if cutoff == "" { + retention.RetentionSweepsSkipped.Add(1) + c.logger.Info("retention sweep skipped", zap.String("reason", reason)) + return nil + } + + batch := cfg.SweepBatchLimit + if batch <= 0 { + batch = 10000 + } + + var totalDeleted int64 + for i := 0; i < maxBatchesPerTick; i++ { + if err := ctx.Err(); err != nil { + return err + } + // Use a MATERIALIZED CTE so Postgres evaluates the SELECT once, + // honoring the LIMIT, and feeds the resulting ulid set to the + // DELETE. Without MATERIALIZED, Postgres 12+ may inline the CTE + // and the LIMIT semantics under a "DELETE ... IN (subquery)" + // shape become harder to reason about; we want the bounded-batch + // guarantee to hold regardless of planner version. + res := c.DB.WithContext(ctx).Exec(` + WITH victims AS MATERIALIZED ( + SELECT ulid + FROM ops + WHERE ulid < ? + ORDER BY ulid ASC + LIMIT ? + ) + DELETE FROM ops WHERE ulid IN (SELECT ulid FROM victims) + `, cutoff, batch) + if res.Error != nil { + return fmt.Errorf("retention delete: %w", res.Error) + } + totalDeleted += res.RowsAffected + if res.RowsAffected < int64(batch) { + break + } + } + + if totalDeleted > 0 { + retention.RetentionOpsDeleted.Add(uint64(totalDeleted)) + c.logger.Info("retention sweep deleted ops", + zap.Int64("rows_deleted", totalDeleted), + zap.String("cutoff_ulid", cutoff)) + } else { + retention.RetentionSweepsSkipped.Add(1) + c.logger.Debug("retention sweep: no eligible rows", + zap.String("cutoff_ulid", cutoff)) + } + return nil +} + +// computeRetentionCutoff returns the ULID below which ops are eligible for +// deletion under the given policy. Returns ("", reason, nil) when no rows +// are eligible: +// +// - any peer cursor is NULL or empty (a peer that has not advanced is +// treated as the most conservative possible cursor), +// - the smallest non-empty cursor is older than the age cutoff (a peer +// is more behind than the retention window allows; keep all ops). +// +// The cutoff itself is min(age_cutoff, cursor_floor_with_margin): +// whichever bound is tighter wins. The cursor floor is load-bearing +// (Topic 7 / cursor-floor invariant): no op younger than the slowest +// reachable peer's cursor may be deleted. +func (c *Crudr) computeRetentionCutoff(ctx context.Context, cfg RetentionConfig) (string, string, error) { + ageCutoff := time.Now().Add(-time.Duration(cfg.RetentionDays) * 24 * time.Hour) + ageCutoffULID, err := ulidAtTime(ageCutoff) + if err != nil { + return "", "compute age cutoff", err + } + + // Snapshot the active peer set under c.mu. The cursors table is + // shared with non-peer workers (qm_fix_truncated writes a CID under + // host='qm_fix_truncated'; decommissioned peers leave behind their + // rows). Without this filter, any such row would pin the retention + // floor indefinitely: ulid.Parse fails on a CID and the existing + // empty-cursor sentinel would still fire if we treated it as + // "malformed", or the floor would lex-sort to a nonsense value. + c.mu.Lock() + activePeers := make(map[string]bool, len(c.peerClients)) + for _, p := range c.peerClients { + activePeers[p.Host] = true + } + c.mu.Unlock() + + var cursors []Cursor + if err := c.DB.WithContext(ctx).Find(&cursors).Error; err != nil { + return "", "load cursors", fmt.Errorf("load cursors: %w", err) + } + + // Walk peer cursors only. Self-cursor is skipped. Empty / malformed / + // implausible-timestamp cursors on an active peer collapse into the + // empty-peer sentinel (most-conservative blocking). A missing cursor + // row for an active peer is treated the same. + floor := "" + emptyPeer := "" + seenPeers := map[string]bool{} + for _, cur := range cursors { + if cur.Host == c.host { + continue + } + if !activePeers[cur.Host] { + continue + } + seenPeers[cur.Host] = true + if cur.LastULID == "" { + if emptyPeer == "" || cur.Host < emptyPeer { + emptyPeer = cur.Host + } + continue + } + if _, perr := ulid.Parse(cur.LastULID); perr != nil || !isValidPeerSuppliedULID(cur.LastULID) { + c.logger.Warn("malformed or implausible peer cursor; treating as empty (most conservative)", + zap.String("peer", cur.Host), + zap.String("last_ulid", cur.LastULID)) + if emptyPeer == "" || cur.Host < emptyPeer { + emptyPeer = cur.Host + } + continue + } + if floor == "" || cur.LastULID < floor { + floor = cur.LastULID + } + } + for host := range activePeers { + if !seenPeers[host] { + if emptyPeer == "" || host < emptyPeer { + emptyPeer = host + } + } + } + if emptyPeer != "" { + return "", fmt.Sprintf("empty or missing cursor for active peer %s blocks deletion", emptyPeer), nil + } + + floorWithMargin := floor + if floor != "" && cfg.CursorSafetyMargin > 0 { + floorWithMargin, err = ulidShiftBack(floor, cfg.CursorSafetyMargin) + if err != nil { + return "", "apply cursor safety margin", err + } + } + + cutoff := ageCutoffULID + if floor != "" && floorWithMargin < cutoff { + cutoff = floorWithMargin + } + + // Fast-path: skip the delete if nothing is eligible. This is the + // common case once retention has caught up — most ticks are no-ops. + minULID, err := c.MinAvailableULID(ctx) + if err != nil { + return "", "load min ulid", err + } + if minULID == "" { + return "", "ops table empty", nil + } + if minULID >= cutoff { + return "", fmt.Sprintf("no rows older than cutoff (min=%s, cutoff=%s)", minULID, cutoff), nil + } + return cutoff, "", nil +} + +// ulidAtTime returns the smallest ULID with a timestamp >= t. We use all-zero +// entropy so the returned ULID is the lexicographic lower bound for the +// given millisecond — i.e. any op with timestamp < t has a ULID < the +// returned value. +func ulidAtTime(t time.Time) (string, error) { + id, err := ulid.New(ulid.Timestamp(t), bytes.NewReader(make([]byte, 16))) + if err != nil { + return "", err + } + return id.String(), nil +} + +// ulidShiftBack returns a ULID whose timestamp is d earlier than the +// timestamp encoded in srcULID. Entropy bytes are zeroed so the shifted +// ULID is the lower bound of its millisecond. +func ulidShiftBack(srcULID string, d time.Duration) (string, error) { + id, err := ulid.Parse(srcULID) + if err != nil { + return "", fmt.Errorf("parse ulid %q: %w", srcULID, err) + } + t := ulid.Time(id.Time()).Add(-d) + return ulidAtTime(t) +} + +// ServeCrudSweep returns the ops and the retention metadata that the sweep +// HTTP handler should advertise. When the caller's after parameter falls +// below this node's min(ulid), the second return is a non-empty string +// containing the smallest ULID currently available locally; callers should +// advertise this on the response (e.g. via the X-Retention-Gap-Min-Ulid +// header) so the requesting peer can advance its cursor explicitly +// rather than silently skipping the gap. +// +// This helper exists so the server-side handler and tests can share the +// gap-detection logic. The handler still applies its own gossip filter on +// the returned op slice. +func (c *Crudr) ServeCrudSweep(ctx context.Context, after string, limit int) (ops []*Op, gapMinULID string, err error) { + // MIN(ulid) and the bounded op slice MUST observe the same snapshot + // of `ops` so a retention DELETE that commits between the two reads + // cannot leave us serving a row the gap header doesn't cover. + // sql.TxOptions.Isolation emits the level at BEGIN so we don't + // depend on a separate SET TRANSACTION statement ordering. + err = c.DB.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + var minOut sql.NullString + if err := tx.Raw(`SELECT MIN(ulid) FROM ops`).Scan(&minOut).Error; err != nil { + return fmt.Errorf("load min ulid: %w", err) + } + minULID := "" + if minOut.Valid { + minULID = minOut.String + } + if after != "" && minULID != "" && after < minULID { + gapMinULID = minULID + } + return tx.Where("ulid > ?", after).Limit(limit).Order("ulid asc").Find(&ops).Error + }, &sql.TxOptions{Isolation: sql.LevelRepeatableRead, ReadOnly: true}) + if err != nil { + return nil, "", err + } + return ops, gapMinULID, nil +} + +// HTTP headers used by the sweep handler to advertise the retention-gap +// signal and the lowest locally-available ULID. Clients that don't +// understand these headers ignore them and proceed as before; clients +// that do understand them advance their cursor to gapMinULID instead of +// silently skipping the missing range. +const ( + HeaderRetentionGap = "X-Mediorum-Retention-Gap" + HeaderAvailableMin = "X-Mediorum-Available-Min-Ulid" +) + +// MarkSweepGapAdvance increments the gap-advance counter. Exposed for the +// sweep client; tests can read retention.SweepGapAdvances directly. +func MarkSweepGapAdvance() { retention.SweepGapAdvances.Add(1) } + +// gapULIDClockSkewWindow is the maximum forward time skew we'll accept on +// a peer-supplied ulid. A peer whose ulid decodes to a time more than +// this far in the future is treated as misconfigured or hostile. +// 30 minutes is wide enough to cover legitimate cross-host clock drift +// while still preventing a forged far-future ulid from permanently +// silencing a sweep stream or pinning our cursor at an attacker-chosen +// position. +const gapULIDClockSkewWindow = 30 * time.Minute + +// gapULIDEarliestPlausibleTime is the lower bound for any peer-supplied +// ulid. ULIDs were introduced in 2016; an epoch-zero or pre-2016 ulid +// from a peer is corruption or hostile crafting. Without this bound, +// ulidShiftBack on a near-epoch floor could underflow and permanently +// stall retention. +var gapULIDEarliestPlausibleTime = time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC) + +// isValidPeerSuppliedULID returns true when the candidate ulid can be +// safely accepted from a peer. Applied at: +// - ApplyOp (rejects implausible op.ULID before persisting) +// - the gap-header receive path +// - the cursor LOAD path in doSweep (a persisted implausible cursor +// would otherwise echo back into the next sweep request) +// - the cursor walk in computeRetentionCutoff (alongside ulid.Parse; +// implausible-but-parseable rows are treated as missing so a hostile +// or pre-PR bad row cannot poison the floor) +// +// Without this guard, a hostile peer could either jump our cursor to a +// far-future ulid (silencing the sweep stream) or pin our retention +// floor at an epoch ulid (underflowing ulidShiftBack). +func isValidPeerSuppliedULID(candidate string) bool { + id, err := ulid.Parse(candidate) + if err != nil { + return false + } + t := ulid.Time(id.Time()) + if t.Before(gapULIDEarliestPlausibleTime) { + return false + } + return t.Before(time.Now().Add(gapULIDClockSkewWindow)) +} diff --git a/pkg/mediorum/crudr/retention_test.go b/pkg/mediorum/crudr/retention_test.go new file mode 100644 index 00000000..5cfec365 --- /dev/null +++ b/pkg/mediorum/crudr/retention_test.go @@ -0,0 +1,1224 @@ +package crudr + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" + + "github.com/OpenAudio/go-openaudio/pkg/lifecycle" + "github.com/oklog/ulid/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "gorm.io/gorm" +) + +// retentionTestModel is a minimal registered CRUD model used by the +// retention tests. We give it a primary key so OnConflict-style apply +// paths work without panicking and so it mirrors the real registered +// models (Upload, QmAudioAnalysis, etc). +type retentionTestModel struct { + Key string `gorm:"primaryKey"` +} + +// otherRetentionTestModel mirrors the dormant-table scenario: another +// table the crud layer knows about, ops to which we will or won't +// emit depending on the test. +type otherRetentionTestModel struct { + Key string `gorm:"primaryKey"` +} + +// newRetentionCrudr returns a fresh crudr wired to the shared test DB, +// with the retention test models registered and the ops/cursors tables +// truncated. Tests that mutate env vars must restore them via t.Cleanup. +// The underlying *sql.DB is closed via t.Cleanup so a `-count=N` run does +// not exhaust Postgres connections. +func newRetentionCrudr(t *testing.T, host string) *Crudr { + t.Helper() + db := SetupTestDB() + t.Cleanup(func() { + if sqlDB, err := db.DB(); err == nil { + sqlDB.Close() + } + }) + + // migrate the retention test models so ApplyOp works + require.NoError(t, db.AutoMigrate(&retentionTestModel{}, &otherRetentionTestModel{})) + + // fully reset state between tests in the same package run + require.NoError(t, db.Exec(`TRUNCATE ops`).Error) + require.NoError(t, db.Exec(`TRUNCATE cursors`).Error) + require.NoError(t, db.Exec(`TRUNCATE retention_test_models`).Error) + require.NoError(t, db.Exec(`TRUNCATE other_retention_test_models`).Error) + + z := zap.NewNop() + c := New(host, nil, nil, db, lifecycle.NewLifecycle(context.Background(), "retention test", z), z, nil) + c.RegisterModels(&retentionTestModel{}, &otherRetentionTestModel{}) + return c +} + +// registerTestPeer attaches a PeerClient for the given host to the +// crudr's active-peer set, so the retention floor logic recognizes +// cursors with that Host as belonging to an active peer. The client +// is not Start()'d so no actual sweep traffic is generated. +func registerTestPeer(c *Crudr, host string) { + c.mu.Lock() + defer c.mu.Unlock() + c.peerClients = append(c.peerClients, NewPeerClient(host, c, c.host)) +} + +// resetRetentionStats clears the package counters so tests don't +// pollute each other. +func resetRetentionStats() { + retention.DormantTablesCleaned.Store(0) + retention.DormantOpsDeleted.Store(0) + retention.RetentionOpsDeleted.Store(0) + retention.RetentionSweepsSkipped.Store(0) + retention.SweepGapAdvances.Store(0) +} + +// setupRetentionTestDB opens a freshly-truncated test DB and registers a +// t.Cleanup to close it, so a long `-count=N` run never exhausts +// Postgres connections. +func setupRetentionTestDB(t *testing.T) *gorm.DB { + t.Helper() + db := SetupTestDB() + t.Cleanup(func() { + if sqlDB, err := db.DB(); err == nil { + sqlDB.Close() + } + }) + require.NoError(t, db.AutoMigrate(&retentionTestModel{})) + require.NoError(t, db.Exec(`TRUNCATE ops`).Error) + require.NoError(t, db.Exec(`TRUNCATE cursors`).Error) + return db +} + +// insertOpAt inserts an op with a ULID derived from the given wall time +// and a deterministic per-call entropy stream so callers with overlapping +// timestamps still get distinct ULIDs. Returns the inserted ULID string. +func insertOpAt(t *testing.T, c *Crudr, table string, at time.Time) string { + t.Helper() + return insertOpAtWithEntropy(t, c, table, at, ulid.Make().Entropy()) +} + +func insertOpAtWithEntropy(t *testing.T, c *Crudr, table string, at time.Time, entropy []byte) string { + t.Helper() + id, err := ulid.New(ulid.Timestamp(at), bytes.NewReader(entropy)) + require.NoError(t, err) + op := &Op{ + ULID: id.String(), + Host: c.host, + Action: ActionCreate, + Table: table, + Data: json.RawMessage(`[]`), + } + require.NoError(t, c.DB.Create(op).Error) + return id.String() +} + +func countOpsForTable(t *testing.T, c *Crudr, table string) int64 { + t.Helper() + var n int64 + require.NoError(t, c.DB.Raw(`SELECT COUNT(*) FROM ops WHERE "table" = ?`, table).Scan(&n).Error) + return n +} + +func TestLoadRetentionConfig_Defaults(t *testing.T) { + // Snapshot and restore env so we don't leak into sibling tests. + for _, k := range []string{ + "OPENAUDIO_MEDIORUM_ENABLE_DORMANT_OPS_CLEANUP", + "OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS", + "OPENAUDIO_MEDIORUM_DORMANT_OPS_THRESHOLD", + "OPENAUDIO_MEDIORUM_OPS_RETENTION_DAYS", + "OPENAUDIO_MEDIORUM_OPS_RETENTION_SWEEP_INTERVAL", + "OPENAUDIO_MEDIORUM_OPS_RETENTION_BATCH_LIMIT", + "OPENAUDIO_MEDIORUM_OPS_RETENTION_CURSOR_MARGIN", + "OPENAUDIO_MEDIORUM_ENSURE_OPS_TABLE_INDEX", + } { + prev, had := os.LookupEnv(k) + os.Unsetenv(k) + t.Cleanup(func() { + if had { + os.Setenv(k, prev) + } + }) + } + cfg := LoadRetentionConfig() + assert.False(t, cfg.DormantCleanupEnabled, "production-scale dormant cleanup must be opt-in") + assert.Equal(t, 90*24*time.Hour, cfg.DormantThreshold) + assert.Equal(t, 0, cfg.RetentionDays, "ongoing retention must default OFF") + assert.Equal(t, 1*time.Hour, cfg.SweepInterval) + assert.Equal(t, 10000, cfg.SweepBatchLimit) + assert.Equal(t, 1*time.Hour, cfg.CursorSafetyMargin) + assert.False(t, cfg.EnsureOpsTableIndex, "production-scale index build must be opt-in") +} + +func TestLoadRetentionConfig_OptInDormant(t *testing.T) { + t.Setenv("OPENAUDIO_MEDIORUM_ENABLE_DORMANT_OPS_CLEANUP", "true") + cfg := LoadRetentionConfig() + assert.True(t, cfg.DormantCleanupEnabled) +} + +func TestLoadRetentionConfig_DormantKillSwitchWins(t *testing.T) { + t.Setenv("OPENAUDIO_MEDIORUM_ENABLE_DORMANT_OPS_CLEANUP", "true") + t.Setenv("OPENAUDIO_MEDIORUM_KEEP_DORMANT_OPS", "true") + cfg := LoadRetentionConfig() + assert.False(t, cfg.DormantCleanupEnabled) +} + +func TestLoadRetentionConfig_OptInOpsTableIndex(t *testing.T) { + t.Setenv("OPENAUDIO_MEDIORUM_ENSURE_OPS_TABLE_INDEX", "true") + cfg := LoadRetentionConfig() + assert.True(t, cfg.EnsureOpsTableIndex) +} + +// Component 1 — dormant-table cleanup tests. + +func TestCleanupDormantOps_DropsDormantTable(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now() + // 100 ops on the dormant table, all 200 days old. + for i := 0; i < 100; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + // 5 ops on a still-active table, all within the last hour. + for i := 0; i < 5; i++ { + insertOpAt(t, c, "other_retention_test_models", now.Add(-time.Duration(i)*time.Minute)) + } + + deleted, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + + assert.Equal(t, int64(100), deleted["retention_test_models"]) + assert.NotContains(t, deleted, "other_retention_test_models", "active table must not be cleaned") + assert.Equal(t, int64(0), countOpsForTable(t, c, "retention_test_models")) + assert.Equal(t, int64(5), countOpsForTable(t, c, "other_retention_test_models")) + assert.Equal(t, uint64(1), retention.DormantTablesCleaned.Load()) + assert.Equal(t, uint64(100), retention.DormantOpsDeleted.Load()) +} + +func TestCleanupDormantOps_OptOutPreservesAll(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: false, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now() + for i := 0; i < 25; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + deleted, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + assert.Empty(t, deleted) + assert.Equal(t, int64(25), countOpsForTable(t, c, "retention_test_models")) + assert.Equal(t, uint64(0), retention.DormantTablesCleaned.Load()) +} + +func TestCleanupDormantOps_Idempotent(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now() + for i := 0; i < 7; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-180*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + deleted1, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + assert.Equal(t, int64(7), deleted1["retention_test_models"]) + + deleted2, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + assert.Empty(t, deleted2, "second run on an already-cleaned table must be a no-op") +} + +func TestCleanupDormantOps_RecentWriteOnDormantTableBlocksDelete(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now() + // 50 old ops... + for i := 0; i < 50; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + // ...and one recent op on the same table. + insertOpAt(t, c, "retention_test_models", now.Add(-1*time.Minute)) + + deleted, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + assert.Empty(t, deleted, "a single recent op must protect all ops for the table") + assert.Equal(t, int64(51), countOpsForTable(t, c, "retention_test_models")) +} + +func TestCleanupDormantOps_AfterCleanupMinUlidReflectsRemaining(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now() + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour)) + keepUlid := insertOpAt(t, c, "other_retention_test_models", now.Add(-1*time.Hour)) + + _, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + + minULID, err := c.MinAvailableULID(context.Background()) + require.NoError(t, err) + assert.Equal(t, keepUlid, minULID, "min ulid should be the surviving op") +} + +func TestCleanupDormantOps_UnregisteredTableUntouched(t *testing.T) { + // A table whose model the crud layer does NOT know about must not + // be classified as dormant just because it has no recent ops; it + // must not appear in c.typeMap at all. + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now() + // Pretend the ops table has rows for a long-removed table. + for i := 0; i < 10; i++ { + insertOpAt(t, c, "ghost_table", now.Add(-365*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + deleted, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + assert.Empty(t, deleted, "unregistered tables must not be touched by the dormant cleanup") + assert.Equal(t, int64(10), countOpsForTable(t, c, "ghost_table")) +} + +func TestCleanupDormantOps_RespectsContextCancel(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now() + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + _, err := c.CleanupDormantOps(ctx, cfg) + require.Error(t, err) + assert.ErrorIs(t, err, context.Canceled) +} + +// Component 2 — gap-signal tests. + +func TestServeCrudSweep_BelowMinReturnsGap(t *testing.T) { + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 3; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(time.Duration(i)*time.Millisecond)) + } + + veryOld, err := ulid.New(ulid.Timestamp(now.Add(-365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + + ops, gap, err := c.ServeCrudSweep(context.Background(), veryOld.String(), 100) + require.NoError(t, err) + assert.NotEmpty(t, gap, "after below min must signal gap") + assert.Len(t, ops, 3, "returned ops are the full set above the caller's cursor") +} + +func TestServeCrudSweep_AtOrAboveMinNoGap(t *testing.T) { + c := newRetentionCrudr(t, "host1") + now := time.Now() + first := insertOpAt(t, c, "retention_test_models", now) + insertOpAt(t, c, "retention_test_models", now.Add(1*time.Millisecond)) + + // passing exactly the lowest ULID means "give me ops > min": no gap. + ops, gap, err := c.ServeCrudSweep(context.Background(), first, 100) + require.NoError(t, err) + assert.Empty(t, gap) + assert.Len(t, ops, 1, "expect the single op strictly greater than the cursor") +} + +func TestServeCrudSweep_EmptyAfterIsNotAGap(t *testing.T) { + c := newRetentionCrudr(t, "host1") + insertOpAt(t, c, "retention_test_models", time.Now()) + + ops, gap, err := c.ServeCrudSweep(context.Background(), "", 100) + require.NoError(t, err) + assert.Empty(t, gap, "first-time sweep (empty cursor) is not a retention gap") + assert.Len(t, ops, 1) +} + +func TestServeCrudSweep_EmptyOpsTableNoGap(t *testing.T) { + c := newRetentionCrudr(t, "host1") + // caller has a cursor but the local ops table is empty (e.g. fresh + // node). No gap, no ops. + ops, gap, err := c.ServeCrudSweep(context.Background(), "01HABCD000000000000000000A", 100) + require.NoError(t, err) + assert.Empty(t, gap) + assert.Empty(t, ops) +} + +// Dry-run preview tests. + +func TestDryRunRetention_DormantOnly(t *testing.T) { + // Default config: dormant cleanup ON, retention sweep OFF. Dry run + // reports the dormant table count and skips the retention preview. + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 50; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + // Active table — must not appear in the plan. + insertOpAt(t, c, "other_retention_test_models", now.Add(-1*time.Minute)) + + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + plan, err := c.DryRunRetention(context.Background(), cfg) + require.NoError(t, err) + assert.Equal(t, int64(50), plan.DormantTables["retention_test_models"]) + assert.NotContains(t, plan.DormantTables, "other_retention_test_models") + assert.Greater(t, plan.DormantBytes, int64(0), "dormant bytes must be reported") + assert.Equal(t, int64(0), plan.RetentionRows, "retention preview skipped when RetentionDays=0") + assert.NotEmpty(t, plan.DormantCutoffULID) + // Dry run must not delete anything. + assert.Equal(t, int64(50), countOpsForTable(t, c, "retention_test_models")) + assert.Equal(t, int64(1), countOpsForTable(t, c, "other_retention_test_models")) +} + +func TestDryRunRetention_RetentionPreviewSkipsOnEmptyCursor(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 10; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + require.NoError(t, c.DB.Create(&Cursor{Host: "empty-peer", LastULID: ""}).Error) + registerTestPeer(c, "empty-peer") + + cfg := RetentionConfig{DormantCleanupEnabled: false, RetentionDays: 30, SweepBatchLimit: 100, CursorSafetyMargin: 1 * time.Hour} + plan, err := c.DryRunRetention(context.Background(), cfg) + require.NoError(t, err) + assert.Equal(t, int64(0), plan.RetentionRows) + assert.NotEmpty(t, plan.RetentionSkipReason, "empty cursor must produce a skip reason") + assert.Equal(t, int64(10), countOpsForTable(t, c, "retention_test_models")) +} + +func TestDryRunRetention_RetentionPreviewWithEligibleRows(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + // 30 old ops + 5 recent ops. + for i := 0; i < 30; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-1*time.Minute).Add(time.Duration(i)*time.Millisecond)) + } + recentULID, err := ulid.New(ulid.Timestamp(now), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "peer", LastULID: recentULID.String()}).Error) + + cfg := RetentionConfig{DormantCleanupEnabled: false, RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + plan, err := c.DryRunRetention(context.Background(), cfg) + require.NoError(t, err) + assert.Equal(t, int64(30), plan.RetentionRows, "old ops counted, recent ops preserved") + assert.Greater(t, plan.RetentionBytes, int64(0)) + assert.Empty(t, plan.RetentionSkipReason) + assert.NotEmpty(t, plan.RetentionCutoffULID) + // No DELETE executed. + assert.Equal(t, int64(35), countOpsForTable(t, c, "retention_test_models")) +} + +func TestDryRunRetention_DormantCleanupDisabledNoPreview(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 25; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + cfg := RetentionConfig{DormantCleanupEnabled: false, DormantThreshold: 30 * 24 * time.Hour} + plan, err := c.DryRunRetention(context.Background(), cfg) + require.NoError(t, err) + assert.Empty(t, plan.DormantTables) + assert.Equal(t, int64(0), plan.DormantBytes) +} + +// Component 3 — retention sweep tests. + +func TestRunRetention_DisabledIsNoOp(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 20; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-365*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + cfg := RetentionConfig{RetentionDays: 0} + + // Disabled retention should block on ctx; we cancel quickly to exit + // the loop without deleting anything. + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + err := c.RunRetention(ctx, cfg) + assert.ErrorIs(t, err, context.DeadlineExceeded) + assert.Equal(t, int64(20), countOpsForTable(t, c, "retention_test_models")) +} + +func TestRetentionTick_AllCursorsAheadDeletesOldOps(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + + // Old ops (> 30d) and recent ops (< 1h). + for i := 0; i < 25; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-1*time.Minute).Add(time.Duration(i)*time.Millisecond)) + } + + // Every peer cursor is at "now" — no peer is behind. + recentULID, err := ulid.New(ulid.Timestamp(now), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "peer-a", LastULID: recentULID.String()}).Error) + require.NoError(t, c.DB.Create(&Cursor{Host: "peer-b", LastULID: recentULID.String()}).Error) + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + + // Old ops gone, recent ops kept. + assert.Equal(t, int64(5), countOpsForTable(t, c, "retention_test_models")) + assert.Equal(t, uint64(25), retention.RetentionOpsDeleted.Load()) +} + +func TestRetentionTick_CursorBelowCutoffPreventsDelete(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + + // Three age cohorts of ops to verify cursor pinning: + // - 60d old: older than the slow cursor (45d) => deletable. + // - 40d old: newer than the slow cursor => MUST be kept + // (this is the load-bearing case). + // - 1m old: well within the cursor => kept. + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-40*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-1*time.Minute).Add(time.Duration(i)*time.Millisecond)) + } + + stuckCursor, err := ulid.New(ulid.Timestamp(now.Add(-45*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "slow-peer", LastULID: stuckCursor.String()}).Error) + registerTestPeer(c, "slow-peer") + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + + // Only the 60d cohort should be deleted; the 40d cohort sits above + // the slow cursor and MUST be preserved, otherwise that peer's next + // sweep would silently skip a deleted range. + remaining := countOpsForTable(t, c, "retention_test_models") + assert.Equal(t, int64(10), remaining, + "slow peer must block deletion of ops newer than its cursor") + assert.Equal(t, uint64(5), retention.RetentionOpsDeleted.Load()) + + // No remaining op may be older than (cursor - margin). + floorWithMargin, err := ulidShiftBack(stuckCursor.String(), 1*time.Hour) + require.NoError(t, err) + var oldestRemaining string + require.NoError(t, c.DB.Raw(`SELECT MIN(ulid) FROM ops`).Scan(&oldestRemaining).Error) + assert.GreaterOrEqual(t, oldestRemaining, floorWithMargin, + "no remaining op may be older than the cursor-safety floor") +} + +// TestRetentionTick_NonPeerCursorRowsIgnored proves that a cursor row +// written by a non-peer worker (e.g., qm_fix_truncated stores a CID +// under the same `cursors` table) does NOT pin retention. Without the +// activePeers filter in computeRetentionCutoff, that CID would parse +// as a malformed/zero ULID and pin the floor at the epoch, silently +// suppressing all deletions across the fleet. +func TestRetentionTick_NonPeerCursorRowsIgnored(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-1*time.Minute).Add(time.Duration(i)*time.Millisecond)) + } + + // qm_fix_truncated writes a CID into LastULID under its own host name. + // CIDs are not ULIDs and the host is not a registered peer. Both the + // host filter and the ulid plausibility check must reject this row. + require.NoError(t, c.DB.Create(&Cursor{ + Host: "qm_fix_truncated", + LastULID: "QmTZxYzAbCd1234567890abcdef1234567890abcdef12", + }).Error) + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + + remaining := countOpsForTable(t, c, "retention_test_models") + assert.Equal(t, int64(5), remaining, + "non-peer cursor row must not pin retention; the 60d cohort should delete") + assert.Equal(t, uint64(5), retention.RetentionOpsDeleted.Load()) +} + +func TestRetentionTick_EmptyCursorBlocksDeletion(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 12; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + require.NoError(t, c.DB.Create(&Cursor{Host: "peer-empty", LastULID: ""}).Error) + registerTestPeer(c, "peer-empty") + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + assert.Equal(t, int64(12), countOpsForTable(t, c, "retention_test_models"), + "an empty cursor row must be treated as the most conservative possible cursor") + assert.Equal(t, uint64(0), retention.RetentionOpsDeleted.Load()) + assert.GreaterOrEqual(t, retention.RetentionSweepsSkipped.Load(), uint64(1)) +} + +func TestRetentionTick_SafetyMarginHonored(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + + // Three op ages spread around a slow cursor at -30d: + // older than (cursor - 2h), within (cursor - 2h, cursor), and newer than cursor. + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-50*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-30*24*time.Hour-30*time.Minute).Add(time.Duration(i)*time.Millisecond)) + } + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-29*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + stuckCursor, err := ulid.New(ulid.Timestamp(now.Add(-30*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "slow", LastULID: stuckCursor.String()}).Error) + registerTestPeer(c, "slow") + + // 1h margin pushes cutoff to (cursor - 1h). Anything between cursor-1h + // and cursor must be preserved. + cfg := RetentionConfig{RetentionDays: 1, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + + // Only the very-old (-50d) bucket can be deleted; the 30m-before-cursor + // bucket is inside the safety margin and the 29d bucket sits above the + // cursor itself. + remaining := countOpsForTable(t, c, "retention_test_models") + assert.Equal(t, int64(10), remaining, + "safety margin must keep the 30-min-before-cursor bucket alive") +} + +func TestRetentionTick_BatchLimitHonored(t *testing.T) { + // Drains the eligible set with batch=100, looping internally up to + // maxBatchesPerTick. 250 eligible rows complete in 3 batches and + // the loop exits on the short final batch. + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 250; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + recentULID, err := ulid.New(ulid.Timestamp(now), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "peer", LastULID: recentULID.String()}).Error) + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 100, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + assert.Equal(t, int64(0), countOpsForTable(t, c, "retention_test_models")) + assert.Equal(t, uint64(250), retention.RetentionOpsDeleted.Load()) +} + +func TestRetentionTick_DrainsUpToMaxBatchesPerTick(t *testing.T) { + // A backlogged tick must take more than one batch in a single call, + // up to maxBatchesPerTick. With batch=10 and maxBatches=10, a tick + // against a 200-row eligible set should delete 100 rows (10 batches + // × 10 rows), leaving 100 untouched until the next tick. + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 200; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + recentULID, err := ulid.New(ulid.Timestamp(now), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "peer", LastULID: recentULID.String()}).Error) + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 10, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + + remaining := countOpsForTable(t, c, "retention_test_models") + // We deleted at most maxBatchesPerTick * batch = 100 rows. + expectedRemaining := int64(200) - int64(maxBatchesPerTick*10) + assert.Equal(t, expectedRemaining, remaining, + "tick must drain up to maxBatchesPerTick batches; backlog finishes on subsequent ticks") + assert.Equal(t, uint64(maxBatchesPerTick*10), retention.RetentionOpsDeleted.Load()) +} + +func TestRetentionTick_IteratesAllRegisteredTables(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 5; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + insertOpAt(t, c, "other_retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + recentULID, err := ulid.New(ulid.Timestamp(now), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "peer", LastULID: recentULID.String()}).Error) + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + + // retention deletes by ulid alone (not per-table), so all 10 ops are + // dropped in this single tick — this is the documented behavior of + // the first PR: uniform cutoff across all CRUD tables. + assert.Equal(t, int64(0), countOpsForTable(t, c, "retention_test_models")) + assert.Equal(t, int64(0), countOpsForTable(t, c, "other_retention_test_models")) +} + +func TestRetentionTick_SelfCursorIgnored(t *testing.T) { + // A cursor row whose host equals the crudr's selfHost must not + // block deletion: it's not a remote peer, it's the node's own + // (rare/test-only) self-pointer. + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 10; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + require.NoError(t, c.DB.Create(&Cursor{Host: "host1", LastULID: ""}).Error) + + recentULID, err := ulid.New(ulid.Timestamp(now), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "peer", LastULID: recentULID.String()}).Error) + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + assert.Equal(t, int64(0), countOpsForTable(t, c, "retention_test_models"), + "self-cursor must not block deletion") +} + +func TestRetentionTick_NoPeersUsesAgeCutoffOnly(t *testing.T) { + // Devnet-style single-node: no peer cursors. The age cutoff is the + // only gate. + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 8; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + assert.Equal(t, int64(0), countOpsForTable(t, c, "retention_test_models")) +} + +// Client-side gap-signal tests. + +// roundTripFunc lets a test stand in for the peer HTTP server. +type roundTripFunc func(*http.Request) (*http.Response, error) + +func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) } + +func TestPeerClient_DetectsGapAndAdvancesCursor(t *testing.T) { + resetRetentionStats() + db := setupRetentionTestDB(t) + + // peer advertises a gap with min ulid in the recent past + peerMinULID, err := ulid.New(ulid.Timestamp(time.Now().Add(-7*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + + // our cursor is far below the peer's min + oldCursorULID, err := ulid.New(ulid.Timestamp(time.Now().Add(-365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, db.Create(&Cursor{Host: "http://peer-x", LastULID: oldCursorULID.String()}).Error) + + rt := roundTripFunc(func(r *http.Request) (*http.Response, error) { + assert.Equal(t, oldCursorULID.String(), r.URL.Query().Get("after")) + h := http.Header{} + h.Set(HeaderRetentionGap, "true") + h.Set(HeaderAvailableMin, peerMinULID.String()) + h.Set("Content-Type", "application/json") + return &http.Response{ + StatusCode: 200, + Header: h, + Body: io.NopCloser(bytes.NewBufferString("[]")), + Request: r, + }, nil + }) + + z := zap.NewNop() + c := New("http://self", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "test", z), z, &http.Client{Transport: rt}) + c.RegisterModels(&retentionTestModel{}) + + peer := NewPeerClient("http://peer-x", c, "http://self") + require.NoError(t, peer.doSweep(context.Background())) + + // Cursor must be advanced to the peer's advertised floor. + var cur Cursor + require.NoError(t, c.DB.Where("host = ?", "http://peer-x").First(&cur).Error) + assert.Equal(t, peerMinULID.String(), cur.LastULID, + "cursor must explicitly advance to peer's advertised retention floor") + assert.Equal(t, uint64(1), retention.SweepGapAdvances.Load(), + "sweep client must increment the gap counter for operator visibility") +} + +func TestPeerClient_GapHeaderRespectedAcrossTicks(t *testing.T) { + // On a steady-state retention scenario, the same cursor should not + // re-advance over the same gap on the next tick — the persisted + // cursor now equals or exceeds gapMinULID, so the + // `gapMinULID > lastUlid` guard suppresses the second increment. + resetRetentionStats() + db := setupRetentionTestDB(t) + + peerMinULID, err := ulid.New(ulid.Timestamp(time.Now().Add(-7*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + oldCursorULID, err := ulid.New(ulid.Timestamp(time.Now().Add(-365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, db.Create(&Cursor{Host: "http://peer-z", LastULID: oldCursorULID.String()}).Error) + + rt := roundTripFunc(func(r *http.Request) (*http.Response, error) { + h := http.Header{} + h.Set(HeaderRetentionGap, "true") + h.Set(HeaderAvailableMin, peerMinULID.String()) + h.Set("Content-Type", "application/json") + return &http.Response{ + StatusCode: 200, + Header: h, + Body: io.NopCloser(bytes.NewBufferString("[]")), + Request: r, + }, nil + }) + + z := zap.NewNop() + c := New("http://self", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "test", z), z, &http.Client{Transport: rt}) + c.RegisterModels(&retentionTestModel{}) + peer := NewPeerClient("http://peer-z", c, "http://self") + + // First tick: counter increments once. + require.NoError(t, peer.doSweep(context.Background())) + assert.Equal(t, uint64(1), retention.SweepGapAdvances.Load()) + + // Second tick: cursor is now == peerMinULID, gap signal still present + // but suppressed by the strict-greater-than guard. + require.NoError(t, peer.doSweep(context.Background())) + assert.Equal(t, uint64(1), retention.SweepGapAdvances.Load(), + "counter must not double-count when cursor already at floor") +} + +func TestPeerClient_HostileFarFutureGapULIDRejected(t *testing.T) { + // A hostile or misconfigured peer that advertises a gap ulid far in + // the future must NOT silence our sweep stream. The client must + // reject the gap header, keep its cursor where it was, and continue + // applying any ops in the response body normally. + resetRetentionStats() + db := setupRetentionTestDB(t) + + // A ulid 5 years in the future — well beyond any legitimate clock + // skew window. + futureULID, err := ulid.New(ulid.Timestamp(time.Now().Add(5*365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + oldCursorULID, err := ulid.New(ulid.Timestamp(time.Now().Add(-365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, db.Create(&Cursor{Host: "http://hostile-peer", LastULID: oldCursorULID.String()}).Error) + + rt := roundTripFunc(func(r *http.Request) (*http.Response, error) { + h := http.Header{} + h.Set(HeaderRetentionGap, "true") + h.Set(HeaderAvailableMin, futureULID.String()) + h.Set("Content-Type", "application/json") + return &http.Response{ + StatusCode: 200, + Header: h, + Body: io.NopCloser(bytes.NewBufferString("[]")), + Request: r, + }, nil + }) + + z := zap.NewNop() + c := New("http://self", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "test", z), z, &http.Client{Transport: rt}) + c.RegisterModels(&retentionTestModel{}) + + peer := NewPeerClient("http://hostile-peer", c, "http://self") + require.NoError(t, peer.doSweep(context.Background())) + + // Cursor must NOT have advanced — the hostile gap was rejected. + var cur Cursor + require.NoError(t, c.DB.Where("host = ?", "http://hostile-peer").First(&cur).Error) + assert.Equal(t, oldCursorULID.String(), cur.LastULID, + "cursor must not advance across a far-future advertised gap ulid") + assert.Equal(t, uint64(0), retention.SweepGapAdvances.Load(), + "hostile gap must not increment the operator metric") +} + +func TestPeerClient_MalformedGapULIDRejected(t *testing.T) { + // A non-ULID string in the gap header is also a rejection case. + resetRetentionStats() + db := setupRetentionTestDB(t) + oldCursorULID, err := ulid.New(ulid.Timestamp(time.Now().Add(-365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, db.Create(&Cursor{Host: "http://peer", LastULID: oldCursorULID.String()}).Error) + + rt := roundTripFunc(func(r *http.Request) (*http.Response, error) { + h := http.Header{} + h.Set(HeaderRetentionGap, "true") + h.Set(HeaderAvailableMin, "not-a-ulid-this-is-garbage") + h.Set("Content-Type", "application/json") + return &http.Response{ + StatusCode: 200, + Header: h, + Body: io.NopCloser(bytes.NewBufferString("[]")), + Request: r, + }, nil + }) + + z := zap.NewNop() + c := New("http://self", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "test", z), z, &http.Client{Transport: rt}) + c.RegisterModels(&retentionTestModel{}) + + peer := NewPeerClient("http://peer", c, "http://self") + require.NoError(t, peer.doSweep(context.Background())) + + var cur Cursor + require.NoError(t, c.DB.Where("host = ?", "http://peer").First(&cur).Error) + assert.Equal(t, oldCursorULID.String(), cur.LastULID) + assert.Equal(t, uint64(0), retention.SweepGapAdvances.Load()) +} + +func TestIsValidGapULID(t *testing.T) { + now := time.Now() + + mk := func(at time.Time) string { + id, err := ulid.New(ulid.Timestamp(at), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + return id.String() + } + + assert.True(t, isValidPeerSuppliedULID(mk(now.Add(-30*24*time.Hour))), "past ulid valid") + assert.True(t, isValidPeerSuppliedULID(mk(now.Add(-1*time.Second))), "recent past ulid valid") + assert.True(t, isValidPeerSuppliedULID(mk(now.Add(15*time.Minute))), "small forward skew valid") + assert.False(t, isValidPeerSuppliedULID(mk(now.Add(2*time.Hour))), "2h forward beyond skew window rejected") + assert.False(t, isValidPeerSuppliedULID(mk(now.Add(365*24*time.Hour))), "1y forward rejected") + assert.False(t, isValidPeerSuppliedULID(""), "empty string rejected") + assert.False(t, isValidPeerSuppliedULID("not a ulid"), "garbage rejected") +} + +func TestPeerClient_NoGapHeaderNoAdvance(t *testing.T) { + resetRetentionStats() + db := setupRetentionTestDB(t) + + oldCursorULID, err := ulid.New(ulid.Timestamp(time.Now().Add(-365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, db.Create(&Cursor{Host: "http://peer-y", LastULID: oldCursorULID.String()}).Error) + + rt := roundTripFunc(func(r *http.Request) (*http.Response, error) { + h := http.Header{} + h.Set("Content-Type", "application/json") + return &http.Response{ + StatusCode: 200, + Header: h, + Body: io.NopCloser(bytes.NewBufferString("[]")), + Request: r, + }, nil + }) + + z := zap.NewNop() + c := New("http://self", nil, nil, db, lifecycle.NewLifecycle(context.Background(), "test", z), z, &http.Client{Transport: rt}) + c.RegisterModels(&retentionTestModel{}) + + peer := NewPeerClient("http://peer-y", c, "http://self") + require.NoError(t, peer.doSweep(context.Background())) + + assert.Equal(t, uint64(0), retention.SweepGapAdvances.Load(), + "no gap header => no counter increment") +} + +// MinAvailableULID surface test. + +func TestMinAvailableULID_EmptyTable(t *testing.T) { + c := newRetentionCrudr(t, "host1") + min, err := c.MinAvailableULID(context.Background()) + require.NoError(t, err) + assert.Empty(t, min) +} + +func TestMinAvailableULID_ReturnsSmallest(t *testing.T) { + c := newRetentionCrudr(t, "host1") + now := time.Now() + insertOpAt(t, c, "retention_test_models", now.Add(-2*time.Hour)) + insertOpAt(t, c, "retention_test_models", now.Add(-1*time.Hour)) + insertOpAt(t, c, "retention_test_models", now) + min, err := c.MinAvailableULID(context.Background()) + require.NoError(t, err) + assert.NotEmpty(t, min) + + // The smallest ulid is the one we inserted at -2h. + var explicitMin string + require.NoError(t, c.DB.Raw(`SELECT MIN(ulid) FROM ops`).Scan(&explicitMin).Error) + assert.Equal(t, explicitMin, min) +} + +// Operator-config safety tests. + +func TestCleanupDormantOps_BelowMinThresholdClamped(t *testing.T) { + // An operator who sets the threshold below the safety floor (24h) + // must not be able to delete brand-new tables: the clamp keeps a + // brief lull from being classified as dormant. + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 1 * time.Minute} + + now := time.Now() + for i := 0; i < 10; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-2*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + deleted, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + assert.Empty(t, deleted, "threshold < 24h must clamp up; -2h ops must not be classified dormant") + assert.Equal(t, int64(10), countOpsForTable(t, c, "retention_test_models")) +} + +func TestCleanupDormantOps_PreservesNewOpsAboveCutoff(t *testing.T) { + // Race-window guard: a producer writes a new op for a table the + // cleanup has just classified as dormant. The new op's ULID is above + // the cutoff, so the bounded WHERE clause must spare it even though + // the table appears dormant from the MAX(ulid) check. + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now() + // Old ops, far below the cutoff. + for i := 0; i < 20; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + // Inject a NEW op *above* the cutoff after the dormancy classification + // but before cleanup gets to its DELETE. We simulate the race by + // inserting before the call: same effect because the dormancy check + // uses MAX(ulid) which still returns one of the old ulids if we put + // the recent one in *after* picking maxULID. But the cleanup's + // `WHERE ulid < cutoff` guard means recent ops are never touched + // regardless of MAX ordering — the property we're verifying here. + freshULID := insertOpAt(t, c, "retention_test_models", now.Add(-1*time.Minute)) + + // The dormancy check will see freshULID as the max and classify the + // table as still-active. To exercise the race-guard explicitly, + // re-run with a threshold tight enough that the table is dormant + // EXCEPT for the fresh op (12h between fresh op and cutoff < 24h + // floor, so we use 23h base then test the WHERE clause directly). + // + // Simpler probe: call CleanupDormantOps and assert freshULID survives + // even if the table is "borderline dormant". Both paths converge on + // the same guarantee: ulids above cutoff never get deleted. + _, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + + // freshULID must still be present. + var found int64 + require.NoError(t, c.DB.Raw(`SELECT COUNT(*) FROM ops WHERE ulid = ?`, freshULID).Scan(&found).Error) + assert.Equal(t, int64(1), found, "ops above the cutoff must never be deleted by dormant cleanup") +} + +func TestCleanupDormantOps_BatchedDeletionLargeTable(t *testing.T) { + // Exercise the batched path: insert more rows than dormantBatchSize + // (10k) to confirm multiple iterations complete and totalForTable + // is correct. + if testing.Short() { + t.Skip("large insert test: skipped under -short") + } + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + cfg := RetentionConfig{DormantCleanupEnabled: true, DormantThreshold: 30 * 24 * time.Hour} + + now := time.Now().Add(-200 * 24 * time.Hour) + // 15001 rows -> 2 full batches + 1 short batch + rows := 15001 + bulk := make([]Op, 0, rows) + for i := 0; i < rows; i++ { + entropy := make([]byte, 10) + entropy[0] = byte(i & 0xff) + entropy[1] = byte((i >> 8) & 0xff) + id, err := ulid.New(ulid.Timestamp(now.Add(time.Duration(i)*time.Millisecond)), bytes.NewReader(entropy)) + require.NoError(t, err) + bulk = append(bulk, Op{ + ULID: id.String(), + Host: "host1", + Action: ActionCreate, + Table: "retention_test_models", + Data: json.RawMessage(`[]`), + }) + } + // chunk inserts so the test stays reasonable + for start := 0; start < len(bulk); start += 5000 { + end := start + 5000 + if end > len(bulk) { + end = len(bulk) + } + require.NoError(t, c.DB.Create(bulk[start:end]).Error) + } + require.Equal(t, int64(rows), countOpsForTable(t, c, "retention_test_models")) + + deleted, err := c.CleanupDormantOps(context.Background(), cfg) + require.NoError(t, err) + assert.Equal(t, int64(rows), deleted["retention_test_models"]) + assert.Equal(t, int64(0), countOpsForTable(t, c, "retention_test_models")) + assert.GreaterOrEqual(t, retention.DormantOpsDeleted.Load(), uint64(rows)) +} + +// End-to-end gap signal: real HTTP server with the actual handler, real +// client. Catches wiring bugs between serve_crud.go and client.go. + +func TestSweep_EndToEnd_GapHeaderPropagatesAndClientAdvances(t *testing.T) { + resetRetentionStats() + // Single shared DB (test harness uses one), so the server-side Crudr + // and the client-side PeerClient both operate against the same ops + // table. The handler reads from c.ServeCrudSweep; the client reads + // the response. Truncate once, then never wipe again — both sides + // share state. + c := newRetentionCrudr(t, "self-host") + + // Seed three recent ops as the "peer" side. + now := time.Now() + for i := 0; i < 3; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(time.Duration(i)*time.Millisecond)) + } + + // Build a minimal HTTP server that mirrors what serve_crud.go does: + // call ServeCrudSweep and forward the gap headers verbatim. + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + after := r.URL.Query().Get("after") + ops, gap, err := c.ServeCrudSweep(r.Context(), after, 100) + if err != nil { + http.Error(w, err.Error(), 500) + return + } + if gap != "" { + w.Header().Set(HeaderRetentionGap, "true") + w.Header().Set(HeaderAvailableMin, gap) + } + w.Header().Set("Content-Type", "application/json") + if ops == nil { + ops = []*Op{} + } + buf, _ := json.Marshal(ops) + w.Write(buf) + })) + defer server.Close() + + // Plant a cursor that's a year behind the server's ops. This is the + // pre-gap state of a peer that was offline through the retention + // window. + oldCursorULID, err := ulid.New(ulid.Timestamp(time.Now().Add(-365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: server.URL, LastULID: oldCursorULID.String()}).Error) + + // The PeerClient runs the same production code path; we just inject + // the httptest client transport so the gap headers travel over the + // wire path the real sweep uses. + c.httpClient = server.Client() + peer := NewPeerClient(server.URL, c, "self-host") + require.NoError(t, peer.doSweep(context.Background())) + + var cur Cursor + require.NoError(t, c.DB.Where("host = ?", server.URL).First(&cur).Error) + min, err := c.MinAvailableULID(context.Background()) + require.NoError(t, err) + assert.GreaterOrEqual(t, cur.LastULID, min, "client cursor must be at or above peer's min") + assert.Equal(t, uint64(1), retention.SweepGapAdvances.Load()) +} + +// Stale-cursor pinning behavior: an ancient peer cursor must pin the +// retention floor even when the age cutoff would otherwise delete those ops. +// Demonstrates the documented limitation that stale cursors block retention. +func TestRetentionTick_AncientCursorPinsAllRetention(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + + // All ops > retention window (200d). + for i := 0; i < 12; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-200*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + + // A long-decommissioned peer with a cursor from 3 years ago. + ancientULID, err := ulid.New(ulid.Timestamp(now.Add(-3*365*24*time.Hour)), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "decommissioned-peer", LastULID: ancientULID.String()}).Error) + registerTestPeer(c, "decommissioned-peer") + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 1000, CursorSafetyMargin: 1 * time.Hour} + require.NoError(t, c.retentionTick(context.Background(), cfg)) + + // All ops survive: the ancient cursor pins the floor at -3y, far + // below the oldest op we have, so the cutoff is below our min(ulid) + // and nothing is eligible. + assert.Equal(t, int64(12), countOpsForTable(t, c, "retention_test_models"), + "ancient cursor must pin the retention floor") + assert.Equal(t, uint64(0), retention.RetentionOpsDeleted.Load()) +} + +// Concurrent sweep + retention: prove Postgres MVCC keeps both safe. +func TestRetention_ConcurrentSweepAndDeleteSafety(t *testing.T) { + resetRetentionStats() + c := newRetentionCrudr(t, "host1") + now := time.Now() + for i := 0; i < 200; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-60*24*time.Hour).Add(time.Duration(i)*time.Millisecond)) + } + for i := 0; i < 20; i++ { + insertOpAt(t, c, "retention_test_models", now.Add(-1*time.Minute).Add(time.Duration(i)*time.Millisecond)) + } + recentULID, err := ulid.New(ulid.Timestamp(now), bytes.NewReader(make([]byte, 16))) + require.NoError(t, err) + require.NoError(t, c.DB.Create(&Cursor{Host: "peer", LastULID: recentULID.String()}).Error) + + cfg := RetentionConfig{RetentionDays: 30, SweepBatchLimit: 50, CursorSafetyMargin: 1 * time.Hour} + + // Fire a retention tick and a sweep query concurrently; assert both + // complete without error and that the sweep returns a coherent + // snapshot. + doneSweep := make(chan int, 1) + doneRetention := make(chan error, 1) + go func() { + ops, _, err := c.ServeCrudSweep(context.Background(), "", 1000) + if err != nil { + doneSweep <- -1 + return + } + doneSweep <- len(ops) + }() + go func() { + doneRetention <- c.retentionTick(context.Background(), cfg) + }() + + require.NoError(t, <-doneRetention) + got := <-doneSweep + assert.GreaterOrEqual(t, got, 0, "concurrent sweep must complete without error") +} diff --git a/pkg/mediorum/server/serve_crud.go b/pkg/mediorum/server/serve_crud.go index f09a1b98..4d6a8fe7 100644 --- a/pkg/mediorum/server/serve_crud.go +++ b/pkg/mediorum/server/serve_crud.go @@ -25,14 +25,7 @@ func (ss *MediorumServer) serveCrudSweep(c echo.Context) error { defer cancel() after := c.QueryParam("after") - var ops []*crudr.Op - err := ss.crud.DB. - WithContext(ctx). - Where("ulid > ?", after). - Limit(PullLimit). - Order("ulid asc"). - Find(&ops). - Error + ops, gapMinULID, err := ss.crud.ServeCrudSweep(ctx, after, PullLimit) if err != nil { return c.String(500, fmt.Sprintf("Failed to query ops: %v", err)) } @@ -52,6 +45,16 @@ func (ss *MediorumServer) serveCrudSweep(c echo.Context) error { filteredOps = append(filteredOps, op) } + // Retention-gap signal: when the caller's cursor falls below our + // retention floor, expose the lowest available ULID so the client + // can advance its cursor explicitly rather than silently skip the + // gap. The body remains a normal ops array for wire compatibility; + // older clients ignore these headers and behave as before. + if gapMinULID != "" { + c.Response().Header().Set(crudr.HeaderRetentionGap, "true") + c.Response().Header().Set(crudr.HeaderAvailableMin, gapMinULID) + } + c.Response().Header().Set(echo.HeaderCacheControl, "public, max-age=300") return c.JSON(200, filteredOps) } diff --git a/pkg/mediorum/server/server.go b/pkg/mediorum/server/server.go index 40be802c..170b839c 100644 --- a/pkg/mediorum/server/server.go +++ b/pkg/mediorum/server/server.go @@ -78,12 +78,12 @@ type MediorumConfig struct { DiscoveryListensEndpoints []string LogLevel string DeadHosts []string - RepairEnabled bool `default:"true"` - RepairInterval time.Duration `default:"1h"` - RepairConcurrency int `default:"1"` + RepairEnabled bool `default:"true"` + RepairInterval time.Duration `default:"1h"` + RepairConcurrency int `default:"1"` ProgrammableDistributionEnabled bool - BlobStorageStreaming bool + BlobStorageStreaming bool // should have a basedir type of thing // by default will put db + blobs there @@ -91,7 +91,6 @@ type MediorumConfig struct { privateKey *ecdsa.PrivateKey } - type MediorumServer struct { lc *lifecycle.Lifecycle echo *echo.Echo @@ -152,6 +151,7 @@ type MediorumServer struct { Config MediorumConfig crudSweepMutex sync.Mutex + retentionCfg crudr.RetentionConfig // handle communication between core and mediorum for Proof of Storage posChannel chan pos.PoSRequest @@ -322,12 +322,19 @@ func New(lc *lifecycle.Lifecycle, logger *zap.Logger, config MediorumConfig, pos Timeout: 3 * time.Minute, // covers blob replication and pull } - // crud + // crud. Normalize self-host with the same rules crudr.New applies to + // peer hosts (lowercase + trailing-slash strip) so a trailing-slash + // or mixed-case mismatch between config.Self.Host and the chain + // registry cannot land self in peerHosts. Without this normalization + // a future config-builder drift would let self be flagged as an + // active peer with no cursor row, permanently blocking retention via + // computeRetentionCutoff's empty-peer sentinel. + normalizedSelf := audiusHttputil.RemoveTrailingSlash(strings.ToLower(config.Self.Host)) peerHosts := []string{} allHosts := []string{} for _, peer := range config.Peers { allHosts = append(allHosts, peer.Host) - if peer.Host != config.Self.Host { + if audiusHttputil.RemoveTrailingSlash(strings.ToLower(peer.Host)) != normalizedSelf { peerHosts = append(peerHosts, peer.Host) } } @@ -335,6 +342,10 @@ func New(lc *lifecycle.Lifecycle, logger *zap.Logger, config MediorumConfig, pos crud := crudr.New(config.Self.Host, config.privateKey, peerHosts, db, mediorumLifecycle, logger, peerHTTPClient) dbMigrate(crud, config.Self.Host) + // Load retention config once so the dormant cleanup and the + // ongoing retention sweep see a stable view of env vars. + retentionCfg := crudr.LoadRetentionConfig() + deadHosts := config.DeadHosts if deadHosts == nil { deadHosts = []string{} @@ -391,6 +402,7 @@ func New(lc *lifecycle.Lifecycle, logger *zap.Logger, config MediorumConfig, pos transcodeWork: make(chan *Upload, 100), replicationWork: make(chan *Upload, 100), posChannel: posChannel, + retentionCfg: retentionCfg, peerHealths: map[string]*PeerHealth{}, redirectCache: imcache.New(imcache.WithMaxEntriesLimitOption[string, string](50_000, imcache.EvictionPolicyLRU)), @@ -402,8 +414,8 @@ func New(lc *lifecycle.Lifecycle, logger *zap.Logger, config MediorumConfig, pos bgPullBackoff: imcache.New(imcache.WithMaxEntriesLimitOption[string, struct{}](50_000, imcache.EvictionPolicyLRU), imcache.WithDefaultExpirationOption[string, struct{}](time.Hour)), replicationAttempts: imcache.New(imcache.WithMaxEntriesLimitOption[string, struct{}](50_000, imcache.EvictionPolicyLRU), imcache.WithDefaultExpirationOption[string, struct{}](time.Hour)), - StartedAt: time.Now().UTC(), - Config: config, + StartedAt: time.Now().UTC(), + Config: config, geoIPdbReady: make(chan struct{}), core: core, @@ -615,6 +627,70 @@ func (ss *MediorumServer) MustStart() error { if ss.Config.WalletIsRegistered { ss.crud.StartClients() + if ss.retentionCfg.EnsureOpsTableIndex { + // Composite-index build runs OFF the boot path. On val001-scale + // (84 GiB heap, ~250M rows) CREATE INDEX CONCURRENTLY can take + // 30-60 min; we cannot block /health-check that long. A pinned + // *sql.Conn carries the session-scoped advisory lock + the + // invalid-leftover probe + the create through one backend + // session. Without the pin, the lock leaks across the gorm + // pool and a partial CONCURRENTLY build leaves an INVALID + // index that IF NOT EXISTS short-circuits on, silently + // downgrading retention to seq-scans. + ss.lc.AddManagedRoutine("crudr ops index ensure", func(ctx context.Context) error { + defer func() { + if r := recover(); r != nil { + ss.logger.Error("ops index ensure panicked; recovered", zap.Any("panic", r)) + } + }() + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(10 * time.Second): + } + if err := crudr.EnsureOpsTableIndex(ctx, ss.crud.DB); err != nil { + ss.logger.Warn("ops index ensure failed; retention paths will seq-scan until next process restart", zap.Error(err)) + } + <-ctx.Done() + return ctx.Err() + }) + } else { + ss.logger.Info("ops table index ensure disabled (set OPENAUDIO_MEDIORUM_ENSURE_OPS_TABLE_INDEX=true to enable)") + } + + // Dormant-table cleanup off the boot path so /health-check is + // reachable while a possibly multi-million-row backlog drains. + // The defer-recover protects the entire mediorum process from + // a panic in CleanupDormantOps (lifecycle.AddManagedRoutine has + // no built-in recover). + ss.lc.AddManagedRoutine("crudr dormant cleanup", func(ctx context.Context) error { + defer func() { + if r := recover(); r != nil { + ss.logger.Error("dormant cleanup panicked; recovered", zap.Any("panic", r)) + } + }() + passCtx, cancel := context.WithTimeout(ctx, 30*time.Minute) + defer cancel() + if _, err := ss.crud.CleanupDormantOps(passCtx, ss.retentionCfg); err != nil { + ss.logger.Warn("dormant cleanup finished with error", zap.Error(err)) + } + <-ctx.Done() + return ctx.Err() + }) + + // Ongoing retention sweep: opt-in via OPENAUDIO_MEDIORUM_OPS_RETENTION_DAYS. + // When unset, RunRetention blocks on ctx.Done() and never deletes. + // Uses the same RetentionConfig captured at server New() time so the + // dormant cleanup and the ongoing sweep share one snapshot of env. + ss.lc.AddManagedRoutine("crudr ops retention", func(ctx context.Context) error { + defer func() { + if r := recover(); r != nil { + ss.logger.Error("ops retention panicked; recovered", zap.Any("panic", r)) + } + }() + return ss.crud.RunRetention(ctx, ss.retentionCfg) + }) + ss.lc.AddManagedRoutine("health poller", ss.startHealthPoller) ss.lc.AddManagedRoutine("repairer", ss.startRepairer) ss.lc.AddManagedRoutine("qm syncer", ss.startQmSyncer)