Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ GRAFANA_VALUES ?= $(MONITORING_OVERLAY)/grafana-values.yaml
.PHONY: web-image-build web-kind-load web-logs
.PHONY: image-build kind-load deploy undeploy render logs
.PHONY: redeploy restart restart-backend restart-web restart-worker
.PHONY: clickhouse-reset clickhouse-shell
.PHONY: monitoring-install monitoring-apply monitoring-up monitoring-down
.PHONY: local-up up down

Expand Down Expand Up @@ -119,6 +120,15 @@ redeploy:
$(MAKE) deploy
$(MAKE) restart

# Delete the ClickHouse PVC so initdb scripts run again on next deploy.
# Use this when the ClickHouse schema has changed and you need a clean state.
#
# The PVC delete uses --wait=false: while the ClickHouse pod still mounts the
# volume, the pvc-protection finalizer keeps the PVC in Terminating, so a
# blocking delete would never return and the pod delete below would never run.
# The leading "-" lets the target succeed when the PVC or pod is already gone.
clickhouse-reset:
	-kubectl delete pvc -n $(NAMESPACE) dealbot-clickhouse --wait=false
	-kubectl delete pod -n $(NAMESPACE) -l app.kubernetes.io/name=dealbot-clickhouse

# Open an interactive clickhouse-client session against the "dealbot" database
# inside the dealbot-clickhouse deployment in $(NAMESPACE).
clickhouse-shell:
	kubectl exec -it -n $(NAMESPACE) deployment/dealbot-clickhouse -- clickhouse-client --database dealbot
Comment thread
iand marked this conversation as resolved.

secret: namespace
@if [ ! -f "$(SECRET_ENV_FILE)" ]; then echo "SECRET_ENV_FILE $(SECRET_ENV_FILE) not found"; exit 1; fi
@tmp_env_file="$$(mktemp)"; \
Expand Down
1 change: 1 addition & 0 deletions apps/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"typecheck": "tsc --noEmit --incremental false"
},
"dependencies": {
"@clickhouse/client": "^1.11.0",
"@filoz/synapse-core": "0.3.3",
"@filoz/synapse-sdk": "0.40.2",
"@ipld/car": "^5.4.2",
Expand Down
2 changes: 2 additions & 0 deletions apps/backend/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { LoggerModule } from "nestjs-pino";
import { AppController } from "./app.controller.js";
import { ClickhouseModule } from "./clickhouse/clickhouse.module.js";
import { buildLoggerModuleParams } from "./common/pino.config.js";
import { configValidationSchema, loadConfig } from "./config/app.config.js";
import { DatabaseModule } from "./database/database.module.js";
Expand All @@ -23,6 +24,7 @@ import { RetrievalModule } from "./retrieval/retrieval.module.js";
}),
DatabaseModule,
MetricsPrometheusModule,
ClickhouseModule,
JobsModule,
DealModule,
RetrievalModule,
Expand Down
35 changes: 35 additions & 0 deletions apps/backend/src/clickhouse/clickhouse.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { Global, Module } from "@nestjs/common";
import { makeCounterProvider, makeGaugeProvider, makeHistogramProvider } from "@willsoto/nestjs-prometheus";
import { ClickhouseService } from "./clickhouse.service.js";

/**
 * Prometheus metric providers consumed by ClickhouseService via @InjectMetric.
 * Declared separately so the module decorator below stays short.
 */
const clickhouseMetricProviders = [
  makeHistogramProvider({
    name: "clickhouseFlushDurationSeconds",
    help: "Round-trip time of each ClickHouse flush call in seconds",
    buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5],
  }),
  makeCounterProvider({
    name: "clickhouseFlushErrorsTotal",
    help: "Number of failed ClickHouse flush attempts; non-zero means rows were dropped",
  }),
  makeCounterProvider({
    name: "clickhouseDroppedRowsTotal",
    help: "Rows silently dropped due to flush failure or buffer overflow, by reason",
    labelNames: ["reason"] as const,
  }),
  makeGaugeProvider({
    name: "clickhouseBufferRows",
    help: "Current number of rows queued in the ClickHouse buffer",
  }),
  makeCounterProvider({
    name: "clickhouseRowsInsertedTotal",
    help: "Rows successfully written to ClickHouse, by table",
    labelNames: ["table"] as const,
  }),
];

