From 76a3b76cfd84d1704c5353b66c0dce9d55bfc579 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Tue, 28 Apr 2026 09:12:57 +0100 Subject: [PATCH 1/5] feat: add retrieval_type column to retrieval_checks clickhouse table --- apps/backend/src/clickhouse/clickhouse.schema.ts | 14 ++++++++++++-- apps/backend/src/retrieval/retrieval.service.ts | 1 + 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/apps/backend/src/clickhouse/clickhouse.schema.ts b/apps/backend/src/clickhouse/clickhouse.schema.ts index 897d8c92..754eb162 100644 --- a/apps/backend/src/clickhouse/clickhouse.schema.ts +++ b/apps/backend/src/clickhouse/clickhouse.schema.ts @@ -1,6 +1,11 @@ /** - * ClickHouse DDL statements executed on startup via CREATE DATABASE/TABLE IF NOT EXISTS. - * Order matters: database must be created before tables. + * ClickHouse DDL statements executed on startup. + * + * Each statement must be idempotent: + * - CREATE TABLE uses IF NOT EXISTS for the initial schema + * - ALTER TABLE ADD COLUMN uses IF NOT EXISTS for incremental schema changes + * + * Order matters: tables must exist before their ALTER TABLE statements run. */ export function buildMigrations(database: string): string[] { return [ @@ -78,5 +83,10 @@ export function buildMigrations(database: string): string[] { PRIMARY KEY (probe_location, sp_address, timestamp) PARTITION BY toStartOfMonth(timestamp) TTL toDateTime(timestamp) + INTERVAL 1 YEAR`, + + // Schema migrations - each must be idempotent. + // Existing rows get the column's default value (empty string for LowCardinality(String)). + + `ALTER TABLE ${database}.retrieval_checks ADD COLUMN IF NOT EXISTS retrieval_type LowCardinality(String)`, ]; } diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index aff8849d..60598180 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -268,6 +268,7 @@ export class RetrievalService { deal_id: deal.id, retrieval_id: saved.id, service_type: saved.serviceType, + retrieval_type: "deal", status: saved.status, http_response_code: executionResult.metrics.statusCode || null, first_byte_ms: executionResult.success ? executionResult.metrics.ttfb : null, From eeab83363288efcf11e34f61a5bc900e8d9e1720 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Tue, 28 Apr 2026 09:34:34 +0100 Subject: [PATCH 2/5] feat: add schema migration system for clickhouse --- ...use.schema.ts => clickhouse.migrations.ts} | 48 ++++++++------ .../src/clickhouse/clickhouse.service.ts | 63 ++++++++++++++++--- 2 files changed, 85 insertions(+), 26 deletions(-) rename apps/backend/src/clickhouse/{clickhouse.schema.ts => clickhouse.migrations.ts} (81%) diff --git a/apps/backend/src/clickhouse/clickhouse.schema.ts b/apps/backend/src/clickhouse/clickhouse.migrations.ts similarity index 81% rename from apps/backend/src/clickhouse/clickhouse.schema.ts rename to apps/backend/src/clickhouse/clickhouse.migrations.ts index 754eb162..f957dea1 100644 --- a/apps/backend/src/clickhouse/clickhouse.schema.ts +++ b/apps/backend/src/clickhouse/clickhouse.migrations.ts @@ -1,15 +1,19 @@ -/** - * ClickHouse DDL statements executed on startup. - * - * Each statement must be idempotent: - * - CREATE TABLE uses IF NOT EXISTS for the initial schema - * - ALTER TABLE ADD COLUMN uses IF NOT EXISTS for incremental schema changes - * - * Order matters: tables must exist before their ALTER TABLE statements run. - */ -export function buildMigrations(database: string): string[] { +export interface Migration { + version: number; + name: string; + up: string[]; + down: string[]; +} + +export function getMigrations(database: string): Migration[] { return [ - `CREATE TABLE IF NOT EXISTS ${database}.data_storage_checks + { + version: 1, + name: "initial_schema", + // IF NOT EXISTS guards allow this to be re-run against a database that was created + // before the migration system was introduced. + up: [ + `CREATE TABLE IF NOT EXISTS ${database}.data_storage_checks ( timestamp DateTime64(3, 'UTC'), -- when deal entity was saved @@ -44,7 +48,7 @@ export function buildMigrations(database: string): string[] { PARTITION BY toStartOfMonth(timestamp) TTL toDateTime(timestamp) + INTERVAL 1 YEAR`, - `CREATE TABLE IF NOT EXISTS ${database}.retrieval_checks + `CREATE TABLE IF NOT EXISTS ${database}.retrieval_checks ( timestamp DateTime64(3, 'UTC'), -- when retrieval entity was saved probe_location LowCardinality(String), -- dealbot location @@ -67,7 +71,7 @@ export function buildMigrations(database: string): string[] { PARTITION BY toStartOfMonth(timestamp) TTL toDateTime(timestamp) + INTERVAL 1 YEAR`, - `CREATE TABLE IF NOT EXISTS ${database}.data_retention_challenges + `CREATE TABLE IF NOT EXISTS ${database}.data_retention_challenges ( timestamp DateTime64(3, 'UTC'), -- when the poll ran and detected these periods probe_location LowCardinality(String), -- dealbot location @@ -83,10 +87,18 @@ export function buildMigrations(database: string): string[] { PRIMARY KEY (probe_location, sp_address, timestamp) PARTITION BY toStartOfMonth(timestamp) TTL toDateTime(timestamp) + INTERVAL 1 YEAR`, - - // Schema migrations - each must be idempotent. - // Existing rows get the column's default value (empty string for LowCardinality(String)). - - `ALTER TABLE ${database}.retrieval_checks ADD COLUMN IF NOT EXISTS retrieval_type LowCardinality(String)`, + ], + down: [ + `DROP TABLE IF EXISTS ${database}.data_retention_challenges`, + `DROP TABLE IF EXISTS ${database}.retrieval_checks`, + `DROP TABLE IF EXISTS ${database}.data_storage_checks`, + ], + }, + { + version: 2, + name: "add_retrieval_type_to_retrieval_checks", + up: [`ALTER TABLE ${database}.retrieval_checks ADD COLUMN IF NOT EXISTS retrieval_type LowCardinality(String)`], + down: [`ALTER TABLE ${database}.retrieval_checks DROP COLUMN IF EXISTS retrieval_type`], + }, ]; } diff --git a/apps/backend/src/clickhouse/clickhouse.service.ts b/apps/backend/src/clickhouse/clickhouse.service.ts index 2cdfc34a..532c3f91 100644 --- a/apps/backend/src/clickhouse/clickhouse.service.ts +++ b/apps/backend/src/clickhouse/clickhouse.service.ts @@ -4,7 +4,7 @@ import { ConfigService } from "@nestjs/config"; import { InjectMetric } from "@willsoto/nestjs-prometheus"; import { Counter, Gauge, Histogram } from "prom-client"; import type { IClickhouseConfig, IConfig } from "../config/app.config.js"; -import { buildMigrations } from "./clickhouse.schema.js"; +import { getMigrations } from "./clickhouse.migrations.js"; interface BufferedRow { table: string; @@ -16,6 +16,7 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { private readonly logger = new Logger(ClickhouseService.name); private readonly config: IClickhouseConfig; private client: ClickHouseClient | null = null; + private database = ""; private buffer: BufferedRow[] = []; private flushTimer: NodeJS.Timeout | null = null; @@ -41,11 +42,11 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { }); const parsedUrl = new URL(this.config.url); - const database = parsedUrl.pathname.replace(/^\//, ""); + this.database = parsedUrl.pathname.replace(/^\//, ""); try { - await this.migrate(database); + await this.migrate(this.database); } catch (err) { - this.logger.error({ event: "clickhouse_migration_failed", database, error: String(err) }); + this.logger.error({ event: "clickhouse_migration_failed", database: this.database, error: String(err) }); throw err; } @@ -58,7 +59,7 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { this.logger.log({ event: "clickhouse_initialized", host: parsedUrl.host, - database, + database: this.database, batchSize: this.config.batchSize, flushIntervalMs: this.config.flushIntervalMs, probeLocation: this.configService.get("app").probeLocation, @@ -67,11 +68,57 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { private async migrate(database: string): Promise { if (!this.client) return; - const migrations = buildMigrations(database); - for (const sql of migrations) { + + await this.client.command({ + query: `CREATE TABLE IF NOT EXISTS ${database}.schema_migrations +( + version UInt32, + name String, + applied_at DateTime64(3, 'UTC') DEFAULT now64() +) +ENGINE = MergeTree() +ORDER BY version`, + }); + + const result = await this.client.query({ + query: `SELECT version FROM ${database}.schema_migrations ORDER BY version`, + format: "JSONEachRow", + }); + const rows = (await result.json()) as { version: number }[]; + const applied = new Set(rows.map((r) => r.version)); + + const migrations = getMigrations(database); + let count = 0; + for (const m of migrations) { + if (applied.has(m.version)) continue; + for (const sql of m.up) { + await this.client.command({ query: sql }); + } + await this.client.insert({ + table: `${database}.schema_migrations`, + values: [{ version: m.version, name: m.name }], + format: "JSONEachRow", + }); + this.logger.log({ event: "migration_applied", version: m.version, name: m.name }); + count++; + } + + this.logger.log({ event: "clickhouse_migrated", database, appliedCount: count }); + } + + async migrateDown(version: number): Promise { + if (!this.client) throw new Error("ClickHouse not connected"); + const migrations = getMigrations(this.database); + const migration = migrations.find((m) => m.version === version); + if (!migration) throw new Error(`Migration version ${version} not found`); + + for (const sql of migration.down) { await this.client.command({ query: sql }); } - this.logger.log({ event: "clickhouse_migrated", database }); + await this.client.command({ + query: `ALTER TABLE ${this.database}.schema_migrations DELETE WHERE version = ${version}`, + }); + this.logger.log({ event: "migration_rolled_back", version, name: migration.name }); } async onApplicationShutdown() { From 7f06732dcb8266bff15113566098c7558b4a7d53 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Tue, 28 Apr 2026 10:27:49 +0100 Subject: [PATCH 3/5] fix: take a global lock while running migrations --- .../src/clickhouse/clickhouse.service.ts | 94 ++++++++++++++----- 1 file changed, 72 insertions(+), 22 deletions(-) diff --git a/apps/backend/src/clickhouse/clickhouse.service.ts b/apps/backend/src/clickhouse/clickhouse.service.ts index 532c3f91..7a947de5 100644 --- a/apps/backend/src/clickhouse/clickhouse.service.ts +++ b/apps/backend/src/clickhouse/clickhouse.service.ts @@ -42,7 +42,13 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { }); const parsedUrl = new URL(this.config.url); - this.database = parsedUrl.pathname.replace(/^\//, ""); + const dbName = parsedUrl.pathname.replace(/^\/+|\/+$/g, "").split("/")[0]; + if (!dbName || !/^[a-zA-Z][a-zA-Z0-9_]*$/.test(dbName)) { + throw new Error( + `CLICKHOUSE_URL database name "${dbName}" is invalid - must start with a letter and contain only letters, digits, and underscores, e.g. http://host:8123/dealbot`, + ); + } + this.database = dbName; try { await this.migrate(this.database); } catch (err) { @@ -80,30 +86,73 @@ ENGINE = MergeTree() ORDER BY version`, }); - const result = await this.client.query({ - query: `SELECT version FROM ${database}.schema_migrations ORDER BY version`, - format: "JSONEachRow", - }); - const rows = (await result.json()) as { version: number }[]; - const applied = new Set(rows.map((r) => r.version)); - - const migrations = getMigrations(database); - let count = 0; - for (const m of migrations) { - if (applied.has(m.version)) continue; - for (const sql of m.up) { - await this.client.command({ query: sql }); - } - await this.client.insert({ - table: `${database}.schema_migrations`, - values: [{ version: m.version, name: m.name }], + const lockAcquired = await this.tryAcquireMigrationLock(database); + if (!lockAcquired) { + this.logger.error({ event: "migration_locked", message: "Another instance is running migrations" }); + throw new Error("Migration lock is held by another instance"); + } + + try { + const result = await this.client.query({ + query: `SELECT version FROM ${database}.schema_migrations ORDER BY version`, format: "JSONEachRow", }); - this.logger.log({ event: "migration_applied", version: m.version, name: m.name }); - count++; + const rows = (await result.json()) as { version: number }[]; + const applied = new Set(rows.map((r) => r.version)); + + const migrations = getMigrations(database); + let count = 0; + let schemaVersion = applied.size > 0 ? Math.max(...applied) : 0; + for (const m of migrations) { + if (applied.has(m.version)) continue; + for (const sql of m.up) { + await this.client.command({ query: sql }); + } + await this.client.insert({ + table: `${database}.schema_migrations`, + values: [{ version: m.version, name: m.name }], + format: "JSONEachRow", + }); + this.logger.log({ event: "migration_applied", version: m.version, name: m.name }); + schemaVersion = m.version; + count++; + } + + this.logger.log({ event: "clickhouse_migrated", database, schemaVersion, appliedCount: count }); + } finally { + await this.releaseMigrationLock(database); } + } - this.logger.log({ event: "clickhouse_migrated", database, appliedCount: count }); + private migrationLockTable(database: string): string { + return `${database}.schema_migration_lock`; + } + + // The lock table is normally dropped in the finally block after migrations complete. + // If the process crashes while holding the lock the table is left behind, and all + // subsequent startups will fail with "Migration lock is held by another instance". + // To recover, drop the table manually: + // DROP TABLE .schema_migration_lock + private async tryAcquireMigrationLock(database: string): Promise { + if (!this.client) throw new Error("ClickHouse not connected"); + try { + await this.client.command({ + query: `CREATE TABLE ${this.migrationLockTable(database)} (locked UInt8) ENGINE = TinyLog`, + }); + return true; + } catch (error) { + if (error instanceof Error && /already exists/i.test(error.message)) { + return false; + } + throw error; + } + } + + private async releaseMigrationLock(database: string): Promise { + if (!this.client) throw new Error("ClickHouse not connected"); + await this.client.command({ + query: `DROP TABLE IF EXISTS ${this.migrationLockTable(database)}`, + }); } async migrateDown(version: number): Promise { @@ -116,7 +165,8 @@ ORDER BY version`, await this.client.command({ query: sql }); } await this.client.command({ - query: `ALTER TABLE ${this.database}.schema_migrations DELETE WHERE version = ${version}`, + query: `DELETE FROM ${this.database}.schema_migrations WHERE version = {version:UInt32}`, + query_params: { version }, }); this.logger.log({ event: "migration_rolled_back", version, name: migration.name }); } From 7eb3021c489476dedcd36e67e4ae2832f5a38af6 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Tue, 28 Apr 2026 10:48:01 +0100 Subject: [PATCH 4/5] fix: sort list of migrations and improve lock error message --- apps/backend/src/clickhouse/clickhouse.service.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/apps/backend/src/clickhouse/clickhouse.service.ts b/apps/backend/src/clickhouse/clickhouse.service.ts index 7a947de5..f4204610 100644 --- a/apps/backend/src/clickhouse/clickhouse.service.ts +++ b/apps/backend/src/clickhouse/clickhouse.service.ts @@ -88,8 +88,10 @@ ORDER BY version`, const lockAcquired = await this.tryAcquireMigrationLock(database); if (!lockAcquired) { - this.logger.error({ event: "migration_locked", message: "Another instance is running migrations" }); - throw new Error("Migration lock is held by another instance"); + const lockTable = this.migrationLockTable(database); + const message = `Could not acquire migration lock on ${lockTable}. Another instance may be running migrations, or a previous migration process may have crashed and left a stale lock. If no migrations are currently running, drop the lock table and restart: DROP TABLE ${lockTable}`; + this.logger.error({ event: "migration_locked", message, lockTable }); + throw new Error(message); } try { @@ -100,7 +102,7 @@ ORDER BY version`, const rows = (await result.json()) as { version: number }[]; const applied = new Set(rows.map((r) => r.version)); - const migrations = getMigrations(database); + const migrations = getMigrations(database).sort((a, b) => a.version - b.version); let count = 0; let schemaVersion = applied.size > 0 ? Math.max(...applied) : 0; for (const m of migrations) { From 08972c6ed31c2e4d4e5dbf5d0c6ce788306d1c06 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Tue, 28 Apr 2026 11:04:29 +0100 Subject: [PATCH 5/5] fix: wrap migrateDown in global lock --- .../src/clickhouse/clickhouse.service.ts | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/apps/backend/src/clickhouse/clickhouse.service.ts b/apps/backend/src/clickhouse/clickhouse.service.ts index f4204610..aa2d53c9 100644 --- a/apps/backend/src/clickhouse/clickhouse.service.ts +++ b/apps/backend/src/clickhouse/clickhouse.service.ts @@ -163,14 +163,24 @@ ORDER BY version`, const migration = migrations.find((m) => m.version === version); if (!migration) throw new Error(`Migration version ${version} not found`); - for (const sql of migration.down) { - await this.client.command({ query: sql }); + const lockAcquired = await this.tryAcquireMigrationLock(this.database); + if (!lockAcquired) { + const lockTable = this.migrationLockTable(this.database); + throw new Error(`Could not acquire migration lock on ${lockTable}. Another migration operation is in progress.`); + } + + try { + for (const sql of migration.down) { + await this.client.command({ query: sql }); + } + await this.client.command({ + query: `DELETE FROM ${this.database}.schema_migrations WHERE version = {version:UInt32}`, + query_params: { version }, + }); + this.logger.log({ event: "migration_rolled_back", version, name: migration.name }); + } finally { + await this.releaseMigrationLock(this.database); } - await this.client.command({ - query: `DELETE FROM ${this.database}.schema_migrations WHERE version = {version:UInt32}`, - query_params: { version }, - }); - this.logger.log({ event: "migration_rolled_back", version, name: migration.name }); } async onApplicationShutdown() {