diff --git a/apps/backend/src/clickhouse/clickhouse.schema.ts b/apps/backend/src/clickhouse/clickhouse.migrations.ts similarity index 80% rename from apps/backend/src/clickhouse/clickhouse.schema.ts rename to apps/backend/src/clickhouse/clickhouse.migrations.ts index 897d8c92..f957dea1 100644 --- a/apps/backend/src/clickhouse/clickhouse.schema.ts +++ b/apps/backend/src/clickhouse/clickhouse.migrations.ts @@ -1,10 +1,19 @@ -/** - * ClickHouse DDL statements executed on startup via CREATE DATABASE/TABLE IF NOT EXISTS. - * Order matters: database must be created before tables. - */ -export function buildMigrations(database: string): string[] { +export interface Migration { + version: number; + name: string; + up: string[]; + down: string[]; +} + +export function getMigrations(database: string): Migration[] { return [ - `CREATE TABLE IF NOT EXISTS ${database}.data_storage_checks + { + version: 1, + name: "initial_schema", + // IF NOT EXISTS guards allow this to be re-run against a database that was created + // before the migration system was introduced. 
+ up: [ + `CREATE TABLE IF NOT EXISTS ${database}.data_storage_checks ( timestamp DateTime64(3, 'UTC'), -- when deal entity was saved @@ -39,7 +48,7 @@ export function buildMigrations(database: string): string[] { PARTITION BY toStartOfMonth(timestamp) TTL toDateTime(timestamp) + INTERVAL 1 YEAR`, - `CREATE TABLE IF NOT EXISTS ${database}.retrieval_checks + `CREATE TABLE IF NOT EXISTS ${database}.retrieval_checks ( timestamp DateTime64(3, 'UTC'), -- when retrieval entity was saved probe_location LowCardinality(String), -- dealbot location @@ -62,7 +71,7 @@ export function buildMigrations(database: string): string[] { PARTITION BY toStartOfMonth(timestamp) TTL toDateTime(timestamp) + INTERVAL 1 YEAR`, - `CREATE TABLE IF NOT EXISTS ${database}.data_retention_challenges + `CREATE TABLE IF NOT EXISTS ${database}.data_retention_challenges ( timestamp DateTime64(3, 'UTC'), -- when the poll ran and detected these periods probe_location LowCardinality(String), -- dealbot location @@ -78,5 +87,18 @@ export function buildMigrations(database: string): string[] { PRIMARY KEY (probe_location, sp_address, timestamp) PARTITION BY toStartOfMonth(timestamp) TTL toDateTime(timestamp) + INTERVAL 1 YEAR`, + ], + down: [ + `DROP TABLE IF EXISTS ${database}.data_retention_challenges`, + `DROP TABLE IF EXISTS ${database}.retrieval_checks`, + `DROP TABLE IF EXISTS ${database}.data_storage_checks`, + ], + }, + { + version: 2, + name: "add_retrieval_type_to_retrieval_checks", + up: [`ALTER TABLE ${database}.retrieval_checks ADD COLUMN IF NOT EXISTS retrieval_type LowCardinality(String)`], + down: [`ALTER TABLE ${database}.retrieval_checks DROP COLUMN IF EXISTS retrieval_type`], + }, ]; } diff --git a/apps/backend/src/clickhouse/clickhouse.service.ts b/apps/backend/src/clickhouse/clickhouse.service.ts index 2cdfc34a..aa2d53c9 100644 --- a/apps/backend/src/clickhouse/clickhouse.service.ts +++ b/apps/backend/src/clickhouse/clickhouse.service.ts @@ -4,7 +4,7 @@ import { ConfigService } from 
"@nestjs/config"; import { InjectMetric } from "@willsoto/nestjs-prometheus"; import { Counter, Gauge, Histogram } from "prom-client"; import type { IClickhouseConfig, IConfig } from "../config/app.config.js"; -import { buildMigrations } from "./clickhouse.schema.js"; +import { getMigrations } from "./clickhouse.migrations.js"; interface BufferedRow { table: string; @@ -16,6 +16,7 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { private readonly logger = new Logger(ClickhouseService.name); private readonly config: IClickhouseConfig; private client: ClickHouseClient | null = null; + private database = ""; private buffer: BufferedRow[] = []; private flushTimer: NodeJS.Timeout | null = null; @@ -41,11 +42,17 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { }); const parsedUrl = new URL(this.config.url); - const database = parsedUrl.pathname.replace(/^\//, ""); + const dbName = parsedUrl.pathname.replace(/^\/+|\/+$/g, "").split("/")[0]; + if (!dbName || !/^[a-zA-Z][a-zA-Z0-9_]*$/.test(dbName)) { + throw new Error( + `CLICKHOUSE_URL database name "${dbName}" is invalid - must start with a letter and contain only letters, digits, and underscores, e.g. 
http://host:8123/dealbot`, + ); + } + this.database = dbName; try { - await this.migrate(database); + await this.migrate(this.database); } catch (err) { - this.logger.error({ event: "clickhouse_migration_failed", database, error: String(err) }); + this.logger.error({ event: "clickhouse_migration_failed", database: this.database, error: String(err) }); throw err; } @@ -58,7 +65,7 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { this.logger.log({ event: "clickhouse_initialized", host: parsedUrl.host, - database, + database: this.database, batchSize: this.config.batchSize, flushIntervalMs: this.config.flushIntervalMs, probeLocation: this.configService.get("app").probeLocation, @@ -67,11 +74,113 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { private async migrate(database: string): Promise { if (!this.client) return; - const migrations = buildMigrations(database); - for (const sql of migrations) { - await this.client.command({ query: sql }); + + await this.client.command({ + query: `CREATE TABLE IF NOT EXISTS ${database}.schema_migrations +( + version UInt32, + name String, + applied_at DateTime64(3, 'UTC') DEFAULT now64() +) +ENGINE = MergeTree() +ORDER BY version`, + }); + + const lockAcquired = await this.tryAcquireMigrationLock(database); + if (!lockAcquired) { + const lockTable = this.migrationLockTable(database); + const message = `Could not acquire migration lock on ${lockTable}. Another instance may be running migrations, or a previous migration process may have crashed and left a stale lock. 
If no migrations are currently running, drop the lock table and restart: DROP TABLE ${lockTable}`; + this.logger.error({ event: "migration_locked", message, lockTable }); + throw new Error(message); + } + + try { + const result = await this.client.query({ + query: `SELECT version FROM ${database}.schema_migrations ORDER BY version`, + format: "JSONEachRow", + }); + const rows = (await result.json()) as { version: number }[]; + const applied = new Set(rows.map((r) => r.version)); + + const migrations = getMigrations(database).sort((a, b) => a.version - b.version); + let count = 0; + let schemaVersion = applied.size > 0 ? Math.max(...applied) : 0; + for (const m of migrations) { + if (applied.has(m.version)) continue; + for (const sql of m.up) { + await this.client.command({ query: sql }); + } + await this.client.insert({ + table: `${database}.schema_migrations`, + values: [{ version: m.version, name: m.name }], + format: "JSONEachRow", + }); + this.logger.log({ event: "migration_applied", version: m.version, name: m.name }); + schemaVersion = m.version; + count++; + } + + this.logger.log({ event: "clickhouse_migrated", database, schemaVersion, appliedCount: count }); + } finally { + await this.releaseMigrationLock(database); + } + } + + private migrationLockTable(database: string): string { + return `${database}.schema_migration_lock`; + } + + // The lock table is normally dropped in the finally block after migrations complete. + // If the process crashes while holding the lock the table is left behind, and all + // subsequent startups will fail with "Could not acquire migration lock on <database>.schema_migration_lock".
+ // To recover, drop the table manually: + // DROP TABLE <database>.schema_migration_lock + private async tryAcquireMigrationLock(database: string): Promise { + if (!this.client) throw new Error("ClickHouse not connected"); + try { + await this.client.command({ + query: `CREATE TABLE ${this.migrationLockTable(database)} (locked UInt8) ENGINE = TinyLog`, + }); + return true; + } catch (error) { + if (error instanceof Error && /already exists/i.test(error.message)) { + return false; + } + throw error; + } + } + + private async releaseMigrationLock(database: string): Promise { + if (!this.client) throw new Error("ClickHouse not connected"); + await this.client.command({ + query: `DROP TABLE IF EXISTS ${this.migrationLockTable(database)}`, + }); + } + + async migrateDown(version: number): Promise { + if (!this.client) throw new Error("ClickHouse not connected"); + const migrations = getMigrations(this.database); + const migration = migrations.find((m) => m.version === version); + if (!migration) throw new Error(`Migration version ${version} not found`); + + const lockAcquired = await this.tryAcquireMigrationLock(this.database); + if (!lockAcquired) { + const lockTable = this.migrationLockTable(this.database); + throw new Error(`Could not acquire migration lock on ${lockTable}. 
Another migration operation is in progress.`); + } + + try { + for (const sql of migration.down) { + await this.client.command({ query: sql }); + } + await this.client.command({ + query: `DELETE FROM ${this.database}.schema_migrations WHERE version = {version:UInt32}`, + query_params: { version }, + }); + this.logger.log({ event: "migration_rolled_back", version, name: migration.name }); + } finally { + await this.releaseMigrationLock(this.database); } - this.logger.log({ event: "clickhouse_migrated", database }); } async onApplicationShutdown() { diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index aff8849d..60598180 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -268,6 +268,7 @@ export class RetrievalService { deal_id: deal.id, retrieval_id: saved.id, service_type: saved.serviceType, + retrieval_type: "deal", status: saved.status, http_response_code: executionResult.metrics.statusCode || null, first_byte_ms: executionResult.success ? executionResult.metrics.ttfb : null,