/**
 * Global module that registers the ClickHouse metrics and exports
 * ClickhouseService to the rest of the application.
 */
@Global()
@Module({
  providers: [...clickhouseMetricProviders, ClickhouseService],
  exports: [ClickhouseService],
})
export class ClickhouseModule {}
82 changes: 82 additions & 0 deletions apps/backend/src/clickhouse/clickhouse.schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
 * Builds the ClickHouse DDL statements that are executed on startup.
 *
 * Every statement is a `CREATE TABLE IF NOT EXISTS`, so running them repeatedly
 * is harmless. No `CREATE DATABASE` statement is emitted: the target database
 * must already exist before these statements are run.
 *
 * @param database Name of the database the tables are created in. It is spliced
 *   into the SQL unquoted, so it must be a plain identifier (letters, digits and
 *   underscores, not starting with a digit).
 * @returns One DDL statement per table, in the order they should be executed.
 * @throws Error if `database` is empty or is not a plain identifier.
 */
export function buildMigrations(database: string): string[] {
  // Reject anything that would either produce broken SQL (e.g. an empty name
  // yields ".data_storage_checks") or let arbitrary text into the statement.
  if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(database)) {
    throw new Error(
      `Invalid ClickHouse database name ${JSON.stringify(database)}: ` +
        "expected a plain identifier (letters, digits, underscores; not starting with a digit)",
    );
  }

  return [
    `CREATE TABLE IF NOT EXISTS ${database}.data_storage_checks
(
    timestamp DateTime64(3, 'UTC'), -- when deal entity was saved

    probe_location LowCardinality(String), -- dealbot location
    sp_address String, -- storage provider address
    sp_id Nullable(UInt64), -- storage provider numeric id
    sp_name Nullable(String), -- storage provider name

    deal_id UUID, -- id assigned by dealbot
    piece_cid Nullable(String), -- null if upload failed
    piece_id Nullable(UInt64), -- on-chain piece id
    file_size_bytes Nullable(UInt64), -- raw file size before CAR encoding
    piece_size_bytes Nullable(UInt64), -- piece size after CAR encoding

    status LowCardinality(String), -- DealStatus: 'pending' | 'uploaded' | 'piece_added' | 'piece_confirmed' | 'deal_created' | 'failed'
    error_code LowCardinality(Nullable(String)),

    upload_started_at Nullable(DateTime64(3, 'UTC')), -- when executeUpload() was called
    upload_ended_at Nullable(DateTime64(3, 'UTC')), -- when onStored event fired

    pieces_added_at Nullable(DateTime64(3, 'UTC')), -- when onPiecesAdded event fired
    pieces_confirmed_at Nullable(DateTime64(3, 'UTC')), -- when onPiecesConfirmed event fired

    ipni_status LowCardinality(Nullable(String)), -- 'pending' | 'sp_indexed' | 'sp_advertised' | 'verified' | 'failed'
    ipni_indexed_at Nullable(DateTime64(3, 'UTC')), -- when dealbot first observed SP_INDEXED (accuracy limited to poll interval)
    ipni_advertised_at Nullable(DateTime64(3, 'UTC')), -- when dealbot first observed SP_ADVERTISED (accuracy limited to poll interval)
    ipni_verified_at Nullable(DateTime64(3, 'UTC')), -- when dealbot confirmed root CID findable via IPNI
    ipni_verified_cids_count Nullable(UInt32), -- CIDs confirmed findable via IPNI
    ipni_unverified_cids_count Nullable(UInt32) -- CIDs checked but not findable
) ENGINE MergeTree()
PRIMARY KEY (probe_location, sp_address, timestamp)
PARTITION BY toStartOfMonth(timestamp)
TTL toDateTime(timestamp) + INTERVAL 1 YEAR`,

    `CREATE TABLE IF NOT EXISTS ${database}.retrieval_checks
(
    timestamp DateTime64(3, 'UTC'), -- when retrieval entity was saved
    probe_location LowCardinality(String), -- dealbot location
    sp_address String, -- storage provider address
    sp_id Nullable(UInt64), -- storage provider numeric id
    sp_name Nullable(String), -- storage provider name

    deal_id Nullable(UUID), -- id of deal assigned by dealbot
    retrieval_id UUID, -- id of retrieval assigned by dealbot
    service_type LowCardinality(String), -- 'direct_sp' | 'ipfs_pin'

    status LowCardinality(String), -- RetrievalStatus: 'pending' | 'in_progress' | 'success' | 'failed' | 'timeout'
    http_response_code Nullable(UInt16), -- raw HTTP status; null on transport failure

    first_byte_ms Nullable(Float64), -- time from request start to first response byte
    last_byte_ms Nullable(Float64), -- time from request start to last response byte
    bytes_retrieved Nullable(UInt64) -- size of received data in bytes
) ENGINE MergeTree()
PRIMARY KEY (probe_location, sp_address, timestamp)
PARTITION BY toStartOfMonth(timestamp)
TTL toDateTime(timestamp) + INTERVAL 1 YEAR`,

    `CREATE TABLE IF NOT EXISTS ${database}.data_retention_challenges
(
    timestamp DateTime64(3, 'UTC'), -- when the poll ran and detected these periods
    probe_location LowCardinality(String), -- dealbot location
    sp_address String, -- storage provider address
    sp_id Nullable(UInt64), -- storage provider numeric id
    sp_name Nullable(String), -- storage provider name

    total_periods_due UInt32, -- cumulative proving periods due (confirmed by subgraph)
    total_faulted_periods UInt32, -- cumulative periods where proof was not submitted
    total_success_periods UInt32, -- cumulative periods where proof was submitted (= due - faulted)
    estimated_overdue_periods UInt32 -- estimated periods not yet recorded on-chain but past deadline
) ENGINE MergeTree()
PRIMARY KEY (probe_location, sp_address, timestamp)
PARTITION BY toStartOfMonth(timestamp)
TTL toDateTime(timestamp) + INTERVAL 1 YEAR`,
  ];
}
158 changes: 158 additions & 0 deletions apps/backend/src/clickhouse/clickhouse.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import { type ClickHouseClient, createClient } from "@clickhouse/client";
import { Injectable, Logger, OnApplicationShutdown, OnModuleInit } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { InjectMetric } from "@willsoto/nestjs-prometheus";
import { Counter, Gauge, Histogram } from "prom-client";
import type { IClickhouseConfig, IConfig } from "../config/app.config.js";
import { buildMigrations } from "./clickhouse.schema.js";

