From b4a218058b19c77148d1f7455a5212279bd20a03 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Wed, 8 Apr 2026 10:19:23 +0100 Subject: [PATCH 01/18] fix: add missing backend-resources-local.yaml --- .../overlays/local/backend-resources-local.yaml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 kustomize/overlays/local/backend-resources-local.yaml diff --git a/kustomize/overlays/local/backend-resources-local.yaml b/kustomize/overlays/local/backend-resources-local.yaml new file mode 100644 index 00000000..8ea7a65b --- /dev/null +++ b/kustomize/overlays/local/backend-resources-local.yaml @@ -0,0 +1,16 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dealbot +spec: + template: + spec: + containers: + - name: dealbot + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: 100m + memory: 256Mi From bcf1978f9e908e2bd84dd2f521cd1569193dce07 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Wed, 8 Apr 2026 13:44:46 +0100 Subject: [PATCH 02/18] feat: add clickhouse to local kind cluster for development --- .../local/backend-configmap-local.yaml | 3 + .../overlays/local/clickhouse/deployment.yaml | 56 +++++++++++++++++++ .../local/clickhouse/kustomization.yaml | 7 +++ kustomize/overlays/local/clickhouse/pvc.yaml | 12 ++++ .../overlays/local/clickhouse/service.yaml | 19 +++++++ kustomize/overlays/local/kustomization.yaml | 1 + 6 files changed, 98 insertions(+) create mode 100644 kustomize/overlays/local/clickhouse/deployment.yaml create mode 100644 kustomize/overlays/local/clickhouse/kustomization.yaml create mode 100644 kustomize/overlays/local/clickhouse/pvc.yaml create mode 100644 kustomize/overlays/local/clickhouse/service.yaml diff --git a/kustomize/overlays/local/backend-configmap-local.yaml b/kustomize/overlays/local/backend-configmap-local.yaml index 39f1d7ad..53078962 100644 --- a/kustomize/overlays/local/backend-configmap-local.yaml +++ b/kustomize/overlays/local/backend-configmap-local.yaml @@ -29,3 +29,6 @@ data: RANDOM_PIECE_SIZES: "10485760" PDP_SUBGRAPH_ENDPOINT: "https://api.goldsky.com/api/public/project_cmdfaaxeuz6us01u359yjdctw/subgraphs/pdp-explorer/calibration311a/gn" JOB_SCHEDULER_POLL_SECONDS: "60" + CLICKHOUSE_URL: "http://default:@dealbot-clickhouse:8123/dealbot" + CLICKHOUSE_DATABASE: "dealbot" + DEALBOT_PROBE_LOCATION: "local" diff --git a/kustomize/overlays/local/clickhouse/deployment.yaml b/kustomize/overlays/local/clickhouse/deployment.yaml new file mode 100644 index 00000000..68542880 --- /dev/null +++ b/kustomize/overlays/local/clickhouse/deployment.yaml @@ -0,0 +1,56 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dealbot-clickhouse + labels: + app.kubernetes.io/name: dealbot-clickhouse +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: dealbot-clickhouse + template: + metadata: + labels: + app.kubernetes.io/name: dealbot-clickhouse + spec: + containers: + - name: clickhouse + image: clickhouse/clickhouse-server:24-alpine + ports: + - name: http + containerPort: 8123 + protocol: TCP + - name: native + containerPort: 9000 + protocol: TCP + startupProbe: + httpGet: + path: /ping + port: http + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 30 + readinessProbe: + httpGet: + path: /ping + port: http + initialDelaySeconds: 5 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 6 + livenessProbe: + httpGet: + path: /ping + port: http + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 3 + failureThreshold: 6 + volumeMounts: + - name: clickhouse-storage + mountPath: /var/lib/clickhouse + volumes: + - name: clickhouse-storage + persistentVolumeClaim: + claimName: dealbot-clickhouse diff --git a/kustomize/overlays/local/clickhouse/kustomization.yaml b/kustomize/overlays/local/clickhouse/kustomization.yaml new file mode 100644 index 00000000..733c4d98 --- /dev/null +++ b/kustomize/overlays/local/clickhouse/kustomization.yaml @@ -0,0 +1,7 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +resources: + - deployment.yaml + - service.yaml + - pvc.yaml diff --git a/kustomize/overlays/local/clickhouse/pvc.yaml b/kustomize/overlays/local/clickhouse/pvc.yaml new file mode 100644 index 00000000..b24ce359 --- /dev/null +++ b/kustomize/overlays/local/clickhouse/pvc.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: dealbot-clickhouse + labels: + app.kubernetes.io/name: dealbot-clickhouse +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 5Gi diff --git a/kustomize/overlays/local/clickhouse/service.yaml b/kustomize/overlays/local/clickhouse/service.yaml new file mode 100644 index 00000000..f697ed8d --- /dev/null +++ b/kustomize/overlays/local/clickhouse/service.yaml @@ -0,0 +1,19 @@ +apiVersion: v1 +kind: Service +metadata: + name: dealbot-clickhouse + labels: + app.kubernetes.io/name: dealbot-clickhouse +spec: + type: ClusterIP + ports: + - port: 8123 + targetPort: 8123 + protocol: TCP + name: http + - port: 9000 + targetPort: 9000 + protocol: TCP + name: native + selector: + app.kubernetes.io/name: dealbot-clickhouse diff --git a/kustomize/overlays/local/kustomization.yaml b/kustomize/overlays/local/kustomization.yaml index 1a413626..dd3f0e01 100644 --- a/kustomize/overlays/local/kustomization.yaml +++ b/kustomize/overlays/local/kustomization.yaml @@ -7,6 +7,7 @@ resources: - ../../base/backend - ../../base/web - ./postgres + - ./clickhouse - dealbot-worker-deployment.yaml - dealbot-worker-service.yaml From 2beed2e8c7f1984bc34380d1ee91293d9cb99841 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Wed, 8 Apr 2026 15:27:59 +0100 Subject: [PATCH 03/18] feat: add initial clickhouse module --- apps/backend/package.json | 1 + apps/backend/src/app.module.ts | 2 + .../src/clickhouse/clickhouse.config.ts | 17 +++ .../src/clickhouse/clickhouse.module.ts | 30 ++++ .../src/clickhouse/clickhouse.service.ts | 130 ++++++++++++++++++ apps/backend/src/config/app.config.ts | 7 + apps/backend/src/worker.module.ts | 2 + pnpm-lock.yaml | 16 +++ 8 files changed, 205 insertions(+) create mode 100644 apps/backend/src/clickhouse/clickhouse.config.ts create mode 100644 apps/backend/src/clickhouse/clickhouse.module.ts create mode 100644 apps/backend/src/clickhouse/clickhouse.service.ts diff --git a/apps/backend/package.json b/apps/backend/package.json index 6283648d..4e1b0d17 100644 --- a/apps/backend/package.json +++ b/apps/backend/package.json @@ -29,6 +29,7 @@ "typecheck": "tsc --noEmit --incremental false" }, "dependencies": { + "@clickhouse/client": "^1.11.0", "@filoz/synapse-core": "0.3.3", "@filoz/synapse-sdk": "0.40.2", "@ipld/car": "^5.4.2", diff --git a/apps/backend/src/app.module.ts b/apps/backend/src/app.module.ts index 3fa013a7..3e377703 100644 --- a/apps/backend/src/app.module.ts +++ b/apps/backend/src/app.module.ts @@ -1,6 +1,7 @@ import { Module } from "@nestjs/common"; import { ConfigModule } from "@nestjs/config"; import { AppController } from "./app.controller.js"; +import { ClickhouseModule } from "./clickhouse/clickhouse.module.js"; import { configValidationSchema, loadConfig } from "./config/app.config.js"; import { DatabaseModule } from "./database/database.module.js"; import { DataSourceModule } from "./dataSource/dataSource.module.js"; @@ -21,6 +22,7 @@ import { RetrievalModule } from "./retrieval/retrieval.module.js"; }), DatabaseModule, MetricsPrometheusModule, + ClickhouseModule, JobsModule, DealModule, RetrievalModule, diff --git a/apps/backend/src/clickhouse/clickhouse.config.ts b/apps/backend/src/clickhouse/clickhouse.config.ts new file mode 100644 index 00000000..0401366f --- /dev/null +++ b/apps/backend/src/clickhouse/clickhouse.config.ts @@ -0,0 +1,17 @@ +export interface IClickhouseConfig { + url: string | undefined; + database: string; + batchSize: number; + flushIntervalMs: number; + probeLocation: string; +} + +export function loadClickhouseConfig(): IClickhouseConfig { + return { + url: process.env.CLICKHOUSE_URL || undefined, + database: process.env.CLICKHOUSE_DATABASE || "dealbot", + batchSize: Number.parseInt(process.env.CLICKHOUSE_BATCH_SIZE || "500", 10), + flushIntervalMs: Number.parseInt(process.env.CLICKHOUSE_FLUSH_INTERVAL_MS || "5000", 10), + probeLocation: process.env.DEALBOT_PROBE_LOCATION || "unknown", + }; +} diff --git a/apps/backend/src/clickhouse/clickhouse.module.ts b/apps/backend/src/clickhouse/clickhouse.module.ts new file mode 100644 index 00000000..e8cb209f --- /dev/null +++ b/apps/backend/src/clickhouse/clickhouse.module.ts @@ -0,0 +1,30 @@ +import { Global, Module } from "@nestjs/common"; +import { makeCounterProvider, makeGaugeProvider, makeHistogramProvider } from "@willsoto/nestjs-prometheus"; +import { ClickhouseService } from "./clickhouse.service.js"; + +@Global() +@Module({ + providers: [ + makeHistogramProvider({ + name: "clickhouseFlushDurationMs", + help: "Round-trip time of each ClickHouse flush call in milliseconds", + buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000], + }), + makeCounterProvider({ + name: "clickhouseFlushErrorsTotal", + help: "Number of failed ClickHouse flush attempts; non-zero means rows were dropped", + }), + makeGaugeProvider({ + name: "clickhouseBufferRows", + help: "Current number of rows queued in the ClickHouse buffer", + }), + makeCounterProvider({ + name: "clickhouseRowsInsertedTotal", + help: "Rows successfully written to ClickHouse, by table", + labelNames: ["table"] as const, + }), + ClickhouseService, + ], + exports: [ClickhouseService], +}) +export class ClickhouseModule {} diff --git a/apps/backend/src/clickhouse/clickhouse.service.ts b/apps/backend/src/clickhouse/clickhouse.service.ts new file mode 100644 index 00000000..fc12cf25 --- /dev/null +++ b/apps/backend/src/clickhouse/clickhouse.service.ts @@ -0,0 +1,130 @@ +import { Injectable, Logger, OnApplicationShutdown, OnModuleInit } from "@nestjs/common"; +import { createClient, type ClickHouseClient } from "@clickhouse/client"; +import { InjectMetric } from "@willsoto/nestjs-prometheus"; +import { Counter, Gauge, Histogram } from "prom-client"; +import { loadClickhouseConfig, type IClickhouseConfig } from "./clickhouse.config.js"; + +interface BufferedRow { + table: string; + row: Record; +} + +@Injectable() +export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { + private readonly logger = new Logger(ClickhouseService.name); + private readonly config: IClickhouseConfig; + private client: ClickHouseClient | null = null; + private buffer: BufferedRow[] = []; + private flushTimer: NodeJS.Timeout | null = null; + + constructor( + @InjectMetric("clickhouseFlushDurationMs") private readonly flushDuration: Histogram, + @InjectMetric("clickhouseFlushErrorsTotal") private readonly flushErrors: Counter, + @InjectMetric("clickhouseBufferRows") private readonly bufferRows: Gauge, + @InjectMetric("clickhouseRowsInsertedTotal") private readonly rowsInserted: Counter, + ) { + this.config = loadClickhouseConfig(); + } + + onModuleInit() { + if (!this.config.url) { + this.logger.log("CLICKHOUSE_URL not set, writes to ClickHouse disabled"); + return; + } + + this.client = createClient({ + url: this.config.url, + database: this.config.database, + }); + + this.flushTimer = setInterval(() => { + this.flush().catch((err) => { + this.logger.error({ event: "flush_interval_error", error: String(err) }); + }); + }, this.config.flushIntervalMs); + + this.logger.log({ + event: "clickhouse_initialized", + url: this.config.url, + database: this.config.database, + batchSize: this.config.batchSize, + flushIntervalMs: this.config.flushIntervalMs, + probeLocation: this.config.probeLocation, + }); + } + + async onApplicationShutdown() { + if (this.flushTimer) { + clearInterval(this.flushTimer); + this.flushTimer = null; + } + await this.flush(); + await this.client?.close(); + } + + /** + * Queue a row for insertion. Returns immediately; the flush happens in the background. + * Safe to call when ClickHouse is disable: rows are silently dropped. + */ + insert(table: string, row: Record): void { + if (!this.client) return; + + this.buffer.push({ table, row }); + this.bufferRows.set(this.buffer.length); + + if (this.buffer.length >= this.config.batchSize) { + this.flush().catch((err) => { + this.logger.error({ event: "flush_batch_error", error: String(err) }); + }); + } + } + + private async flush(): Promise { + if (!this.client || this.buffer.length === 0) return; + + const batch = this.buffer.splice(0, this.buffer.length); + this.bufferRows.set(0); + + // Group by table so we can do one insert call per table + const byTable = new Map[]>(); + for (const { table, row } of batch) { + let rows = byTable.get(table); + if (!rows) { + rows = []; + byTable.set(table, rows); + } + rows.push(row); + } + + const end = this.flushDuration.startTimer(); + try { + await Promise.all( + Array.from(byTable.entries()).map(async ([table, rows]) => { + await this.client!.insert({ + table, + values: rows, + format: "JSONEachRow", + }); + this.rowsInserted.inc({ table }, rows.length); + }), + ); + end(); + } catch (err) { + end(); + this.flushErrors.inc(); + this.logger.error({ + event: "flush_failed", + error: String(err), + droppedRows: batch.length, + }); + } + } + + get probeLocation(): string { + return this.config.probeLocation; + } + + get enabled(): boolean { + return this.client !== null; + } +} diff --git a/apps/backend/src/config/app.config.ts b/apps/backend/src/config/app.config.ts index c084f453..016997a7 100644 --- a/apps/backend/src/config/app.config.ts +++ b/apps/backend/src/config/app.config.ts @@ -99,6 +99,13 @@ export const configValidationSchema = Joi.object({ DEALBOT_LOCAL_DATASETS_PATH: Joi.string().default(DEFAULT_LOCAL_DATASETS_PATH), RANDOM_PIECE_SIZES: Joi.string().default("10485760"), // 10 MiB + // ClickHouse + CLICKHOUSE_URL: Joi.string().uri().optional(), + CLICKHOUSE_DATABASE: Joi.string().default("dealbot"), + CLICKHOUSE_BATCH_SIZE: Joi.number().integer().min(1).default(500), + CLICKHOUSE_FLUSH_INTERVAL_MS: Joi.number().integer().min(100).default(5000), + DEALBOT_PROBE_LOCATION: Joi.string().default("unknown"), + // Timeouts (in milliseconds) CONNECT_TIMEOUT_MS: Joi.number().min(1000).default(10000), // 10 seconds to establish connection/receive headers HTTP_REQUEST_TIMEOUT_MS: Joi.number().min(1000).default(240000), // 4 minutes total for HTTP requests (10MiB @ 170KB/s + overhead) diff --git a/apps/backend/src/worker.module.ts b/apps/backend/src/worker.module.ts index 94bdfc9f..6f8bd74c 100644 --- a/apps/backend/src/worker.module.ts +++ b/apps/backend/src/worker.module.ts @@ -1,5 +1,6 @@ import { Module } from "@nestjs/common"; import { ConfigModule } from "@nestjs/config"; +import { ClickhouseModule } from "./clickhouse/clickhouse.module.js"; import { configValidationSchema, loadConfig } from "./config/app.config.js"; import { DatabaseModule } from "./database/database.module.js"; import { JobsModule } from "./jobs/jobs.module.js"; @@ -14,6 +15,7 @@ import { MetricsPrometheusModule } from "./metrics-prometheus/metrics-prometheus }), DatabaseModule, MetricsPrometheusModule, + ClickhouseModule, JobsModule, ], }) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4e708a85..24ca6797 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -41,6 +41,9 @@ importers: apps/backend: dependencies: + '@clickhouse/client': + specifier: ^1.11.0 + version: 1.18.2 '@filoz/synapse-core': specifier: 0.3.3 version: 0.3.3(typescript@5.9.3)(viem@2.47.5(typescript@5.9.3)(zod@4.3.6)) @@ -592,6 +595,13 @@ packages: '@clack/prompts@1.1.0': resolution: {integrity: sha512-pkqbPGtohJAvm4Dphs2M8xE29ggupihHdy1x84HNojZuMtFsHiUlRvqD24tM2+XmI+61LlfNceM3Wr7U5QES5g==} + '@clickhouse/client-common@1.18.2': + resolution: {integrity: sha512-J0SG6q9V31ydxonglpj9xhNRsUxCsF71iEZ784yldqMYwsHixj/9xHFDgBDX3DuMiDx/kPDfXnf+pimp08wIBA==} + + '@clickhouse/client@1.18.2': + resolution: {integrity: sha512-fuquQswRSHWM6D079ZeuGqkMOsqtcUPL06UdTnowmoeeYjVrqisfVmvnw8pc3OeKS4kVb91oygb/MfLDiMs0TQ==} + engines: {node: '>=16'} + '@colors/colors@1.5.0': resolution: {integrity: sha512-ooWCrlZP11i8GImSjTHYHLkvFDP48nS4+204nGb1RiX/WXYHmJA2III9/e2DWVabCESdW7hBAEzHRqUn9OUVvQ==} engines: {node: '>=0.1.90'} @@ -6437,6 +6447,12 @@ snapshots: '@clack/core': 1.1.0 sisteransi: 1.0.5 + '@clickhouse/client-common@1.18.2': {} + + '@clickhouse/client@1.18.2': + dependencies: + '@clickhouse/client-common': 1.18.2 + '@colors/colors@1.5.0': optional: true From 70c6cb2524be7867ab777f9d559857b70e931581 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Wed, 8 Apr 2026 17:01:29 +0100 Subject: [PATCH 04/18] feat: perform clickhouse migration on app startup --- Makefile | 7 ++ .../src/clickhouse/clickhouse.config.ts | 7 +- .../src/clickhouse/clickhouse.schema.ts | 81 +++++++++++++++++++ .../src/clickhouse/clickhouse.service.ts | 19 ++++- apps/backend/src/config/app.config.ts | 1 - .../local/backend-configmap-local.yaml | 1 - .../overlays/local/clickhouse/deployment.yaml | 5 ++ .../local/clickhouse/initdb-configmap.yaml | 7 ++ .../local/clickhouse/kustomization.yaml | 1 + 9 files changed, 121 insertions(+), 8 deletions(-) create mode 100644 apps/backend/src/clickhouse/clickhouse.schema.ts create mode 100644 kustomize/overlays/local/clickhouse/initdb-configmap.yaml diff --git a/Makefile b/Makefile index b004ab8d..efee6c4b 100644 --- a/Makefile +++ b/Makefile @@ -27,6 +27,7 @@ GRAFANA_VALUES ?= $(MONITORING_OVERLAY)/grafana-values.yaml .PHONY: web-image-build web-kind-load web-logs .PHONY: image-build kind-load deploy undeploy render logs .PHONY: redeploy restart restart-backend restart-web restart-worker +.PHONY: clickhouse-reset .PHONY: monitoring-install monitoring-apply monitoring-up monitoring-down .PHONY: local-up up down @@ -119,6 +120,12 @@ redeploy: $(MAKE) deploy $(MAKE) restart +# Delete the ClickHouse PVC so initdb scripts run again on next deploy. +# Use this when the ClickHouse schema has changed and you need a clean state. +clickhouse-reset: + -kubectl delete pvc -n $(NAMESPACE) dealbot-clickhouse + -kubectl delete pod -n $(NAMESPACE) -l app.kubernetes.io/name=dealbot-clickhouse + secret: namespace @if [ ! -f "$(SECRET_ENV_FILE)" ]; then echo "SECRET_ENV_FILE $(SECRET_ENV_FILE) not found"; exit 1; fi @tmp_env_file="$$(mktemp)"; \ diff --git a/apps/backend/src/clickhouse/clickhouse.config.ts b/apps/backend/src/clickhouse/clickhouse.config.ts index 0401366f..0290493c 100644 --- a/apps/backend/src/clickhouse/clickhouse.config.ts +++ b/apps/backend/src/clickhouse/clickhouse.config.ts @@ -1,6 +1,10 @@ export interface IClickhouseConfig { + /** + * ClickHouse connection URL. Must include the database in the path. + * Example: http://default:password@host:8123/dealbot + * If unset, ClickHouse emission is disabled. + */ url: string | undefined; - database: string; batchSize: number; flushIntervalMs: number; probeLocation: string; @@ -9,7 +13,6 @@ export interface IClickhouseConfig { export function loadClickhouseConfig(): IClickhouseConfig { return { url: process.env.CLICKHOUSE_URL || undefined, - database: process.env.CLICKHOUSE_DATABASE || "dealbot", batchSize: Number.parseInt(process.env.CLICKHOUSE_BATCH_SIZE || "500", 10), flushIntervalMs: Number.parseInt(process.env.CLICKHOUSE_FLUSH_INTERVAL_MS || "5000", 10), probeLocation: process.env.DEALBOT_PROBE_LOCATION || "unknown", diff --git a/apps/backend/src/clickhouse/clickhouse.schema.ts b/apps/backend/src/clickhouse/clickhouse.schema.ts new file mode 100644 index 00000000..eeed0173 --- /dev/null +++ b/apps/backend/src/clickhouse/clickhouse.schema.ts @@ -0,0 +1,81 @@ +/** + * ClickHouse DDL statements executed on startup via CREATE DATABASE/TABLE IF NOT EXISTS. + * Order matters: database must be created before tables. + */ +export function buildMigrations(database: string): string[] { + return [ + `CREATE TABLE IF NOT EXISTS ${database}.deal_checks +( + timestamp DateTime64(3, 'UTC'), + network LowCardinality(String), + probe_location LowCardinality(String), + sp_address String, + sp_name Nullable(String), + + deal_id UUID, + piece_cid Nullable(String), + piece_id Nullable(UInt64), + file_size_bytes Nullable(UInt64), + piece_size_bytes Nullable(UInt64), + + status LowCardinality(String), + error_code LowCardinality(Nullable(String)), + retry_count UInt8 DEFAULT 0, + + upload_started_at Nullable(DateTime64(3, 'UTC')), + upload_ended_at Nullable(DateTime64(3, 'UTC')), + + pieces_added_at Nullable(DateTime64(3, 'UTC')), + pieces_confirmed_at Nullable(DateTime64(3, 'UTC')), + + ipni_status LowCardinality(Nullable(String)), + ipni_indexed_at Nullable(DateTime64(3, 'UTC')), + ipni_advertised_at Nullable(DateTime64(3, 'UTC')), + ipni_verified_at Nullable(DateTime64(3, 'UTC')), + ipni_verified_cids_count Nullable(UInt32), + ipni_unverified_cids_count Nullable(UInt32) +) ENGINE MergeTree() + PRIMARY KEY (network, probe_location, sp_address, timestamp) + PARTITION BY toStartOfMonth(timestamp) + TTL toDateTime(timestamp) + INTERVAL 1 YEAR`, + + `CREATE TABLE IF NOT EXISTS ${database}.retrieval_checks +( + timestamp DateTime64(3, 'UTC'), + network LowCardinality(String), + probe_location LowCardinality(String), + sp_address String, + sp_name Nullable(String), + + deal_id Nullable(UUID), + retrieval_id UUID, + service_type LowCardinality(String), + + status LowCardinality(String), + http_response_code Nullable(UInt16), + retry_count UInt8 DEFAULT 0, + + ttfb_ms Nullable(Float64), + last_byte_ms Nullable(Float64), + bytes_retrieved Nullable(UInt64) +) ENGINE MergeTree() + PRIMARY KEY (network, probe_location, sp_address, timestamp) + PARTITION BY toStartOfMonth(timestamp) + TTL toDateTime(timestamp) + INTERVAL 1 YEAR`, + + `CREATE TABLE IF NOT EXISTS ${database}.data_retention_challenges +( + timestamp DateTime64(3, 'UTC'), + network LowCardinality(String), + probe_location LowCardinality(String), + sp_address String, + sp_name Nullable(String), + + total_proving_periods UInt32, + total_faulted_periods UInt32 +) ENGINE MergeTree() + PRIMARY KEY (network, probe_location, sp_address, timestamp) + PARTITION BY toStartOfMonth(timestamp) + TTL toDateTime(timestamp) + INTERVAL 1 YEAR`, + ]; +} diff --git a/apps/backend/src/clickhouse/clickhouse.service.ts b/apps/backend/src/clickhouse/clickhouse.service.ts index fc12cf25..1e732241 100644 --- a/apps/backend/src/clickhouse/clickhouse.service.ts +++ b/apps/backend/src/clickhouse/clickhouse.service.ts @@ -3,6 +3,7 @@ import { createClient, type ClickHouseClient } from "@clickhouse/client"; import { InjectMetric } from "@willsoto/nestjs-prometheus"; import { Counter, Gauge, Histogram } from "prom-client"; import { loadClickhouseConfig, type IClickhouseConfig } from "./clickhouse.config.js"; +import { buildMigrations } from "./clickhouse.schema.js"; interface BufferedRow { table: string; @@ -26,7 +27,7 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { this.config = loadClickhouseConfig(); } - onModuleInit() { + async onModuleInit() { if (!this.config.url) { this.logger.log("CLICKHOUSE_URL not set, writes to ClickHouse disabled"); return; @@ -34,9 +35,11 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { this.client = createClient({ url: this.config.url, - database: this.config.database, }); + const parsedUrl = new URL(this.config.url!); + await this.migrate(parsedUrl.pathname.replace(/^\//, "")); + this.flushTimer = setInterval(() => { this.flush().catch((err) => { this.logger.error({ event: "flush_interval_error", error: String(err) }); @@ -45,14 +48,22 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { this.logger.log({ event: "clickhouse_initialized", - url: this.config.url, - database: this.config.database, + host: parsedUrl.host, + database: parsedUrl.pathname.replace(/^\//, ""), batchSize: this.config.batchSize, flushIntervalMs: this.config.flushIntervalMs, probeLocation: this.config.probeLocation, }); } + private async migrate(database: string): Promise { + const migrations = buildMigrations(database); + for (const sql of migrations) { + await this.client!.command({ query: sql }); + } + this.logger.log({ event: "clickhouse_migrated", database }); + } + async onApplicationShutdown() { if (this.flushTimer) { clearInterval(this.flushTimer); diff --git a/apps/backend/src/config/app.config.ts b/apps/backend/src/config/app.config.ts index 016997a7..d3f0cee1 100644 --- a/apps/backend/src/config/app.config.ts +++ b/apps/backend/src/config/app.config.ts @@ -101,7 +101,6 @@ export const configValidationSchema = Joi.object({ // ClickHouse CLICKHOUSE_URL: Joi.string().uri().optional(), - CLICKHOUSE_DATABASE: Joi.string().default("dealbot"), CLICKHOUSE_BATCH_SIZE: Joi.number().integer().min(1).default(500), CLICKHOUSE_FLUSH_INTERVAL_MS: Joi.number().integer().min(100).default(5000), DEALBOT_PROBE_LOCATION: Joi.string().default("unknown"), diff --git a/kustomize/overlays/local/backend-configmap-local.yaml b/kustomize/overlays/local/backend-configmap-local.yaml index 53078962..9aac49ed 100644 --- a/kustomize/overlays/local/backend-configmap-local.yaml +++ b/kustomize/overlays/local/backend-configmap-local.yaml @@ -30,5 +30,4 @@ data: PDP_SUBGRAPH_ENDPOINT: "https://api.goldsky.com/api/public/project_cmdfaaxeuz6us01u359yjdctw/subgraphs/pdp-explorer/calibration311a/gn" JOB_SCHEDULER_POLL_SECONDS: "60" CLICKHOUSE_URL: "http://default:@dealbot-clickhouse:8123/dealbot" - CLICKHOUSE_DATABASE: "dealbot" DEALBOT_PROBE_LOCATION: "local" diff --git a/kustomize/overlays/local/clickhouse/deployment.yaml b/kustomize/overlays/local/clickhouse/deployment.yaml index 68542880..b3a353e3 100644 --- a/kustomize/overlays/local/clickhouse/deployment.yaml +++ b/kustomize/overlays/local/clickhouse/deployment.yaml @@ -50,7 +50,12 @@ spec: volumeMounts: - name: clickhouse-storage mountPath: /var/lib/clickhouse + - name: initdb + mountPath: /docker-entrypoint-initdb.d volumes: - name: clickhouse-storage persistentVolumeClaim: claimName: dealbot-clickhouse + - name: initdb + configMap: + name: dealbot-clickhouse-initdb diff --git a/kustomize/overlays/local/clickhouse/initdb-configmap.yaml b/kustomize/overlays/local/clickhouse/initdb-configmap.yaml new file mode 100644 index 00000000..3bd07310 --- /dev/null +++ b/kustomize/overlays/local/clickhouse/initdb-configmap.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: dealbot-clickhouse-initdb +data: + 00-create-database.sql: | + CREATE DATABASE IF NOT EXISTS dealbot; diff --git a/kustomize/overlays/local/clickhouse/kustomization.yaml b/kustomize/overlays/local/clickhouse/kustomization.yaml index 733c4d98..48ea3f84 100644 --- a/kustomize/overlays/local/clickhouse/kustomization.yaml +++ b/kustomize/overlays/local/clickhouse/kustomization.yaml @@ -5,3 +5,4 @@ resources: - deployment.yaml - service.yaml - pvc.yaml + - initdb-configmap.yaml From 32e649a9c06c6a0d74036ed768e51ad7abefa11d Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Thu, 9 Apr 2026 11:00:22 +0100 Subject: [PATCH 05/18] feat: wire up writes to cliickhouse --- Makefile | 5 ++- .../src/clickhouse/clickhouse.service.ts | 4 +-- .../data-retention.service.spec.ts | 3 ++ .../data-retention/data-retention.service.ts | 12 +++++++ apps/backend/src/deal/deal.service.spec.ts | 3 ++ apps/backend/src/deal/deal.service.ts | 31 ++++++++++++++++++- .../src/retrieval/retrieval.service.spec.ts | 3 ++ .../src/retrieval/retrieval.service.ts | 24 +++++++++++++- 8 files changed, 80 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index efee6c4b..75691605 100644 --- a/Makefile +++ b/Makefile @@ -27,7 +27,7 @@ GRAFANA_VALUES ?= $(MONITORING_OVERLAY)/grafana-values.yaml .PHONY: web-image-build web-kind-load web-logs .PHONY: image-build kind-load deploy undeploy render logs .PHONY: redeploy restart restart-backend restart-web restart-worker -.PHONY: clickhouse-reset +.PHONY: clickhouse-reset clickhouse-shell .PHONY: monitoring-install monitoring-apply monitoring-up monitoring-down .PHONY: local-up up down @@ -126,6 +126,9 @@ clickhouse-reset: -kubectl delete pvc -n $(NAMESPACE) dealbot-clickhouse -kubectl delete pod -n $(NAMESPACE) -l app.kubernetes.io/name=dealbot-clickhouse +clickhouse-shell: + kubectl exec -it -n $(NAMESPACE) deployment/dealbot-clickhouse -- clickhouse-client --database dealbot + secret: namespace @if [ ! -f "$(SECRET_ENV_FILE)" ]; then echo "SECRET_ENV_FILE $(SECRET_ENV_FILE) not found"; exit 1; fi @tmp_env_file="$$(mktemp)"; \ diff --git a/apps/backend/src/clickhouse/clickhouse.service.ts b/apps/backend/src/clickhouse/clickhouse.service.ts index 1e732241..ff17c59f 100644 --- a/apps/backend/src/clickhouse/clickhouse.service.ts +++ b/apps/backend/src/clickhouse/clickhouse.service.ts @@ -1,8 +1,8 @@ +import { type ClickHouseClient, createClient } from "@clickhouse/client"; import { Injectable, Logger, OnApplicationShutdown, OnModuleInit } from "@nestjs/common"; -import { createClient, type ClickHouseClient } from "@clickhouse/client"; import { InjectMetric } from "@willsoto/nestjs-prometheus"; import { Counter, Gauge, Histogram } from "prom-client"; -import { loadClickhouseConfig, type IClickhouseConfig } from "./clickhouse.config.js"; +import { type IClickhouseConfig, loadClickhouseConfig } from "./clickhouse.config.js"; import { buildMigrations } from "./clickhouse.schema.js"; interface BufferedRow { diff --git a/apps/backend/src/data-retention/data-retention.service.spec.ts b/apps/backend/src/data-retention/data-retention.service.spec.ts index 17151bd1..daf8733d 100644 --- a/apps/backend/src/data-retention/data-retention.service.spec.ts +++ b/apps/backend/src/data-retention/data-retention.service.spec.ts @@ -2,6 +2,7 @@ import type { ConfigService } from "@nestjs/config"; import type { Counter, Gauge } from "prom-client"; import { Repository } from "typeorm"; import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import type { IConfig } from "../config/app.config.js"; import type { DataRetentionBaseline } from "../database/entities/data-retention-baseline.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; @@ -121,6 +122,7 @@ describe("DataRetentionService", () => { delete: vi.fn().mockResolvedValue(undefined), }; mockSPRepository = { find: vi.fn() }; + const clickhouseServiceMock = { insert: vi.fn(), probeLocation: "test" } as unknown as ClickhouseService; service = new DataRetentionService( configServiceMock, walletSdkServiceMock as unknown as WalletSdkService, @@ -129,6 +131,7 @@ describe("DataRetentionService", () => { mockSPRepository as unknown as Repository, counterMock as unknown as Counter, gaugeMock as unknown as Gauge, + clickhouseServiceMock, ); }); diff --git a/apps/backend/src/data-retention/data-retention.service.ts b/apps/backend/src/data-retention/data-retention.service.ts index f4d7ec6d..0f831280 100644 --- a/apps/backend/src/data-retention/data-retention.service.ts +++ b/apps/backend/src/data-retention/data-retention.service.ts @@ -4,6 +4,7 @@ import { InjectRepository } from "@nestjs/typeorm"; import { InjectMetric } from "@willsoto/nestjs-prometheus"; import { Counter, Gauge } from "prom-client"; import { Raw, Repository } from "typeorm"; +import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import { toStructuredError } from "../common/logging.js"; import { isSpBlocked } from "../common/sp-blocklist.js"; import { IConfig } from "../config/app.config.js"; @@ -53,6 +54,7 @@ export class DataRetentionService { private readonly dataSetChallengeStatusCounter: Counter, @InjectMetric("pdp_provider_estimated_overdue_periods") private readonly estimatedOverduePeriodsGauge: Gauge, + private readonly clickhouseService: ClickhouseService, ) { this.providerCumulativeTotals = new Map(); } @@ -332,6 +334,16 @@ export class DataRetentionService { const confirmedTotalSuccess = totalProvingPeriods - totalFaultedPeriods; + this.clickhouseService.insert("data_retention_challenges", { + timestamp: Date.now(), + network: this.configService.get("blockchain").network, + probe_location: this.clickhouseService.probeLocation, + sp_address: address, + sp_name: pdpProvider.name ?? null, + total_proving_periods: Number(totalProvingPeriods), + total_faulted_periods: Number(totalFaultedPeriods), + }); + const normalizedAddress = address.toLowerCase(); const previous = this.providerCumulativeTotals.get(normalizedAddress); diff --git a/apps/backend/src/deal/deal.service.spec.ts b/apps/backend/src/deal/deal.service.spec.ts index 25cb5c9b..67ba6f77 100644 --- a/apps/backend/src/deal/deal.service.spec.ts +++ b/apps/backend/src/deal/deal.service.spec.ts @@ -6,6 +6,7 @@ import { executeUpload } from "filecoin-pin"; import { CID } from "multiformats/cid"; import { generatePrivateKey } from "viem/accounts"; import { afterEach, beforeEach, describe, expect, it, Mock, vi } from "vitest"; +import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import { Deal } from "../database/entities/deal.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { DealStatus } from "../database/types.js"; @@ -165,6 +166,7 @@ describe("DealService", () => { { provide: DataStorageCheckMetrics, useValue: mockDataStorageMetrics }, { provide: RetrievalCheckMetrics, useValue: mockRetrievalMetrics }, { provide: DataSetCreationCheckMetrics, useValue: mockDataSetCreationMetrics }, + { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, ], }).compile(); @@ -843,6 +845,7 @@ describe("DealService", () => { { provide: DataStorageCheckMetrics, useValue: mockDataStorageMetrics }, { provide: RetrievalCheckMetrics, useValue: mockRetrievalMetrics }, { provide: DataSetCreationCheckMetrics, useValue: mockDataSetCreationMetrics }, + { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, ], }).compile(); diff --git a/apps/backend/src/deal/deal.service.ts b/apps/backend/src/deal/deal.service.ts index c63eb339..3642e91f 100644 --- a/apps/backend/src/deal/deal.service.ts +++ b/apps/backend/src/deal/deal.service.ts @@ -6,6 +6,7 @@ import { InjectRepository } from "@nestjs/typeorm"; import { executeUpload } from "filecoin-pin"; import { CID } from "multiformats/cid"; import type { Repository } from "typeorm"; +import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import { awaitWithAbort } from "../common/abort-utils.js"; import { buildUnixfsCar } from "../common/car-utils.js"; import { createFilecoinPinLogger } from "../common/filecoin-pin-logger.js"; @@ -65,6 +66,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy { private readonly dataStorageMetrics: DataStorageCheckMetrics, private readonly retrievalMetrics: RetrievalCheckMetrics, private readonly dataSetCreationMetrics: DataSetCreationCheckMetrics, + private readonly clickhouseService: ClickhouseService, ) { this.blockchainConfig = this.configService.get("blockchain"); } @@ -342,6 +344,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy { }); deal.piecesConfirmedTime = new Date(); deal.status = DealStatus.PIECE_CONFIRMED; + deal.pieceId = event.data.pieceIds?.[0] != null ? Number(event.data.pieceIds[0]) : undefined; deal.chainLatencyMs = deal.piecesConfirmedTime.getTime() - deal.piecesAddedTime.getTime(); onchainSucceeded = true; this.dataStorageMetrics.observePieceConfirmedOnChainMs(providerLabels, deal.chainLatencyMs); @@ -736,10 +739,36 @@ export class DealService implements OnModuleInit, OnModuleDestroy { // Only set pieceSize here if it hasn't been set earlier in the deal flow. deal.pieceSize = pieceSize; - deal.pieceId = uploadResult.pieceId; + deal.pieceId ??= uploadResult.pieceId; } private async saveDeal(deal: Deal, dealLogContext: DealLogContext): Promise { + this.clickhouseService.insert("deal_checks", { + timestamp: Date.now(), + network: this.blockchainConfig.network, + probe_location: this.clickhouseService.probeLocation, + sp_address: deal.spAddress, + sp_name: deal.storageProvider?.name ?? null, + deal_id: deal.id, + piece_cid: deal.pieceCid ?? null, + piece_id: deal.pieceId ?? null, + file_size_bytes: deal.fileSize ?? null, + piece_size_bytes: deal.pieceSize ?? null, + status: deal.status, + error_code: deal.errorCode ?? null, + retry_count: deal.retryCount, + upload_started_at: deal.uploadStartTime?.getTime() ?? null, + upload_ended_at: deal.uploadEndTime?.getTime() ?? null, + pieces_added_at: deal.piecesAddedTime?.getTime() ?? null, + pieces_confirmed_at: deal.piecesConfirmedTime?.getTime() ?? null, + ipni_status: deal.ipniStatus ?? null, + ipni_indexed_at: deal.ipniIndexedAt?.getTime() ?? null, + ipni_advertised_at: deal.ipniAdvertisedAt?.getTime() ?? null, + ipni_verified_at: deal.ipniVerifiedAt?.getTime() ?? null, + ipni_verified_cids_count: deal.ipniVerifiedCidsCount ?? null, + ipni_unverified_cids_count: deal.ipniUnverifiedCidsCount ?? null, + }); + try { await this.dealRepository.save(deal); } catch (error) { diff --git a/apps/backend/src/retrieval/retrieval.service.spec.ts b/apps/backend/src/retrieval/retrieval.service.spec.ts index bf543bdc..a9839098 100644 --- a/apps/backend/src/retrieval/retrieval.service.spec.ts +++ b/apps/backend/src/retrieval/retrieval.service.spec.ts @@ -2,6 +2,7 @@ import { ConfigService } from "@nestjs/config"; import { Test, TestingModule } from "@nestjs/testing"; import { getRepositoryToken } from "@nestjs/typeorm"; import { afterEach, describe, expect, it, vi } from "vitest"; +import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import { Deal } from "../database/entities/deal.entity.js"; import { Retrieval } from "../database/entities/retrieval.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; @@ -32,6 +33,7 @@ describe("RetrievalService timeouts", () => { get: vi.fn((key: string) => { if (key === "app") return { runMode: "api" }; if (key === "jobs") return { pgbossSchedulerEnabled: false }; + if (key === "blockchain") return { network: "calibration" }; if (key === "dataset") return { randomDatasetSizes: [10] }; if (key === "timeouts") return { ipniVerificationTimeoutMs: 10_000, ipniVerificationPollingMs: 2_000 }; return undefined; @@ -93,6 +95,7 @@ describe("RetrievalService timeouts", () => { { provide: DiscoverabilityCheckMetrics, useValue: mockDiscoverabilityMetrics }, { provide: IpniVerificationService, useValue: mockIpniVerificationService }, { provide: ConfigService, useValue: mockConfigService }, + { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, ], }).compile(); diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index 520ff73a..65c09b7b 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -3,6 +3,7 @@ import { ConfigService } from "@nestjs/config"; import { InjectRepository } from "@nestjs/typeorm"; import { CID } from "multiformats/cid"; import type { Repository } from "typeorm"; +import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import { type ProviderJobContext, type RetrievalLogContext, toStructuredError } from "../common/logging.js"; import type { Hex } from "../common/types.js"; import type { IConfig } from "../config/app.config.js"; @@ -40,6 +41,7 @@ export class RetrievalService { private readonly discoverabilityMetrics: DiscoverabilityCheckMetrics, private readonly ipniVerificationService: IpniVerificationService, private readonly configService: ConfigService, + private readonly clickhouseService: ClickhouseService, ) {} async performRandomRetrievalForProvider( @@ -255,7 +257,27 @@ export class RetrievalService { this.retrievalMetrics.recordHttpResponseCode(providerLabels, executionResult.metrics.statusCode); } - return this.saveRetrieval(retrieval); + const saved = await this.saveRetrieval(retrieval); + + const network = this.configService.get("blockchain").network; + this.clickhouseService.insert("retrieval_checks", { + timestamp: Date.now(), + network, + probe_location: this.clickhouseService.probeLocation, + sp_address: deal.spAddress, + sp_name: providerLabels.providerName !== "unknown" ? providerLabels.providerName : null, + deal_id: deal.id, + retrieval_id: saved.id, + service_type: saved.serviceType, + status: saved.status, + http_response_code: executionResult.metrics.statusCode || null, + retry_count: saved.retryCount, + ttfb_ms: executionResult.success ? executionResult.metrics.ttfb : null, + last_byte_ms: executionResult.success ? executionResult.metrics.latency : null, + bytes_retrieved: executionResult.success ? executionResult.metrics.responseSize : null, + }); + + return saved; } private recordRetrievalEventMetrics( From 429282caf7fd260453f7085ae638e906430f1156 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Thu, 9 Apr 2026 11:57:03 +0100 Subject: [PATCH 06/18] chore: rename deal_checks table to data_storage_checks --- apps/backend/src/clickhouse/clickhouse.schema.ts | 2 +- apps/backend/src/deal/deal.service.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/backend/src/clickhouse/clickhouse.schema.ts b/apps/backend/src/clickhouse/clickhouse.schema.ts index eeed0173..63ac27a4 100644 --- a/apps/backend/src/clickhouse/clickhouse.schema.ts +++ b/apps/backend/src/clickhouse/clickhouse.schema.ts @@ -4,7 +4,7 @@ */ export function buildMigrations(database: string): string[] { return [ - `CREATE TABLE IF NOT EXISTS ${database}.deal_checks + `CREATE TABLE IF NOT EXISTS ${database}.data_storage_checks ( timestamp DateTime64(3, 'UTC'), network LowCardinality(String), diff --git a/apps/backend/src/deal/deal.service.ts b/apps/backend/src/deal/deal.service.ts index 3642e91f..e40c86f0 100644 --- a/apps/backend/src/deal/deal.service.ts +++ b/apps/backend/src/deal/deal.service.ts @@ -743,7 +743,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy { } private async saveDeal(deal: Deal, dealLogContext: DealLogContext): Promise { - this.clickhouseService.insert("deal_checks", { + this.clickhouseService.insert("data_storage_checks", { timestamp: Date.now(), network: this.blockchainConfig.network, probe_location: this.clickhouseService.probeLocation, From 98230f4f2e6dbb355fe0bde213827e3d6d56b5e3 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Thu, 9 Apr 2026 12:40:48 +0100 Subject: [PATCH 07/18] chore: remove network column from clickhouse schemas --- apps/backend/src/clickhouse/clickhouse.schema.ts | 10 ++++------ apps/backend/src/config/app.config.ts | 2 ++ .../src/data-retention/data-retention.service.ts | 1 - apps/backend/src/deal/deal.service.ts | 1 - apps/backend/src/retrieval/retrieval.service.ts | 2 -- 5 files changed, 6 insertions(+), 10 deletions(-) diff --git a/apps/backend/src/clickhouse/clickhouse.schema.ts b/apps/backend/src/clickhouse/clickhouse.schema.ts index 63ac27a4..c75d0d14 100644 --- a/apps/backend/src/clickhouse/clickhouse.schema.ts +++ b/apps/backend/src/clickhouse/clickhouse.schema.ts @@ -7,7 +7,7 @@ export function buildMigrations(database: string): string[] { `CREATE TABLE IF NOT EXISTS ${database}.data_storage_checks ( timestamp DateTime64(3, 'UTC'), - network LowCardinality(String), + probe_location LowCardinality(String), sp_address String, sp_name Nullable(String), @@ -35,14 +35,13 @@ export function buildMigrations(database: string): string[] { ipni_verified_cids_count Nullable(UInt32), ipni_unverified_cids_count Nullable(UInt32) ) ENGINE MergeTree() - PRIMARY KEY (network, probe_location, sp_address, timestamp) + PRIMARY KEY (probe_location, sp_address, timestamp) PARTITION BY toStartOfMonth(timestamp) TTL toDateTime(timestamp) + INTERVAL 1 YEAR`, `CREATE TABLE IF NOT EXISTS ${database}.retrieval_checks ( timestamp DateTime64(3, 'UTC'), - network LowCardinality(String), probe_location LowCardinality(String), sp_address String, sp_name Nullable(String), @@ -59,14 +58,13 @@ export function buildMigrations(database: string): string[] { last_byte_ms Nullable(Float64), bytes_retrieved Nullable(UInt64) ) ENGINE MergeTree() - PRIMARY KEY (network, probe_location, sp_address, timestamp) + PRIMARY KEY (probe_location, sp_address, timestamp) PARTITION BY toStartOfMonth(timestamp) TTL toDateTime(timestamp) + INTERVAL 1 YEAR`, `CREATE TABLE IF NOT EXISTS ${database}.data_retention_challenges ( timestamp DateTime64(3, 'UTC'), - network LowCardinality(String), probe_location LowCardinality(String), sp_address String, sp_name Nullable(String), @@ -74,7 +72,7 @@ export function buildMigrations(database: string): string[] { total_proving_periods UInt32, total_faulted_periods UInt32 ) ENGINE MergeTree() - PRIMARY KEY (network, probe_location, sp_address, timestamp) + PRIMARY KEY (probe_location, sp_address, timestamp) PARTITION BY toStartOfMonth(timestamp) TTL toDateTime(timestamp) + INTERVAL 1 YEAR`, ]; diff --git a/apps/backend/src/config/app.config.ts b/apps/backend/src/config/app.config.ts index d3f0cee1..dbc29389 100644 --- a/apps/backend/src/config/app.config.ts +++ b/apps/backend/src/config/app.config.ts @@ -127,6 +127,7 @@ export interface IAppConfig { enableDevMode: boolean; prometheusWalletBalanceTtlSeconds: number; prometheusWalletBalanceErrorCooldownSeconds: number; + probeLocation: string; } export interface IDatabaseConfig { @@ -306,6 +307,7 @@ export function loadConfig(): IConfig { process.env.PROMETHEUS_WALLET_BALANCE_ERROR_COOLDOWN_SECONDS || "60", 10, ), + probeLocation: process.env.DEALBOT_PROBE_LOCATION || "unknown", }, database: { host: process.env.DATABASE_HOST || "localhost", diff --git a/apps/backend/src/data-retention/data-retention.service.ts b/apps/backend/src/data-retention/data-retention.service.ts index 0f831280..708e04d5 100644 --- a/apps/backend/src/data-retention/data-retention.service.ts +++ b/apps/backend/src/data-retention/data-retention.service.ts @@ -336,7 +336,6 @@ export class DataRetentionService { this.clickhouseService.insert("data_retention_challenges", { timestamp: Date.now(), - network: this.configService.get("blockchain").network, probe_location: this.clickhouseService.probeLocation, sp_address: address, sp_name: pdpProvider.name ?? null, diff --git a/apps/backend/src/deal/deal.service.ts b/apps/backend/src/deal/deal.service.ts index e40c86f0..91da7c04 100644 --- a/apps/backend/src/deal/deal.service.ts +++ b/apps/backend/src/deal/deal.service.ts @@ -745,7 +745,6 @@ export class DealService implements OnModuleInit, OnModuleDestroy { private async saveDeal(deal: Deal, dealLogContext: DealLogContext): Promise { this.clickhouseService.insert("data_storage_checks", { timestamp: Date.now(), - network: this.blockchainConfig.network, probe_location: this.clickhouseService.probeLocation, sp_address: deal.spAddress, sp_name: deal.storageProvider?.name ?? null, diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index 65c09b7b..1d8e1c26 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -259,10 +259,8 @@ export class RetrievalService { const saved = await this.saveRetrieval(retrieval); - const network = this.configService.get("blockchain").network; this.clickhouseService.insert("retrieval_checks", { timestamp: Date.now(), - network, probe_location: this.clickhouseService.probeLocation, sp_address: deal.spAddress, sp_name: providerLabels.providerName !== "unknown" ? providerLabels.providerName : null, From 2255473de415bb548842d17d339501893fd5cb36 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Thu, 9 Apr 2026 12:48:11 +0100 Subject: [PATCH 08/18] chore: move probeLocation config to app config --- apps/backend/src/clickhouse/clickhouse.config.ts | 2 -- apps/backend/src/clickhouse/clickhouse.service.ts | 7 +++++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/apps/backend/src/clickhouse/clickhouse.config.ts b/apps/backend/src/clickhouse/clickhouse.config.ts index 0290493c..b3ab5805 100644 --- a/apps/backend/src/clickhouse/clickhouse.config.ts +++ b/apps/backend/src/clickhouse/clickhouse.config.ts @@ -7,7 +7,6 @@ export interface IClickhouseConfig { url: string | undefined; batchSize: number; flushIntervalMs: number; - probeLocation: string; } export function loadClickhouseConfig(): IClickhouseConfig { @@ -15,6 +14,5 @@ export function loadClickhouseConfig(): IClickhouseConfig { url: process.env.CLICKHOUSE_URL || undefined, batchSize: Number.parseInt(process.env.CLICKHOUSE_BATCH_SIZE || "500", 10), flushIntervalMs: Number.parseInt(process.env.CLICKHOUSE_FLUSH_INTERVAL_MS || "5000", 10), - probeLocation: process.env.DEALBOT_PROBE_LOCATION || "unknown", }; } diff --git a/apps/backend/src/clickhouse/clickhouse.service.ts b/apps/backend/src/clickhouse/clickhouse.service.ts index ff17c59f..2a5e3c7d 100644 --- a/apps/backend/src/clickhouse/clickhouse.service.ts +++ b/apps/backend/src/clickhouse/clickhouse.service.ts @@ -1,7 +1,9 @@ import { type ClickHouseClient, createClient } from "@clickhouse/client"; import { Injectable, Logger, OnApplicationShutdown, OnModuleInit } from "@nestjs/common"; +import { ConfigService } from "@nestjs/config"; import { InjectMetric } from "@willsoto/nestjs-prometheus"; import { Counter, Gauge, Histogram } from "prom-client"; +import type { IConfig } from "../config/app.config.js"; import { type IClickhouseConfig, loadClickhouseConfig } from "./clickhouse.config.js"; import { buildMigrations } from "./clickhouse.schema.js"; @@ -23,6 +25,7 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { @InjectMetric("clickhouseFlushErrorsTotal") private readonly flushErrors: Counter, @InjectMetric("clickhouseBufferRows") private readonly bufferRows: Gauge, @InjectMetric("clickhouseRowsInsertedTotal") private readonly rowsInserted: Counter, + private readonly configService: ConfigService, ) { this.config = loadClickhouseConfig(); } @@ -52,7 +55,7 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { database: parsedUrl.pathname.replace(/^\//, ""), batchSize: this.config.batchSize, flushIntervalMs: this.config.flushIntervalMs, - probeLocation: this.config.probeLocation, + probeLocation: this.configService.get("app").probeLocation, }); } @@ -132,7 +135,7 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { } get probeLocation(): string { - return this.config.probeLocation; + return this.configService.get("app").probeLocation; } get enabled(): boolean { From ccbfcebb3e14bc0682a5d2abb9993a0b1d3a59db Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Mon, 13 Apr 2026 10:33:21 +0100 Subject: [PATCH 09/18] chore: rename ttfb_ms to first_byte_ms in clickhouse schema and add schema comments --- .../src/clickhouse/clickhouse.schema.ts | 80 +++++++++---------- .../src/retrieval/retrieval.service.ts | 2 +- 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/apps/backend/src/clickhouse/clickhouse.schema.ts b/apps/backend/src/clickhouse/clickhouse.schema.ts index c75d0d14..a4f49898 100644 --- a/apps/backend/src/clickhouse/clickhouse.schema.ts +++ b/apps/backend/src/clickhouse/clickhouse.schema.ts @@ -6,34 +6,34 @@ export function buildMigrations(database: string): string[] { return [ `CREATE TABLE IF NOT EXISTS ${database}.data_storage_checks ( - timestamp DateTime64(3, 'UTC'), + timestamp DateTime64(3, 'UTC'), -- when deal entity was saved - probe_location LowCardinality(String), - sp_address String, - sp_name Nullable(String), + probe_location LowCardinality(String), -- dealbot location + sp_address String, -- storage provider address + sp_name Nullable(String), -- storage provider name - deal_id UUID, - piece_cid Nullable(String), - piece_id Nullable(UInt64), - file_size_bytes Nullable(UInt64), - piece_size_bytes Nullable(UInt64), + deal_id UUID, -- id assigned by dealbot + piece_cid Nullable(String), -- null if upload failed + piece_id Nullable(UInt64), -- on-chain piece id + file_size_bytes Nullable(UInt64), -- raw file size before CAR encoding + piece_size_bytes Nullable(UInt64), -- piece size after CAR encoding - status LowCardinality(String), + status LowCardinality(String), -- DealStatus: 'pending' | 'uploaded' | 'piece_added' | 'piece_confirmed' | 'deal_created' | 'failed' error_code LowCardinality(Nullable(String)), - retry_count UInt8 DEFAULT 0, + retry_count UInt8 DEFAULT 0, -- number of attempts made to store - upload_started_at Nullable(DateTime64(3, 'UTC')), - upload_ended_at Nullable(DateTime64(3, 'UTC')), + upload_started_at Nullable(DateTime64(3, 'UTC')), -- when executeUpload() was called + upload_ended_at Nullable(DateTime64(3, 'UTC')), -- when onStored event fired - pieces_added_at Nullable(DateTime64(3, 'UTC')), - pieces_confirmed_at Nullable(DateTime64(3, 'UTC')), + pieces_added_at Nullable(DateTime64(3, 'UTC')), -- when onPiecesAdded event fired + pieces_confirmed_at Nullable(DateTime64(3, 'UTC')), -- when onPiecesConfirmed event fired - ipni_status LowCardinality(Nullable(String)), - ipni_indexed_at Nullable(DateTime64(3, 'UTC')), - ipni_advertised_at Nullable(DateTime64(3, 'UTC')), - ipni_verified_at Nullable(DateTime64(3, 'UTC')), - ipni_verified_cids_count Nullable(UInt32), - ipni_unverified_cids_count Nullable(UInt32) + ipni_status LowCardinality(Nullable(String)), -- 'pending' | 'sp_indexed' | 'sp_advertised' | 'verified' | 'failed' + ipni_indexed_at Nullable(DateTime64(3, 'UTC')), -- when dealbot first observed SP_INDEXED (accuracy limited to poll interval) + ipni_advertised_at Nullable(DateTime64(3, 'UTC')), -- when dealbot first observed SP_ADVERTISED (accuracy limited to poll interval) + ipni_verified_at Nullable(DateTime64(3, 'UTC')), -- when dealbot confirmed root CID findable via IPNI + ipni_verified_cids_count Nullable(UInt32), -- CIDs confirmed findable via IPNI + ipni_unverified_cids_count Nullable(UInt32) -- CIDs checked but not findable ) ENGINE MergeTree() PRIMARY KEY (probe_location, sp_address, timestamp) PARTITION BY toStartOfMonth(timestamp) @@ -41,22 +41,22 @@ export function buildMigrations(database: string): string[] { `CREATE TABLE IF NOT EXISTS ${database}.retrieval_checks ( - timestamp DateTime64(3, 'UTC'), - probe_location LowCardinality(String), - sp_address String, - sp_name Nullable(String), + timestamp DateTime64(3, 'UTC'), -- when retrieval entity was saved + probe_location LowCardinality(String), -- dealbot location + sp_address String, -- storage provider address + sp_name Nullable(String), -- storage provider name - deal_id Nullable(UUID), - retrieval_id UUID, - service_type LowCardinality(String), + deal_id Nullable(UUID), -- id of deal assigned by dealbot + retrieval_id UUID, -- id of retrieval assigned by dealbot + service_type LowCardinality(String), -- 'direct_sp' | 'ipfs_pin' - status LowCardinality(String), - http_response_code Nullable(UInt16), - retry_count UInt8 DEFAULT 0, + status LowCardinality(String), -- RetrievalStatus: 'pending' | 'in_progress' | 'success' | 'failed' | 'timeout' + http_response_code Nullable(UInt16), -- raw HTTP status; null on transport failure + retry_count UInt8 DEFAULT 0, -- number of attempts made to retrieve - ttfb_ms Nullable(Float64), - last_byte_ms Nullable(Float64), - bytes_retrieved Nullable(UInt64) + first_byte_ms Nullable(Float64), -- time from request start to first response byte + last_byte_ms Nullable(Float64), -- time from request start to last response byte + bytes_retrieved Nullable(UInt64) -- size of received data in bytes ) ENGINE MergeTree() PRIMARY KEY (probe_location, sp_address, timestamp) PARTITION BY toStartOfMonth(timestamp) @@ -64,13 +64,13 @@ export function buildMigrations(database: string): string[] { `CREATE TABLE IF NOT EXISTS ${database}.data_retention_challenges ( - timestamp DateTime64(3, 'UTC'), - probe_location LowCardinality(String), - sp_address String, - sp_name Nullable(String), + timestamp DateTime64(3, 'UTC'), -- when the poll ran and detected these periods + probe_location LowCardinality(String), -- dealbot location + sp_address String, -- storage provider address + sp_name Nullable(String), -- storage provider name - total_proving_periods UInt32, - total_faulted_periods UInt32 + total_proving_periods UInt32, -- cumulative total proving periods from subgraph at poll time + total_faulted_periods UInt32 -- cumulative total faulted periods from subgraph at poll time ) ENGINE MergeTree() PRIMARY KEY (probe_location, sp_address, timestamp) PARTITION BY toStartOfMonth(timestamp) diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index 1d8e1c26..d4d1eb53 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -270,7 +270,7 @@ export class RetrievalService { status: saved.status, http_response_code: executionResult.metrics.statusCode || null, retry_count: saved.retryCount, - ttfb_ms: executionResult.success ? executionResult.metrics.ttfb : null, + first_byte_ms: executionResult.success ? executionResult.metrics.ttfb : null, last_byte_ms: executionResult.success ? executionResult.metrics.latency : null, bytes_retrieved: executionResult.success ? executionResult.metrics.responseSize : null, }); From 731dfc7b636b6ec07d240d557c9f7b24644bdd2f Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Mon, 13 Apr 2026 11:21:56 +0100 Subject: [PATCH 10/18] feat: add sp_id to clickhouse schema --- apps/backend/src/clickhouse/clickhouse.schema.ts | 3 +++ apps/backend/src/data-retention/data-retention.service.ts | 1 + apps/backend/src/deal/deal.service.ts | 1 + apps/backend/src/retrieval/retrieval.service.ts | 1 + 4 files changed, 6 insertions(+) diff --git a/apps/backend/src/clickhouse/clickhouse.schema.ts b/apps/backend/src/clickhouse/clickhouse.schema.ts index a4f49898..bb5ecc85 100644 --- a/apps/backend/src/clickhouse/clickhouse.schema.ts +++ b/apps/backend/src/clickhouse/clickhouse.schema.ts @@ -10,6 +10,7 @@ export function buildMigrations(database: string): string[] { probe_location LowCardinality(String), -- dealbot location sp_address String, -- storage provider address + sp_id Nullable(UInt64), -- storage provider numeric id sp_name Nullable(String), -- storage provider name deal_id UUID, -- id assigned by dealbot @@ -44,6 +45,7 @@ export function buildMigrations(database: string): string[] { timestamp DateTime64(3, 'UTC'), -- when retrieval entity was saved probe_location LowCardinality(String), -- dealbot location sp_address String, -- storage provider address + sp_id Nullable(UInt64), -- storage provider numeric id sp_name Nullable(String), -- storage provider name deal_id Nullable(UUID), -- id of deal assigned by dealbot @@ -67,6 +69,7 @@ export function buildMigrations(database: string): string[] { timestamp DateTime64(3, 'UTC'), -- when the poll ran and detected these periods probe_location LowCardinality(String), -- dealbot location sp_address String, -- storage provider address + sp_id Nullable(UInt64), -- storage provider numeric id sp_name Nullable(String), -- storage provider name total_proving_periods UInt32, -- cumulative total proving periods from subgraph at poll time diff --git a/apps/backend/src/data-retention/data-retention.service.ts b/apps/backend/src/data-retention/data-retention.service.ts index 708e04d5..d25d1b67 100644 --- a/apps/backend/src/data-retention/data-retention.service.ts +++ b/apps/backend/src/data-retention/data-retention.service.ts @@ -338,6 +338,7 @@ export class DataRetentionService { timestamp: Date.now(), probe_location: this.clickhouseService.probeLocation, sp_address: address, + sp_id: pdpProvider.id != null ? String(pdpProvider.id) : null, // pdpProvider.id is a BigInt sp_name: pdpProvider.name ?? null, total_proving_periods: Number(totalProvingPeriods), total_faulted_periods: Number(totalFaultedPeriods), diff --git a/apps/backend/src/deal/deal.service.ts b/apps/backend/src/deal/deal.service.ts index 91da7c04..ea12d352 100644 --- a/apps/backend/src/deal/deal.service.ts +++ b/apps/backend/src/deal/deal.service.ts @@ -747,6 +747,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy { timestamp: Date.now(), probe_location: this.clickhouseService.probeLocation, sp_address: deal.spAddress, + sp_id: deal.storageProvider?.providerId != null ? String(deal.storageProvider.providerId) : null, // providerId is a BigInt sp_name: deal.storageProvider?.name ?? null, deal_id: deal.id, piece_cid: deal.pieceCid ?? null, diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index d4d1eb53..f98365b9 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -263,6 +263,7 @@ export class RetrievalService { timestamp: Date.now(), probe_location: this.clickhouseService.probeLocation, sp_address: deal.spAddress, + sp_id: providerLabels.providerId !== "unknown" ? providerLabels.providerId : null, sp_name: providerLabels.providerName !== "unknown" ? providerLabels.providerName : null, deal_id: deal.id, retrieval_id: saved.id, From 497ca04959be2856a88312ec09e4f0df997bbb48 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Mon, 13 Apr 2026 11:37:18 +0100 Subject: [PATCH 11/18] chore: stylistic changes for guarding against undefined variables --- apps/backend/src/clickhouse/clickhouse.service.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/apps/backend/src/clickhouse/clickhouse.service.ts b/apps/backend/src/clickhouse/clickhouse.service.ts index 2a5e3c7d..7c155c59 100644 --- a/apps/backend/src/clickhouse/clickhouse.service.ts +++ b/apps/backend/src/clickhouse/clickhouse.service.ts @@ -40,7 +40,7 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { url: this.config.url, }); - const parsedUrl = new URL(this.config.url!); + const parsedUrl = new URL(this.config.url); await this.migrate(parsedUrl.pathname.replace(/^\//, "")); this.flushTimer = setInterval(() => { @@ -60,9 +60,10 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { } private async migrate(database: string): Promise { + if (!this.client) return; const migrations = buildMigrations(database); for (const sql of migrations) { - await this.client!.command({ query: sql }); + await this.client.command({ query: sql }); } this.logger.log({ event: "clickhouse_migrated", database }); } @@ -78,7 +79,7 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { /** * Queue a row for insertion. Returns immediately; the flush happens in the background. - * Safe to call when ClickHouse is disable: rows are silently dropped. + * Safe to call when ClickHouse is disabled: rows are silently dropped. */ insert(table: string, row: Record): void { if (!this.client) return; From ff0e6fc5d171a2f506b938737f71faa80b803439 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Mon, 13 Apr 2026 11:52:45 +0100 Subject: [PATCH 12/18] chore: move clickhouse config into app config for consistency --- .../src/clickhouse/clickhouse.config.ts | 18 ------------------ .../src/clickhouse/clickhouse.service.ts | 5 ++--- apps/backend/src/config/app.config.ts | 17 +++++++++++++++++ 3 files changed, 19 insertions(+), 21 deletions(-) delete mode 100644 apps/backend/src/clickhouse/clickhouse.config.ts diff --git a/apps/backend/src/clickhouse/clickhouse.config.ts b/apps/backend/src/clickhouse/clickhouse.config.ts deleted file mode 100644 index b3ab5805..00000000 --- a/apps/backend/src/clickhouse/clickhouse.config.ts +++ /dev/null @@ -1,18 +0,0 @@ -export interface IClickhouseConfig { - /** - * ClickHouse connection URL. Must include the database in the path. - * Example: http://default:password@host:8123/dealbot - * If unset, ClickHouse emission is disabled. - */ - url: string | undefined; - batchSize: number; - flushIntervalMs: number; -} - -export function loadClickhouseConfig(): IClickhouseConfig { - return { - url: process.env.CLICKHOUSE_URL || undefined, - batchSize: Number.parseInt(process.env.CLICKHOUSE_BATCH_SIZE || "500", 10), - flushIntervalMs: Number.parseInt(process.env.CLICKHOUSE_FLUSH_INTERVAL_MS || "5000", 10), - }; -} diff --git a/apps/backend/src/clickhouse/clickhouse.service.ts b/apps/backend/src/clickhouse/clickhouse.service.ts index 7c155c59..d5b58e96 100644 --- a/apps/backend/src/clickhouse/clickhouse.service.ts +++ b/apps/backend/src/clickhouse/clickhouse.service.ts @@ -3,8 +3,7 @@ import { Injectable, Logger, OnApplicationShutdown, OnModuleInit } from "@nestjs import { ConfigService } from "@nestjs/config"; import { InjectMetric } from "@willsoto/nestjs-prometheus"; import { Counter, Gauge, Histogram } from "prom-client"; -import type { IConfig } from "../config/app.config.js"; -import { type IClickhouseConfig, loadClickhouseConfig } from "./clickhouse.config.js"; +import type { IClickhouseConfig, IConfig } from "../config/app.config.js"; import { buildMigrations } from "./clickhouse.schema.js"; interface BufferedRow { @@ -27,7 +26,7 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { @InjectMetric("clickhouseRowsInsertedTotal") private readonly rowsInserted: Counter, private readonly configService: ConfigService, ) { - this.config = loadClickhouseConfig(); + this.config = this.configService.get("clickhouse", { infer: true }); } async onModuleInit() { diff --git a/apps/backend/src/config/app.config.ts b/apps/backend/src/config/app.config.ts index dbc29389..0323bfd5 100644 --- a/apps/backend/src/config/app.config.ts +++ b/apps/backend/src/config/app.config.ts @@ -272,6 +272,17 @@ export interface ISpBlocklistConfig { addresses: Set; } +export interface IClickhouseConfig { + /** + * ClickHouse connection URL. Must include the database in the path. + * Example: http://default:password@host:8123/dealbot + * If unset, ClickHouse emission is disabled. + */ + url: string | undefined; + batchSize: number; + flushIntervalMs: number; +} + export interface IConfig { app: IAppConfig; database: IDatabaseConfig; @@ -281,6 +292,7 @@ export interface IConfig { dataset: IDatasetConfig; timeouts: ITimeoutConfig; retrieval: IRetrievalConfig; + clickhouse: IClickhouseConfig; spBlocklists: ISpBlocklistConfig; } @@ -383,6 +395,11 @@ export function loadConfig(): IConfig { retrieval: { ipfsBlockFetchConcurrency: Number.parseInt(process.env.IPFS_BLOCK_FETCH_CONCURRENCY || "6", 10), }, + clickhouse: { + url: process.env.CLICKHOUSE_URL || undefined, + batchSize: Number.parseInt(process.env.CLICKHOUSE_BATCH_SIZE || "500", 10), + flushIntervalMs: Number.parseInt(process.env.CLICKHOUSE_FLUSH_INTERVAL_MS || "5000", 10), + }, spBlocklists: { ids: parseIdList(process.env.BLOCKED_SP_IDS), addresses: parseAddressList(process.env.BLOCKED_SP_ADDRESSES), From 4871d7fa899834f5cb0e90ce4fbf6beed1578680 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Mon, 13 Apr 2026 12:08:06 +0100 Subject: [PATCH 13/18] chore: log errors with clickhouse migration in module startup --- apps/backend/src/clickhouse/clickhouse.service.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/apps/backend/src/clickhouse/clickhouse.service.ts b/apps/backend/src/clickhouse/clickhouse.service.ts index d5b58e96..e12d91de 100644 --- a/apps/backend/src/clickhouse/clickhouse.service.ts +++ b/apps/backend/src/clickhouse/clickhouse.service.ts @@ -40,7 +40,13 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { }); const parsedUrl = new URL(this.config.url); - await this.migrate(parsedUrl.pathname.replace(/^\//, "")); + const database = parsedUrl.pathname.replace(/^\//, ""); + try { + await this.migrate(database); + } catch (err) { + this.logger.error({ event: "clickhouse_migration_failed", database, error: String(err) }); + throw err; + } this.flushTimer = setInterval(() => { this.flush().catch((err) => { @@ -51,7 +57,7 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { this.logger.log({ event: "clickhouse_initialized", host: parsedUrl.host, - database: parsedUrl.pathname.replace(/^\//, ""), + database, batchSize: this.config.batchSize, flushIntervalMs: this.config.flushIntervalMs, probeLocation: this.configService.get("app").probeLocation, From 0534a179691c190cbfed2cd1c89d45e02b8aa58c Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Mon, 13 Apr 2026 12:49:32 +0100 Subject: [PATCH 14/18] chore: expand documentation to include clickhouse --- docs/architecture.md | 6 +++ docs/checks/events-and-metrics.md | 16 +++++++ docs/environment-variables.md | 73 +++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+) diff --git a/docs/architecture.md b/docs/architecture.md index 55bea880..0855a6f9 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -37,6 +37,12 @@ Postgres is the system-of-record for Dealbot state: - Retrieval lifecycle records in `retrievals`. - Scheduler state in `job_schedule_state` and queue execution state in `pgboss.job`. +ClickHouse is for long-term check result storage and analysis, it's optional: + +- Each completed check writes one row to the relevant table: `data_storage_checks`, `retrieval_checks`, or `data_retention_challenges`. +- Rows are buffered in memory and flushed in batches; failed flushes are logged and the batch is dropped (ClickHouse is a metrics sink, not a source of truth). +- ClickHouse is not a dependency for normal operation. The service starts and runs without it. + Prometheus is for runtime observability, not durable state: - Job health/performance metrics (for example `jobs_started_total`, `jobs_completed_total`, `jobs_queued`) are emitted at runtime on `/metrics`. diff --git a/docs/checks/events-and-metrics.md b/docs/checks/events-and-metrics.md index 8ededa34..1fc0f2fe 100644 --- a/docs/checks/events-and-metrics.md +++ b/docs/checks/events-and-metrics.md @@ -106,3 +106,19 @@ sequenceDiagram | `dataSetCreationStatus` | Data-Set Creation | Not tied to an [event above](#event-list) but rather to data-set creation start (`pending`) and completion (`success`/`failure.*`) | `pending`, `success`, `failure.timedout`, `failure.other` | [`deal.service.ts`](../../apps/backend/src/deal/deal.service.ts) | | `dataSetChallengeStatus` | Data Retention | Not tied to an [event above](#event-list) but rather to the periodic chain-checking done in the [Data Retention Check](./data-retention.md) | `success`, `failure` | [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts) | | `pdp_provider_overdue_periods` | Data Retention | Emitted on every poll | Gauge value (estimated overdue periods) | [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts) | + +## ClickHouse Tables + +When `CLICKHOUSE_URL` is configured, dealbot writes one row per check result to ClickHouse for long-term storage and analysis. All tables are partitioned by month with a 1-year TTL. + +> **Source of truth**: the DDL and column-level comments in [`clickhouse.schema.ts`](../../apps/backend/src/clickhouse/clickhouse.schema.ts) are authoritative. The summary below is for orientation only. + +- **`data_storage_checks`** — one row written each time a deal is saved (on every status transition). Populated by [`deal.service.ts`](../../apps/backend/src/deal/deal.service.ts). +- **`retrieval_checks`** — one row per retrieval attempt. Populated by [`retrieval.service.ts`](../../apps/backend/src/retrieval/retrieval.service.ts). +- **`data_retention_challenges`** — one row per provider per poll cycle. Populated by [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts). + +All tables share the primary key `(probe_location, sp_address, timestamp)`: + +- `probe_location` - identifies which dealbot instance produced the row, allowing multiple deployments to be distinguished in queries (set via `DEALBOT_PROBE_LOCATION`) +- `sp_address` - the Ethereum/FEVM address of the storage provider under test +- `timestamp` - when the row was written (milliseconds UTC) diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 302850de..6360b4b7 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -13,6 +13,7 @@ This document provides a comprehensive guide to all environment variables used b | [Scheduling](#scheduling-configuration) | `PROVIDERS_REFRESH_INTERVAL_SECONDS`, `DATA_RETENTION_POLL_INTERVAL_SECONDS`, `DEALBOT_MAINTENANCE_WINDOWS_UTC`, `DEALBOT_MAINTENANCE_WINDOW_MINUTES` | | [Jobs (pg-boss)](#jobs-pg-boss) | `DEALBOT_PGBOSS_SCHEDULER_ENABLED`, `DEALBOT_PGBOSS_POOL_MAX`, `DEALS_PER_SP_PER_HOUR`, `DATASET_CREATIONS_PER_SP_PER_HOUR`, `RETRIEVALS_PER_SP_PER_HOUR`, `METRICS_PER_HOUR`, `JOB_SCHEDULER_POLL_SECONDS`, `JOB_WORKER_POLL_SECONDS`, `PG_BOSS_LOCAL_CONCURRENCY`, `JOB_CATCHUP_MAX_ENQUEUE`, `JOB_SCHEDULE_PHASE_SECONDS`, `JOB_ENQUEUE_JITTER_SECONDS`, `DEAL_JOB_TIMEOUT_SECONDS`, `RETRIEVAL_JOB_TIMEOUT_SECONDS`, `IPFS_BLOCK_FETCH_CONCURRENCY` | | [Dataset](#dataset-configuration) | `DEALBOT_LOCAL_DATASETS_PATH`, `RANDOM_PIECE_SIZES` | +| [ClickHouse](#clickhouse-configuration) | `CLICKHOUSE_URL`, `CLICKHOUSE_BATCH_SIZE`, `CLICKHOUSE_FLUSH_INTERVAL_MS`, `DEALBOT_PROBE_LOCATION` | | [Timeouts](#timeout-configuration) | `CONNECT_TIMEOUT_MS`, `HTTP_REQUEST_TIMEOUT_MS`, `HTTP2_REQUEST_TIMEOUT_MS`, `IPNI_VERIFICATION_TIMEOUT_MS`, `IPNI_VERIFICATION_POLLING_MS` | | [SP Blocklist](#sp-blocklist-configuration) | `BLOCKED_SP_IDS`, `BLOCKED_SP_ADDRESSES` | | [Prometheus Metrics](#prometheus-metrics-configuration) | `PROMETHEUS_WALLET_BALANCE_TTL_SECONDS`, `PROMETHEUS_WALLET_BALANCE_ERROR_COOLDOWN_SECONDS` | @@ -867,6 +868,78 @@ RANDOM_PIECE_SIZES=1024,10240,102400 --- +## ClickHouse Configuration + +Dealbot optionally writes check results to ClickHouse for long-term storage and analysis. All ClickHouse writes are disabled when `CLICKHOUSE_URL` is unset. + +### `CLICKHOUSE_URL` + +- **Type**: `string` (HTTP/HTTPS URL) +- **Required**: No +- **Default**: Not set (ClickHouse writes disabled) + +**Role**: ClickHouse connection URL. Must include the database name in the path. When unset, all ClickHouse inserts are silently dropped and no connection is made. + +**Example**: + +```bash +CLICKHOUSE_URL=http://default:password@clickhouse-host:8123/dealbot +``` + +--- + +### `CLICKHOUSE_BATCH_SIZE` + +- **Type**: `number` +- **Required**: No +- **Default**: `500` +- **Minimum**: `1` + +**Role**: Maximum number of rows to accumulate in the in-memory buffer before triggering a flush to ClickHouse. Rows are also flushed on the interval defined by `CLICKHOUSE_FLUSH_INTERVAL_MS`. + +**When to update**: + +- Decrease for lower-throughput deployments where you want more frequent writes +- Increase to reduce write frequency under high load + +--- + +### `CLICKHOUSE_FLUSH_INTERVAL_MS` + +- **Type**: `number` (milliseconds) +- **Required**: No +- **Default**: `5000` (5 seconds) +- **Minimum**: `100` + +**Role**: How often the ClickHouse buffer is flushed, regardless of batch size. + +**When to update**: + +- Decrease for more real-time data visibility +- Increase to reduce write pressure on the ClickHouse server + +--- + +### `DEALBOT_PROBE_LOCATION` + +- **Type**: `string` +- **Required**: No +- **Default**: `unknown` + +**Role**: A label identifying where this dealbot instance is running (e.g. `aws-us-east-1`, `local`). Written to ClickHouse as `probe_location` on every row, allowing multi-region deployments to be distinguished in queries. + +**When to update**: + +- Set to a meaningful geographic or deployment identifier for each dealbot instance + +**Example**: + +```bash +DEALBOT_PROBE_LOCATION=aws-us-east-1 +``` + +--- + ## Timeout Configuration ### `CONNECT_TIMEOUT_MS` From addbd2ecd38ae7c6d969b4820b662220f24c1c75 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Fri, 24 Apr 2026 09:47:31 +0100 Subject: [PATCH 15/18] chore: address review comments --- .../src/clickhouse/clickhouse.module.ts | 11 +++++++--- .../src/clickhouse/clickhouse.service.ts | 20 +++++++++++++------ apps/backend/src/config/app.config.ts | 3 +++ 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/apps/backend/src/clickhouse/clickhouse.module.ts b/apps/backend/src/clickhouse/clickhouse.module.ts index e8cb209f..a992ebb0 100644 --- a/apps/backend/src/clickhouse/clickhouse.module.ts +++ b/apps/backend/src/clickhouse/clickhouse.module.ts @@ -6,14 +6,19 @@ import { ClickhouseService } from "./clickhouse.service.js"; @Module({ providers: [ makeHistogramProvider({ - name: "clickhouseFlushDurationMs", - help: "Round-trip time of each ClickHouse flush call in milliseconds", - buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000], + name: "clickhouseFlushDurationSeconds", + help: "Round-trip time of each ClickHouse flush call in seconds", + buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5], }), makeCounterProvider({ name: "clickhouseFlushErrorsTotal", help: "Number of failed ClickHouse flush attempts; non-zero means rows were dropped", }), + makeCounterProvider({ + name: "clickhouseDroppedRowsTotal", + help: "Rows silently dropped due to flush failure or buffer overflow, by reason", + labelNames: ["reason"] as const, + }), makeGaugeProvider({ name: "clickhouseBufferRows", help: "Current number of rows queued in the ClickHouse buffer", diff --git a/apps/backend/src/clickhouse/clickhouse.service.ts b/apps/backend/src/clickhouse/clickhouse.service.ts index e12d91de..2cdfc34a 100644 --- a/apps/backend/src/clickhouse/clickhouse.service.ts +++ b/apps/backend/src/clickhouse/clickhouse.service.ts @@ -20,10 +20,11 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { private flushTimer: NodeJS.Timeout | null = null; constructor( - @InjectMetric("clickhouseFlushDurationMs") private readonly flushDuration: Histogram, + @InjectMetric("clickhouseFlushDurationSeconds") private readonly flushDuration: Histogram, @InjectMetric("clickhouseFlushErrorsTotal") private readonly flushErrors: Counter, @InjectMetric("clickhouseBufferRows") private readonly bufferRows: Gauge, @InjectMetric("clickhouseRowsInsertedTotal") private readonly rowsInserted: Counter, + @InjectMetric("clickhouseDroppedRowsTotal") private readonly droppedRows: Counter, private readonly configService: ConfigService, ) { this.config = this.configService.get("clickhouse", { infer: true }); @@ -89,6 +90,11 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { insert(table: string, row: Record): void { if (!this.client) return; + if (this.buffer.length >= this.config.maxBufferSize) { + this.buffer.shift(); + this.droppedRows.inc({ reason: "buffer_full" }); + } + this.buffer.push({ table, row }); this.bufferRows.set(this.buffer.length); @@ -102,8 +108,8 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { private async flush(): Promise { if (!this.client || this.buffer.length === 0) return; - const batch = this.buffer.splice(0, this.buffer.length); - this.bufferRows.set(0); + const n = this.buffer.length; + const batch = this.buffer.slice(0, n); // Group by table so we can do one insert call per table const byTable = new Map[]>(); @@ -128,15 +134,17 @@ export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { this.rowsInserted.inc({ table }, rows.length); }), ); - end(); + this.buffer.splice(0, n); + this.bufferRows.set(this.buffer.length); } catch (err) { - end(); this.flushErrors.inc(); this.logger.error({ event: "flush_failed", error: String(err), - droppedRows: batch.length, + pendingRows: n, }); + } finally { + end(); } } diff --git a/apps/backend/src/config/app.config.ts b/apps/backend/src/config/app.config.ts index 0323bfd5..7888ded8 100644 --- a/apps/backend/src/config/app.config.ts +++ b/apps/backend/src/config/app.config.ts @@ -103,6 +103,7 @@ export const configValidationSchema = Joi.object({ CLICKHOUSE_URL: Joi.string().uri().optional(), CLICKHOUSE_BATCH_SIZE: Joi.number().integer().min(1).default(500), CLICKHOUSE_FLUSH_INTERVAL_MS: Joi.number().integer().min(100).default(5000), + CLICKHOUSE_MAX_BUFFER_SIZE: Joi.number().integer().min(1).default(5000), DEALBOT_PROBE_LOCATION: Joi.string().default("unknown"), // Timeouts (in milliseconds) @@ -281,6 +282,7 @@ export interface IClickhouseConfig { url: string | undefined; batchSize: number; flushIntervalMs: number; + maxBufferSize: number; } export interface IConfig { @@ -399,6 +401,7 @@ export function loadConfig(): IConfig { url: process.env.CLICKHOUSE_URL || undefined, batchSize: Number.parseInt(process.env.CLICKHOUSE_BATCH_SIZE || "500", 10), flushIntervalMs: Number.parseInt(process.env.CLICKHOUSE_FLUSH_INTERVAL_MS || "5000", 10), + maxBufferSize: Number.parseInt(process.env.CLICKHOUSE_MAX_BUFFER_SIZE || "5000", 10), }, spBlocklists: { ids: parseIdList(process.env.BLOCKED_SP_IDS), From 00cd97812e57a617fd5b9925f8cd73d5ee213cdd Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Fri, 24 Apr 2026 09:57:15 +0100 Subject: [PATCH 16/18] chore: fix import order in worker module and add clickhouse mock --- apps/backend/src/retrieval/retrieval.service.spec.ts | 1 + apps/backend/src/worker.module.ts | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/backend/src/retrieval/retrieval.service.spec.ts b/apps/backend/src/retrieval/retrieval.service.spec.ts index f7254281..232ae4e5 100644 --- a/apps/backend/src/retrieval/retrieval.service.spec.ts +++ b/apps/backend/src/retrieval/retrieval.service.spec.ts @@ -364,6 +364,7 @@ describe("RetrievalService DB/provider drift", () => { { provide: DiscoverabilityCheckMetrics, useValue: {} }, { provide: IpniVerificationService, useValue: {} }, { provide: ConfigService, useValue: mockConfigService }, + { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, ], }).compile(); return module.get(RetrievalService); diff --git a/apps/backend/src/worker.module.ts b/apps/backend/src/worker.module.ts index 24a727a1..67bbd5da 100644 --- a/apps/backend/src/worker.module.ts +++ b/apps/backend/src/worker.module.ts @@ -1,7 +1,7 @@ import { Module } from "@nestjs/common"; import { ConfigModule } from "@nestjs/config"; -import { ClickhouseModule } from "./clickhouse/clickhouse.module.js"; import { LoggerModule } from "nestjs-pino"; +import { ClickhouseModule } from "./clickhouse/clickhouse.module.js"; import { buildLoggerModuleParams } from "./common/pino.config.js"; import { configValidationSchema, loadConfig } from "./config/app.config.js"; import { DatabaseModule } from "./database/database.module.js"; From 240c799bc7b35e107dcb172917c16a02f5366b02 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Fri, 24 Apr 2026 15:29:14 +0100 Subject: [PATCH 17/18] chore: drop retry_count from ClickHouse tables --- apps/backend/src/clickhouse/clickhouse.schema.ts | 2 -- apps/backend/src/deal/deal.service.ts | 1 - apps/backend/src/retrieval/retrieval.service.ts | 1 - 3 files changed, 4 deletions(-) diff --git a/apps/backend/src/clickhouse/clickhouse.schema.ts b/apps/backend/src/clickhouse/clickhouse.schema.ts index bb5ecc85..f75beac8 100644 --- a/apps/backend/src/clickhouse/clickhouse.schema.ts +++ b/apps/backend/src/clickhouse/clickhouse.schema.ts @@ -21,7 +21,6 @@ export function buildMigrations(database: string): string[] { status LowCardinality(String), -- DealStatus: 'pending' | 'uploaded' | 'piece_added' | 'piece_confirmed' | 'deal_created' | 'failed' error_code LowCardinality(Nullable(String)), - retry_count UInt8 DEFAULT 0, -- number of attempts made to store upload_started_at Nullable(DateTime64(3, 'UTC')), -- when executeUpload() was called upload_ended_at Nullable(DateTime64(3, 'UTC')), -- when onStored event fired @@ -54,7 +53,6 @@ export function buildMigrations(database: string): string[] { status LowCardinality(String), -- RetrievalStatus: 'pending' | 'in_progress' | 'success' | 'failed' | 'timeout' http_response_code Nullable(UInt16), -- raw HTTP status; null on transport failure - retry_count UInt8 DEFAULT 0, -- number of attempts made to retrieve first_byte_ms Nullable(Float64), -- time from request start to first response byte last_byte_ms Nullable(Float64), -- time from request start to last response byte diff --git a/apps/backend/src/deal/deal.service.ts b/apps/backend/src/deal/deal.service.ts index dcc7c877..c5906797 100644 --- a/apps/backend/src/deal/deal.service.ts +++ b/apps/backend/src/deal/deal.service.ts @@ -814,7 +814,6 @@ export class DealService implements OnModuleInit, OnModuleDestroy { piece_size_bytes: deal.pieceSize ?? null, status: deal.status, error_code: deal.errorCode ?? null, - retry_count: deal.retryCount, upload_started_at: deal.uploadStartTime?.getTime() ?? null, upload_ended_at: deal.uploadEndTime?.getTime() ?? null, pieces_added_at: deal.piecesAddedTime?.getTime() ?? null, diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index 8e77bf83..aff8849d 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -270,7 +270,6 @@ export class RetrievalService { service_type: saved.serviceType, status: saved.status, http_response_code: executionResult.metrics.statusCode || null, - retry_count: saved.retryCount, first_byte_ms: executionResult.success ? executionResult.metrics.ttfb : null, last_byte_ms: executionResult.success ? executionResult.metrics.latency : null, bytes_retrieved: executionResult.success ? executionResult.metrics.responseSize : null, From 19f9658f48def16af95374ddfb179d38e0a535d9 Mon Sep 17 00:00:00 2001 From: Ian Davis <18375+iand@users.noreply.github.com> Date: Fri, 24 Apr 2026 15:59:47 +0100 Subject: [PATCH 18/18] feat: expand data_retention_challenges schema --- apps/backend/src/clickhouse/clickhouse.schema.ts | 6 ++++-- apps/backend/src/data-retention/data-retention.service.ts | 4 +++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/apps/backend/src/clickhouse/clickhouse.schema.ts b/apps/backend/src/clickhouse/clickhouse.schema.ts index f75beac8..897d8c92 100644 --- a/apps/backend/src/clickhouse/clickhouse.schema.ts +++ b/apps/backend/src/clickhouse/clickhouse.schema.ts @@ -70,8 +70,10 @@ export function buildMigrations(database: string): string[] { sp_id Nullable(UInt64), -- storage provider numeric id sp_name Nullable(String), -- storage provider name - total_proving_periods UInt32, -- cumulative total proving periods from subgraph at poll time - total_faulted_periods UInt32 -- cumulative total faulted periods from subgraph at poll time + total_periods_due UInt32, -- cumulative proving periods due (confirmed by subgraph) + total_faulted_periods UInt32, -- cumulative periods where proof was not submitted + total_success_periods UInt32, -- cumulative periods where proof was submitted (= due - faulted) + estimated_overdue_periods UInt32 -- estimated periods not yet recorded on-chain but past deadline ) ENGINE MergeTree() PRIMARY KEY (probe_location, sp_address, timestamp) PARTITION BY toStartOfMonth(timestamp) diff --git a/apps/backend/src/data-retention/data-retention.service.ts b/apps/backend/src/data-retention/data-retention.service.ts index d25d1b67..2922ab99 100644 --- a/apps/backend/src/data-retention/data-retention.service.ts +++ b/apps/backend/src/data-retention/data-retention.service.ts @@ -340,8 +340,10 @@ export class DataRetentionService { sp_address: address, sp_id: pdpProvider.id != null ? String(pdpProvider.id) : null, // pdpProvider.id is a BigInt sp_name: pdpProvider.name ?? null, - total_proving_periods: Number(totalProvingPeriods), + total_periods_due: Number(totalProvingPeriods), total_faulted_periods: Number(totalFaultedPeriods), + total_success_periods: Number(confirmedTotalSuccess), + estimated_overdue_periods: Number(estimatedOverduePeriods), }); const normalizedAddress = address.toLowerCase();