Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
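/**
* Versioned ClickHouse schema migrations applied on startup.
* The initial schema uses CREATE TABLE IF NOT EXISTS so it can be re-run safely.
* Order matters: migrations are applied in ascending version order, and the target
* database must already exist before any table is created.
*/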
export function buildMigrations(database: string): string[] {
/**
 * A single versioned schema change, carrying both the forward and the rollback SQL.
 */
export interface Migration {
  /** Numeric identifier of the migration. */
  version: number;
  /** Human-readable label for the migration. */
  name: string;
  /** SQL statements that apply the change. */
  up: Array<string>;
  /** SQL statements that revert the change. */
  down: Array<string>;
}

export function getMigrations(database: string): Migration[] {
return [
`CREATE TABLE IF NOT EXISTS ${database}.data_storage_checks
{
version: 1,
name: "initial_schema",
// IF NOT EXISTS guards allow this to be re-run against a database that was created
// before the migration system was introduced.
up: [
`CREATE TABLE IF NOT EXISTS ${database}.data_storage_checks
(
timestamp DateTime64(3, 'UTC'), -- when deal entity was saved

Expand Down Expand Up @@ -39,7 +48,7 @@ export function buildMigrations(database: string): string[] {
PARTITION BY toStartOfMonth(timestamp)
TTL toDateTime(timestamp) + INTERVAL 1 YEAR`,

`CREATE TABLE IF NOT EXISTS ${database}.retrieval_checks
`CREATE TABLE IF NOT EXISTS ${database}.retrieval_checks
(
timestamp DateTime64(3, 'UTC'), -- when retrieval entity was saved
probe_location LowCardinality(String), -- dealbot location
Expand All @@ -62,7 +71,7 @@ export function buildMigrations(database: string): string[] {
PARTITION BY toStartOfMonth(timestamp)
TTL toDateTime(timestamp) + INTERVAL 1 YEAR`,

`CREATE TABLE IF NOT EXISTS ${database}.data_retention_challenges
`CREATE TABLE IF NOT EXISTS ${database}.data_retention_challenges
(
timestamp DateTime64(3, 'UTC'), -- when the poll ran and detected these periods
probe_location LowCardinality(String), -- dealbot location
Expand All @@ -78,5 +87,18 @@ export function buildMigrations(database: string): string[] {
PRIMARY KEY (probe_location, sp_address, timestamp)
PARTITION BY toStartOfMonth(timestamp)
TTL toDateTime(timestamp) + INTERVAL 1 YEAR`,
],
down: [
`DROP TABLE IF EXISTS ${database}.data_retention_challenges`,
`DROP TABLE IF EXISTS ${database}.retrieval_checks`,
`DROP TABLE IF EXISTS ${database}.data_storage_checks`,
],
},
{
version: 2,
name: "add_retrieval_type_to_retrieval_checks",
up: [`ALTER TABLE ${database}.retrieval_checks ADD COLUMN IF NOT EXISTS retrieval_type LowCardinality(String)`],
down: [`ALTER TABLE ${database}.retrieval_checks DROP COLUMN IF EXISTS retrieval_type`],
},
];
}
127 changes: 118 additions & 9 deletions apps/backend/src/clickhouse/clickhouse.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ConfigService } from "@nestjs/config";
import { InjectMetric } from "@willsoto/nestjs-prometheus";
import { Counter, Gauge, Histogram } from "prom-client";
import type { IClickhouseConfig, IConfig } from "../config/app.config.js";
import { buildMigrations } from "./clickhouse.schema.js";
import { getMigrations } from "./clickhouse.migrations.js";

interface BufferedRow {
table: string;
Expand All @@ -16,6 +16,7 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown {
private readonly logger = new Logger(ClickhouseService.name);
private readonly config: IClickhouseConfig;
private client: ClickHouseClient | null = null;
private database = "";
private buffer: BufferedRow[] = [];
private flushTimer: NodeJS.Timeout | null = null;

Expand All @@ -41,11 +42,17 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown {
});

const parsedUrl = new URL(this.config.url);
const database = parsedUrl.pathname.replace(/^\//, "");
const dbName = parsedUrl.pathname.replace(/^\/+|\/+$/g, "").split("/")[0];
if (!dbName || !/^[a-zA-Z][a-zA-Z0-9_]*$/.test(dbName)) {
throw new Error(
`CLICKHOUSE_URL database name "${dbName}" is invalid - must start with a letter and contain only letters, digits, and underscores, e.g. http://host:8123/dealbot`,
);
}
this.database = dbName;
try {
await this.migrate(database);
await this.migrate(this.database);
Comment on lines 44 to +53
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, this is valid and tightens the code against deployment errors

} catch (err) {
this.logger.error({ event: "clickhouse_migration_failed", database, error: String(err) });
this.logger.error({ event: "clickhouse_migration_failed", database: this.database, error: String(err) });
throw err;
}

Expand All @@ -58,7 +65,7 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown {
this.logger.log({
event: "clickhouse_initialized",
host: parsedUrl.host,
database,
database: this.database,
batchSize: this.config.batchSize,
flushIntervalMs: this.config.flushIntervalMs,
probeLocation: this.configService.get("app").probeLocation,
Expand All @@ -67,11 +74,113 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown {

private async migrate(database: string): Promise<void> {
if (!this.client) return;
const migrations = buildMigrations(database);
for (const sql of migrations) {
await this.client.command({ query: sql });

await this.client.command({
query: `CREATE TABLE IF NOT EXISTS ${database}.schema_migrations
(
version UInt32,
Comment on lines +78 to +81
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding some stricter validation on the CLICKHOUSE_URL

name String,
applied_at DateTime64(3, 'UTC') DEFAULT now64()
)
ENGINE = MergeTree()
ORDER BY version`,
});
Comment on lines +78 to +87
Copy link
Copy Markdown
Contributor Author

@iand iand Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem worth the effort. The database is a required precondition for dealbot when CLICKHOUSE_URL is set, and the first migration error will cause dealbot to refuse to start if the database is not accessible on startup.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As agreed previously, since ProbeLab is creating the ClickHouse DB, I'm good with your reasoning here.


const lockAcquired = await this.tryAcquireMigrationLock(database);
if (!lockAcquired) {
const lockTable = this.migrationLockTable(database);
const message = `Could not acquire migration lock on ${lockTable}. Another instance may be running migrations, or a previous migration process may have crashed and left a stale lock. If no migrations are currently running, drop the lock table and restart: DROP TABLE ${lockTable}`;
this.logger.error({ event: "migration_locked", message, lockTable });
throw new Error(message);
}

try {
const result = await this.client.query({
query: `SELECT version FROM ${database}.schema_migrations ORDER BY version`,
format: "JSONEachRow",
});
const rows = (await result.json()) as { version: number }[];
const applied = new Set(rows.map((r) => r.version));

const migrations = getMigrations(database).sort((a, b) => a.version - b.version);
let count = 0;
let schemaVersion = applied.size > 0 ? Math.max(...applied) : 0;
for (const m of migrations) {
if (applied.has(m.version)) continue;
for (const sql of m.up) {
await this.client.command({ query: sql });
}
await this.client.insert({
table: `${database}.schema_migrations`,
values: [{ version: m.version, name: m.name }],
format: "JSONEachRow",
});
this.logger.log({ event: "migration_applied", version: m.version, name: m.name });
schemaVersion = m.version;
count++;
}

this.logger.log({ event: "clickhouse_migrated", database, schemaVersion, appliedCount: count });
} finally {
await this.releaseMigrationLock(database);
}
}

/**
 * Builds the fully qualified name of the table used as the migration lock.
 *
 * @param database - Name of the database that holds the lock table.
 * @returns `<database>.schema_migration_lock`.
 */
private migrationLockTable(database: string): string {
  const qualifiedName = [database, "schema_migration_lock"].join(".");
  return qualifiedName;
}

// Locking works by creating a table: whoever manages to create it holds the lock.
// The table is normally dropped in a finally block once the migration work is done.
// If the process dies while holding the lock, the table stays behind and every later
// attempt to acquire it fails with a "Could not acquire migration lock" error.
// To recover, drop the table manually:
// DROP TABLE <database>.schema_migration_lock
//
// Returns true when the lock table was created, false when it already exists.
// Any other failure from ClickHouse is rethrown unchanged.
private async tryAcquireMigrationLock(database: string): Promise<boolean> {
  if (!this.client) throw new Error("ClickHouse not connected");
  const lockTable = this.migrationLockTable(database);
  try {
    await this.client.command({
      query: `CREATE TABLE ${lockTable} (locked UInt8) ENGINE = TinyLog`,
    });
  } catch (error) {
    // An "already exists" failure means the lock is already held (or is stale).
    const alreadyHeld = error instanceof Error && /already exists/i.test(error.message);
    if (alreadyHeld) {
      return false;
    }
    throw error;
  }
  return true;
}

/**
 * Releases the migration lock by dropping the lock table.
 * Uses DROP TABLE IF EXISTS, so a missing lock table is not an error.
 *
 * @param database - Name of the database that holds the lock table.
 * @throws Error when no ClickHouse client is connected.
 */
private async releaseMigrationLock(database: string): Promise<void> {
  const client = this.client;
  if (!client) {
    throw new Error("ClickHouse not connected");
  }
  const lockTable = this.migrationLockTable(database);
  await client.command({ query: `DROP TABLE IF EXISTS ${lockTable}` });
}

async migrateDown(version: number): Promise<void> {
if (!this.client) throw new Error("ClickHouse not connected");
const migrations = getMigrations(this.database);
const migration = migrations.find((m) => m.version === version);
if (!migration) throw new Error(`Migration version ${version} not found`);

const lockAcquired = await this.tryAcquireMigrationLock(this.database);
if (!lockAcquired) {
const lockTable = this.migrationLockTable(this.database);
throw new Error(`Could not acquire migration lock on ${lockTable}. Another migration operation is in progress.`);
}
Comment on lines +160 to +170
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is worth the effort


try {
for (const sql of migration.down) {
await this.client.command({ query: sql });
}
await this.client.command({
query: `DELETE FROM ${this.database}.schema_migrations WHERE version = {version:UInt32}`,
query_params: { version },
});
this.logger.log({ event: "migration_rolled_back", version, name: migration.name });
} finally {
await this.releaseMigrationLock(this.database);
}
this.logger.log({ event: "clickhouse_migrated", database });
}
Comment on lines +160 to 184
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added


async onApplicationShutdown() {
Expand Down
1 change: 1 addition & 0 deletions apps/backend/src/retrieval/retrieval.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ export class RetrievalService {
deal_id: deal.id,
retrieval_id: saved.id,
service_type: saved.serviceType,
retrieval_type: "deal",
status: saved.status,
http_response_code: executionResult.metrics.statusCode || null,
first_byte_ms: executionResult.success ? executionResult.metrics.ttfb : null,
Expand Down
Loading