interface BufferedRow {
table: string;
row: Record<string, unknown>;
}

/**
 * Buffers rows in memory and writes them to ClickHouse in batches.
 *
 * Writes are enabled only when a ClickHouse URL is configured; otherwise every
 * call to {@link ClickhouseService.insert} is a no-op. Rows are flushed when the
 * buffer reaches `batchSize`, on a fixed interval (`flushIntervalMs`), and once
 * more on application shutdown. At most one flush talks to ClickHouse at a
 * time, and each flush owns the rows it is writing, so rows are neither sent
 * twice by overlapping flushes nor removed from the buffer without being sent.
 */
@Injectable()
export class ClickhouseService implements OnModuleInit, OnApplicationShutdown {
  private readonly logger = new Logger(ClickhouseService.name);
  private readonly config: IClickhouseConfig;
  private client: ClickHouseClient | null = null;
  private buffer: BufferedRow[] = [];
  private flushTimer: NodeJS.Timeout | null = null;
  /** Promise of the flush currently in progress, or null when no flush is running. */
  private inFlightFlush: Promise<void> | null = null;

  constructor(
    @InjectMetric("clickhouseFlushDurationSeconds") private readonly flushDuration: Histogram,
    @InjectMetric("clickhouseFlushErrorsTotal") private readonly flushErrors: Counter,
    @InjectMetric("clickhouseBufferRows") private readonly bufferRows: Gauge,
    @InjectMetric("clickhouseRowsInsertedTotal") private readonly rowsInserted: Counter,
    @InjectMetric("clickhouseDroppedRowsTotal") private readonly droppedRows: Counter,
    private readonly configService: ConfigService<IConfig, true>,
  ) {
    this.config = this.configService.get("clickhouse", { infer: true });
  }

