Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 47 additions & 8 deletions packages/distribution-monitor/src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,7 @@ export class PostgresObservationStore implements ObservationStore {
await store.db.execute(sql.raw(statement));
}

// The unique index on the materialized view is required for
// REFRESH ... CONCURRENTLY but is not modelled by drizzle's
// `pgMaterializedView`, so `pushSchema` never emits it. Create it
// idempotently — `IF NOT EXISTS` matches by name and skips (no rebuild) when
// it already exists.
await store.db.execute(
sql`CREATE UNIQUE INDEX IF NOT EXISTS latest_observations_monitor_idx ON latest_observations (monitor)`,
);
await ensureLatestObservationsIndex(store.db);

return store;
}
Expand Down Expand Up @@ -126,3 +119,49 @@ export class PostgresObservationStore implements ObservationStore {
await this.db.execute(refreshLatestObservationsViewSql);
}
}

/**
* Create the unique index that `REFRESH ... CONCURRENTLY` requires on the
* `latest_observations` materialized view. drizzle's `pgMaterializedView` does
* not model it, so `pushSchema` never emits it; create it idempotently here —
* `IF NOT EXISTS` matches by name and skips (no rebuild) when it already exists.
*
* The statement takes a SHARE lock on the view, which conflicts with the
* EXCLUSIVE lock held by a `REFRESH ... CONCURRENTLY` running in another
* instance. During a rolling deploy the old and new pods overlap, so that wait
* can exceed `lock_timeout` and raise `55P03` (lock_not_available). Tolerate it:
* whenever the view is being refreshed the index already exists, so a failed
* re-check is harmless — far better than aborting startup and crash-looping the
* monitor. Re-throw anything else.
*/
export async function ensureLatestObservationsIndex(
db: Pick<PostgresJsDatabase, 'execute'>,
): Promise<void> {
try {
await db.execute(
sql`CREATE UNIQUE INDEX IF NOT EXISTS latest_observations_monitor_idx ON latest_observations (monitor)`,
);
} catch (error) {
if (!isLockNotAvailable(error)) {
throw error;
}
}
}

/**
* Whether an error is (or wraps) the PostgreSQL `lock_not_available` (`55P03`)
* SQLSTATE, raised when a statement exceeds `lock_timeout` waiting for a
* contended lock. drizzle wraps the driver error, so the SQLSTATE lives on a
* nested `cause` rather than the top-level error.
*/
export function isLockNotAvailable(error: unknown): boolean {
if (typeof error !== 'object' || error === null) {
return false;
}
if ('code' in error && (error as { code?: unknown }).code === '55P03') {
return true;
}
return (
'cause' in error && isLockNotAvailable((error as { cause?: unknown }).cause)
);
}
64 changes: 63 additions & 1 deletion packages/distribution-monitor/test/store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,75 @@ import {
} from '@testcontainers/postgresql';
import { sql } from 'drizzle-orm';
import type { PostgresJsDatabase } from 'drizzle-orm/postgres-js';
import { PostgresObservationStore } from '../src/store.js';
import {
PostgresObservationStore,
ensureLatestObservationsIndex,
isLockNotAvailable,
} from '../src/store.js';

describe('PostgresObservationStore', () => {
it('exports create factory method', () => {
expect(PostgresObservationStore.create).toBeInstanceOf(Function);
});

describe('ensureLatestObservationsIndex', () => {
const rejectingDb = (error: unknown) =>
({ execute: () => Promise.reject(error) }) as never;

it('swallows a 55P03 lock-timeout so startup is not aborted', async () => {
const lockTimeout = {
name: 'DrizzleQueryError',
cause: { code: '55P03' },
};
await expect(
ensureLatestObservationsIndex(rejectingDb(lockTimeout)),
).resolves.toBeUndefined();
});

it('re-throws any other error', async () => {
const otherError = { cause: { code: '42P07' } };
await expect(
ensureLatestObservationsIndex(rejectingDb(otherError)),
).rejects.toBe(otherError);
});

it('resolves when the index is created', async () => {
const db = { execute: () => Promise.resolve([]) } as never;
await expect(ensureLatestObservationsIndex(db)).resolves.toBeUndefined();
});
});

describe('isLockNotAvailable', () => {
it('detects the 55P03 SQLSTATE on the top-level error', () => {
expect(isLockNotAvailable({ code: '55P03' })).toBe(true);
});

it('detects 55P03 wrapped in a nested cause, as drizzle reports it', () => {
const error = { name: 'DrizzleQueryError', cause: { code: '55P03' } };
expect(isLockNotAvailable(error)).toBe(true);
});

it('walks multiple cause levels', () => {
const error = { cause: { cause: { code: '55P03' } } };
expect(isLockNotAvailable(error)).toBe(true);
});

it('returns false for a different SQLSTATE', () => {
// 42P07 = duplicate_table, already handled by IF NOT EXISTS.
expect(isLockNotAvailable({ cause: { code: '42P07' } })).toBe(false);
});

it('returns false for an error without a code or cause', () => {
expect(isLockNotAvailable(new Error('boom'))).toBe(false);
});

it('returns false for non-object values', () => {
expect(isLockNotAvailable(null)).toBe(false);
expect(isLockNotAvailable(undefined)).toBe(false);
expect(isLockNotAvailable('55P03')).toBe(false);
});
});

describe('integration', () => {
let container: StartedPostgreSqlContainer;
let store: PostgresObservationStore;
Expand Down
8 changes: 4 additions & 4 deletions packages/distribution-monitor/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ export default mergeConfig(
exclude: ['src/cli.ts', 'drizzle.config.ts'],
thresholds: {
autoUpdate: true,
functions: 96.29,
lines: 94.18,
branches: 78.26,
statements: 92.7,
functions: 96.66,
lines: 95.65,
branches: 84.61,
statements: 94.94,
},
},
},
Expand Down