Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 56 additions & 3 deletions pkg/mediorum/crudr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ func (p *PeerClient) doSweep(ctx context.Context) error {
host := p.Host
bulkEndpoint := "/internal/crud/sweep" // hardcoded

// get cursor
// get cursor. Treat an implausible persisted LastULID (e.g., a
// pre-PR row with a far-future or epoch ULID) as missing. Without
// this guard, doSweep would echo the bad value as `after=` and
// re-upsert the same bad value at end-of-sweep, locking the peer
// into a permanent prune-recreate cycle that no other path heals.
lastUlid := ""
{
var cursor Cursor
Expand All @@ -138,6 +142,10 @@ func (p *PeerClient) doSweep(ctx context.Context) error {
if !errors.Is(err, gorm.ErrRecordNotFound) {
p.logger.Warn("failed to get cursor", "err", err)
}
} else if cursor.LastULID != "" && !isValidPeerSuppliedULID(cursor.LastULID) {
p.logger.Warn("persisted cursor LastULID is implausible; treating as missing",
"peer", host,
"last_ulid", cursor.LastULID)
} else {
lastUlid = cursor.LastULID
}
Expand All @@ -164,6 +172,42 @@ func (p *PeerClient) doSweep(ctx context.Context) error {
return fmt.Errorf("bad status: %d", resp.StatusCode)
}

// Retention-gap signal: the peer has dropped ops below our cursor and
// is advertising the lowest ulid it still has. Stage a cursor advance
// across the gap. Without this branch, the response body would be
// applied normally and the cursor would silently skip the gap.
//
// First-contact guard: if we have no durable cursor (`lastUlid == ""`),
// the empty-string < anyULID comparison would let a peer establish
// our initial cursor at an attacker-chosen position. Require an
// existing cursor before honoring the gap.
//
// Defer-until-decode: stage the advance in lastUlid here but do NOT
// persist. The end-of-sweep upsert is the single persistence point.
// On body-decode failure or apply error we abort without writing the
// forward cursor, so the next sweep retries the same range cleanly.
gapMinULID := resp.Header.Get(HeaderAvailableMin)
gapAdvancePending := false
if resp.Header.Get(HeaderRetentionGap) == "true" && gapMinULID != "" {
switch {
case lastUlid == "":
p.logger.Warn("ignoring retention-gap header on first contact (no durable cursor yet)",
"peer", host,
"advertised_ulid", gapMinULID)
case !isValidPeerSuppliedULID(gapMinULID):
p.logger.Warn("ignoring retention-gap header with invalid ulid",
"peer", host,
"advertised_ulid", gapMinULID)
case gapMinULID > lastUlid:
p.logger.Warn("retention gap detected: peer cursor below peer's available history; staging cursor advance across gap",
"peer", host,
"local_cursor", lastUlid,
"peer_available_min_ulid", gapMinULID)
lastUlid = gapMinULID
gapAdvancePending = true
}
}

var ops []*Op
dec := json.NewDecoder(resp.Body)
err = dec.Decode(&ops)
Expand Down Expand Up @@ -197,12 +241,21 @@ func (p *PeerClient) doSweep(ctx context.Context) error {
p.Seeded = true
}

// set cursor
{
// set cursor. Single persistence point for both normal advance and
// gap-staged advance. MarkSweepGapAdvance fires only after the
// upsert succeeds AND lastUlid is still at-or-above the staged gap
// floor. Body ops that regress lastUlid below the gap suppress
// the counter so it never over-reports relative to durable state.
// Skip the upsert when lastUlid stays empty so a first-contact
// rejection or no-ops response doesn't write an empty cursor row
// that would later block all retention via the empty-peer sentinel.
if lastUlid != "" {
upsertClause := clause.OnConflict{UpdateAll: true}
err := p.crudr.DB.Clauses(upsertClause).Create(&Cursor{Host: host, LastULID: lastUlid}).Error
if err != nil {
p.logger.Error("failed to set cursor", "err", err)
} else if gapAdvancePending && lastUlid >= gapMinULID {
MarkSweepGapAdvance()
}
}

Expand Down
86 changes: 86 additions & 0 deletions pkg/mediorum/crudr/crudr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"reflect"
"strings"
"sync"
"time"

"github.com/OpenAudio/go-openaudio/pkg/httputil"
"github.com/OpenAudio/go-openaudio/pkg/lifecycle"
Expand All @@ -20,6 +21,80 @@ import (
"gorm.io/gorm/clause"
)

// opsTableIndexLockKey is the 64-bit key handed to Postgres'
// session-scoped advisory-lock functions while the composite
// ops("table", ulid) index is being ensured. Read big-endian, its
// eight bytes are the ASCII string "ops_indx"; keys for any adjacent
// locks should follow the same packing convention so they are easy to
// recognise.
const opsTableIndexLockKey = int64(0x6F70735F696E6478)

// EnsureOpsTableIndex creates the composite index ops("table", ulid)
// used by the dormant cleanup and (when enabled) the per-table
// retention queries. Designed to run OFF the boot path via a managed
// routine so the multi-hour CONCURRENTLY build on val001-scale tables
// cannot block /health-check.
//
// All five steps run on one pinned *sql.Conn so they share a single
// Postgres backend session:
//
//  1. pg_try_advisory_lock(opsTableIndexLockKey). If the lock is not
//     obtained the function returns nil without doing any work.
//  2. Probe pg_index for an INVALID idx_ops_table_ulid.
//  3. DROP INDEX CONCURRENTLY IF EXISTS when the probe finds one.
//  4. CREATE INDEX CONCURRENTLY IF NOT EXISTS.
//  5. pg_advisory_unlock(opsTableIndexLockKey) on a detached context.
//
// Without the pin, gorm's pool may check out a different conn per call
// and the session-scoped lock would stay on the original session while
// later statements run elsewhere; two processes could then each
// "acquire" the lock and race the CONCURRENTLY build.
//
// The invalid-leftover probe + DROP handles the recovery case where a
// prior process died mid-CONCURRENTLY; without it, CREATE IF NOT EXISTS
// short-circuits on the INVALID index name and the index never becomes
// usable.
//
// The returned error wraps the first step that failed. A failure to
// release the advisory lock is reported only when every earlier step
// succeeded, so it never masks the root cause.
func EnsureOpsTableIndex(ctx context.Context, db *gorm.DB) (retErr error) {
	sqlDB, err := db.DB()
	if err != nil {
		return fmt.Errorf("ops-index ensure: get underlying *sql.DB: %w", err)
	}
	conn, err := sqlDB.Conn(ctx)
	if err != nil {
		return fmt.Errorf("ops-index ensure: pin connection: %w", err)
	}
	// Runs after the unlock defer below (defers are LIFO).
	defer conn.Close()

	var locked bool
	if err := conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, opsTableIndexLockKey).Scan(&locked); err != nil {
		return fmt.Errorf("ops-index ensure: acquire advisory lock: %w", err)
	}
	if !locked {
		// Lock not granted: leave the work to whoever holds it.
		return nil
	}
	defer func() {
		// Detached short-context: a shutdown signal during the build
		// shouldn't strand the lock.
		//
		// (*sql.Conn).Close hands the connection back to the pool; it
		// does not end the backend session, so it cannot be relied on
		// to drop a session-scoped lock. This explicit unlock is the
		// release path, which is why its failure is surfaced instead
		// of being discarded.
		unlockCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if _, unlockErr := conn.ExecContext(unlockCtx, `SELECT pg_advisory_unlock($1)`, opsTableIndexLockKey); unlockErr != nil && retErr == nil {
			retErr = fmt.Errorf("ops-index ensure: release advisory lock: %w", unlockErr)
		}
	}()

	var invalid bool
	if err := conn.QueryRowContext(ctx, `
SELECT EXISTS (
SELECT 1
FROM pg_index i
JOIN pg_class c ON c.oid = i.indexrelid
WHERE c.relname = 'idx_ops_table_ulid'
AND NOT i.indisvalid
)
`).Scan(&invalid); err != nil {
		return fmt.Errorf("ops-index ensure: probe invalid index: %w", err)
	}
	if invalid {
		if _, err := conn.ExecContext(ctx, `DROP INDEX CONCURRENTLY IF EXISTS idx_ops_table_ulid`); err != nil {
			return fmt.Errorf("ops-index ensure: drop invalid index: %w", err)
		}
	}

	if _, err := conn.ExecContext(ctx, `CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_ops_table_ulid ON ops ("table", ulid)`); err != nil {
		return fmt.Errorf("ops-index ensure: create index: %w", err)
	}
	return nil
}

const (
ActionCreate = "create"
ActionUpdate = "update"
Expand Down Expand Up @@ -257,6 +332,17 @@ func (c *Crudr) KnownType(op *Op) bool {
}

func (c *Crudr) ApplyOp(op *Op) error {
// Reject implausible ULIDs from any source (peer push, sweep response,
// or local code). Local code constructs via ulid.Make() which always
// passes; the check is essentially free on the local path. A hostile
// peer can otherwise spoof op.Host = c.host to bypass any host-keyed
// gate AND plant a poisoned ULID (epoch, far-future, malformed) that
// would persist into cursors and break ulidShiftBack on the retention
// floor probe.
if !isValidPeerSuppliedULID(op.ULID) {
return fmt.Errorf("rejecting op with implausible ULID %q from origin %s", op.ULID, op.Host)
}

elemType, ok := c.typeMap[op.Table]
if !ok {
return fmt.Errorf("no type registered for %s", op.Table)
Expand Down
Loading
Loading