  /**
   * Creates the client, runs the table migrations and starts the periodic flush.
   * Does nothing (leaving the service disabled) when no URL is configured.
   *
   * @throws Rethrows any migration error after logging it, including the error
   *   raised when the configured URL carries no database name in its path.
   */
  async onModuleInit(): Promise<void> {
    if (!this.config.url) {
      this.logger.log("CLICKHOUSE_URL not set, writes to ClickHouse disabled");
      return;
    }

    this.client = createClient({
      url: this.config.url,
    });

    const parsedUrl = new URL(this.config.url);
    // The database name is the URL path without its leading slash.
    const database = parsedUrl.pathname.replace(/^\//, "");
    try {
      if (database === "") {
        // Without a database the DDL would be built as ".table_name" and fail
        // with an opaque syntax error, so fail with an actionable message.
        throw new Error("CLICKHOUSE_URL must include the database name in its path");
      }
      await this.migrate(database);
    } catch (err) {
      this.logger.error({ event: "clickhouse_migration_failed", database, error: String(err) });
      throw err;
    }

    this.flushTimer = setInterval(() => {
      this.flush().catch((err: unknown) => {
        this.logger.error({ event: "flush_interval_error", error: String(err) });
      });
    }, this.config.flushIntervalMs);

    this.logger.log({
      event: "clickhouse_initialized",
      host: parsedUrl.host,
      database,
      batchSize: this.config.batchSize,
      flushIntervalMs: this.config.flushIntervalMs,
      probeLocation: this.probeLocation,
    });
  }

  /** Runs every migration statement sequentially, in the order they are returned. */
  private async migrate(database: string): Promise<void> {
    const client = this.client;
    if (!client) return;
    for (const sql of buildMigrations(database)) {
      await client.command({ query: sql });
    }
    this.logger.log({ event: "clickhouse_migrated", database });
  }

  /** Stops the periodic flush, writes whatever is still buffered and closes the client. */
  async onApplicationShutdown(): Promise<void> {
    if (this.flushTimer) {
      clearInterval(this.flushTimer);
      this.flushTimer = null;
    }
    // A running flush only covers rows taken before it started; wait for it and
    // then flush once more so rows queued in the meantime are written as well.
    if (this.inFlightFlush) {
      await this.inFlightFlush;
    }
    await this.flush();
    await this.client?.close();
  }

  /**
   * Queue a row for insertion. Returns immediately; the flush happens in the background.
   * Safe to call when ClickHouse is disabled: rows are silently dropped.
   *
   * When the buffer is already at `maxBufferSize`, the oldest buffered row is
   * discarded (and counted in the dropped-rows metric) to make room.
   *
   * @param table Name of the target table.
   * @param row Row values, serialized with the JSONEachRow format on flush.
   */
  insert(table: string, row: Record<string, unknown>): void {
    if (!this.client) return;

    if (this.buffer.length >= this.config.maxBufferSize) {
      this.buffer.shift();
      this.droppedRows.inc({ reason: "buffer_full" });
    }

    this.buffer.push({ table, row });
    this.bufferRows.set(this.buffer.length);

    if (this.buffer.length >= this.config.batchSize) {
      this.flush().catch((err: unknown) => {
        this.logger.error({ event: "flush_batch_error", error: String(err) });
      });
    }
  }

  /**
   * Writes the currently buffered rows to ClickHouse.
   *
   * If a flush is already running, its promise is returned instead of starting
   * a second one. Otherwise the buffer is detached and handed to the write, so
   * rows inserted while the write is in progress land in a fresh buffer.
   */
  private flush(): Promise<void> {
    if (this.inFlightFlush) return this.inFlightFlush;

    const client = this.client;
    if (!client || this.buffer.length === 0) return Promise.resolve();

    // Take ownership of the batch before the first await.
    const batch = this.buffer;
    this.buffer = [];
    this.bufferRows.set(this.buffer.length);

    const run = this.writeBatch(client, batch).finally(() => {
      this.inFlightFlush = null;
    });
    this.inFlightFlush = run;
    return run;
  }

  /**
   * Inserts one batch, issuing one insert call per table in parallel.
   * Rows of tables whose insert succeeded are counted as inserted; rows of
   * tables whose insert failed are put back into the buffer for the next flush.
   */
  private async writeBatch(client: ClickHouseClient, batch: BufferedRow[]): Promise<void> {
    // Group by table so we can do one insert call per table
    const byTable = new Map<string, Record<string, unknown>[]>();
    for (const { table, row } of batch) {
      const rows = byTable.get(table);
      if (rows) {
        rows.push(row);
      } else {
        byTable.set(table, [row]);
      }
    }

    const end = this.flushDuration.startTimer();
    // Each insert reports its own outcome so one failing table does not cause
    // the rows of the tables that succeeded to be sent again.
    const outcomes = await Promise.all(
      Array.from(byTable, async ([table, rows]) => {
        try {
          await client.insert({
            table,
            values: rows,
            format: "JSONEachRow",
          });
          return { ok: true as const, table, rows };
        } catch (err: unknown) {
          return { ok: false as const, table, rows, error: String(err) };
        }
      }),
    );
    end();

    const failedRows: BufferedRow[] = [];
    const failureMessages: string[] = [];
    for (const outcome of outcomes) {
      if (outcome.ok) {
        this.rowsInserted.inc({ table: outcome.table }, outcome.rows.length);
        continue;
      }
      failureMessages.push(`${outcome.table}: ${outcome.error}`);
      for (const row of outcome.rows) {
        failedRows.push({ table: outcome.table, row });
      }
    }

    if (failedRows.length > 0) {
      this.flushErrors.inc();
      this.logger.error({
        event: "flush_failed",
        error: failureMessages.join("; "),
        pendingRows: failedRows.length,
      });
      this.requeue(failedRows);
    }
    this.bufferRows.set(this.buffer.length);
  }

  /**
   * Puts rows that could not be written back at the front of the buffer (they
   * are older than anything queued since), then discards the oldest rows if the
   * buffer now exceeds `maxBufferSize`.
   */
  private requeue(rows: BufferedRow[]): void {
    this.buffer = rows.concat(this.buffer);
    const overflow = this.buffer.length - this.config.maxBufferSize;
    if (overflow > 0) {
      this.buffer.splice(0, overflow);
      this.droppedRows.inc({ reason: "buffer_full" }, overflow);
    }
  }

  /** Probe location taken from the application configuration. */
  get probeLocation(): string {
    return this.configService.get("app", { infer: true }).probeLocation;
  }

  /** True once a ClickHouse client has been created, i.e. a URL is configured. */
  get enabled(): boolean {
    return this.client !== null;
  }
}
28 changes: 28 additions & 0 deletions apps/backend/src/config/app.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ export const configValidationSchema = Joi.object({
DEALBOT_LOCAL_DATASETS_PATH: Joi.string().default(DEFAULT_LOCAL_DATASETS_PATH),
RANDOM_PIECE_SIZES: Joi.string().default("10485760"), // 10 MiB

// ClickHouse
CLICKHOUSE_URL: Joi.string().uri().optional(),
CLICKHOUSE_BATCH_SIZE: Joi.number().integer().min(1).default(500),
CLICKHOUSE_FLUSH_INTERVAL_MS: Joi.number().integer().min(100).default(5000),
CLICKHOUSE_MAX_BUFFER_SIZE: Joi.number().integer().min(1).default(5000),
DEALBOT_PROBE_LOCATION: Joi.string().default("unknown"),

// Timeouts (in milliseconds)
CONNECT_TIMEOUT_MS: Joi.number().min(1000).default(10000), // 10 seconds to establish connection/receive headers
HTTP_REQUEST_TIMEOUT_MS: Joi.number().min(1000).default(240000), // 4 minutes total for HTTP requests (10MiB @ 170KB/s + overhead)
Expand All @@ -144,6 +151,7 @@ export interface IAppConfig {
enableDevMode: boolean;
prometheusWalletBalanceTtlSeconds: number;
prometheusWalletBalanceErrorCooldownSeconds: number;
probeLocation: string;
}

export interface IDatabaseConfig {
Expand Down Expand Up @@ -301,6 +309,18 @@ export interface ISpBlocklistConfig {
addresses: Set<string>;
}

/** Settings for the buffered ClickHouse writer. */
export interface IClickhouseConfig {
  /**
   * ClickHouse connection URL. Must include the database in the path.
   * Example: http://default:password@host:8123/dealbot
   * If unset, ClickHouse emission is disabled.
   */
  url: string | undefined;
  /** Buffered row count at which an insert triggers an immediate flush (CLICKHOUSE_BATCH_SIZE, default 500). */
  batchSize: number;
  /** Period of the background flush timer in milliseconds (CLICKHOUSE_FLUSH_INTERVAL_MS, default 5000). */
  flushIntervalMs: number;
  /** Buffer capacity in rows; once reached, the oldest row is dropped for each new one (CLICKHOUSE_MAX_BUFFER_SIZE, default 5000). */
  maxBufferSize: number;
}

export interface IConfig {
app: IAppConfig;
database: IDatabaseConfig;
Expand All @@ -310,6 +330,7 @@ export interface IConfig {
dataset: IDatasetConfig;
timeouts: ITimeoutConfig;
retrieval: IRetrievalConfig;
clickhouse: IClickhouseConfig;
pieceCleanup: IPieceCleanupConfig;
spBlocklists: ISpBlocklistConfig;
}
Expand Down Expand Up @@ -337,6 +358,7 @@ export function loadConfig(): IConfig {
process.env.PROMETHEUS_WALLET_BALANCE_ERROR_COOLDOWN_SECONDS || "60",
10,
),
probeLocation: process.env.DEALBOT_PROBE_LOCATION || "unknown",
},
database: {
host: process.env.DATABASE_HOST || "localhost",
Expand Down Expand Up @@ -413,6 +435,12 @@ export function loadConfig(): IConfig {
retrieval: {
ipfsBlockFetchConcurrency: Number.parseInt(process.env.IPFS_BLOCK_FETCH_CONCURRENCY || "6", 10),
},
clickhouse: {
url: process.env.CLICKHOUSE_URL || undefined,
batchSize: Number.parseInt(process.env.CLICKHOUSE_BATCH_SIZE || "500", 10),
flushIntervalMs: Number.parseInt(process.env.CLICKHOUSE_FLUSH_INTERVAL_MS || "5000", 10),
maxBufferSize: Number.parseInt(process.env.CLICKHOUSE_MAX_BUFFER_SIZE || "5000", 10),
},
pieceCleanup: {
maxDatasetStorageSizeBytes: Number.parseInt(
process.env.MAX_DATASET_STORAGE_SIZE_BYTES || String(24 * 1024 * 1024 * 1024),
Expand Down
Loading