diff --git a/Makefile b/Makefile index b004ab8d..75691605 100644 --- a/Makefile +++ b/Makefile @@ -27,6 +27,7 @@ GRAFANA_VALUES ?= $(MONITORING_OVERLAY)/grafana-values.yaml .PHONY: web-image-build web-kind-load web-logs .PHONY: image-build kind-load deploy undeploy render logs .PHONY: redeploy restart restart-backend restart-web restart-worker +.PHONY: clickhouse-reset clickhouse-shell .PHONY: monitoring-install monitoring-apply monitoring-up monitoring-down .PHONY: local-up up down @@ -119,6 +120,15 @@ redeploy: $(MAKE) deploy $(MAKE) restart +# Delete the ClickHouse PVC so initdb scripts run again on next deploy. +# Use this when the ClickHouse schema has changed and you need a clean state. +clickhouse-reset: + -kubectl delete pvc -n $(NAMESPACE) dealbot-clickhouse + -kubectl delete pod -n $(NAMESPACE) -l app.kubernetes.io/name=dealbot-clickhouse + +clickhouse-shell: + kubectl exec -it -n $(NAMESPACE) deployment/dealbot-clickhouse -- clickhouse-client --database dealbot + secret: namespace @if [ ! -f "$(SECRET_ENV_FILE)" ]; then echo "SECRET_ENV_FILE $(SECRET_ENV_FILE) not found"; exit 1; fi @tmp_env_file="$$(mktemp)"; \ diff --git a/apps/backend/package.json b/apps/backend/package.json index 77b7fb4a..ec4ea66a 100644 --- a/apps/backend/package.json +++ b/apps/backend/package.json @@ -29,6 +29,7 @@ "typecheck": "tsc --noEmit --incremental false" }, "dependencies": { + "@clickhouse/client": "^1.11.0", "@filoz/synapse-core": "0.3.3", "@filoz/synapse-sdk": "0.40.2", "@ipld/car": "^5.4.2", diff --git a/apps/backend/src/app.module.ts b/apps/backend/src/app.module.ts index 29751324..569ec5e4 100644 --- a/apps/backend/src/app.module.ts +++ b/apps/backend/src/app.module.ts @@ -2,6 +2,7 @@ import { Module } from "@nestjs/common"; import { ConfigModule } from "@nestjs/config"; import { LoggerModule } from "nestjs-pino"; import { AppController } from "./app.controller.js"; +import { ClickhouseModule } from "./clickhouse/clickhouse.module.js"; import { buildLoggerModuleParams } from "./common/pino.config.js"; import { configValidationSchema, loadConfig } from "./config/app.config.js"; import { DatabaseModule } from "./database/database.module.js"; @@ -23,6 +24,7 @@ import { RetrievalModule } from "./retrieval/retrieval.module.js"; }), DatabaseModule, MetricsPrometheusModule, + ClickhouseModule, JobsModule, DealModule, RetrievalModule, diff --git a/apps/backend/src/clickhouse/clickhouse.module.ts b/apps/backend/src/clickhouse/clickhouse.module.ts new file mode 100644 index 00000000..a992ebb0 --- /dev/null +++ b/apps/backend/src/clickhouse/clickhouse.module.ts @@ -0,0 +1,35 @@ +import { Global, Module } from "@nestjs/common"; +import { makeCounterProvider, makeGaugeProvider, makeHistogramProvider } from "@willsoto/nestjs-prometheus"; +import { ClickhouseService } from "./clickhouse.service.js"; + +@Global() +@Module({ + providers: [ + makeHistogramProvider({ + name: "clickhouseFlushDurationSeconds", + help: "Round-trip time of each ClickHouse flush call in seconds", + buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5], + }), + makeCounterProvider({ + name: "clickhouseFlushErrorsTotal", + help: "Number of failed ClickHouse flush attempts; non-zero means rows were dropped", + }), + makeCounterProvider({ + name: "clickhouseDroppedRowsTotal", + help: "Rows silently dropped due to flush failure or buffer overflow, by reason", + labelNames: ["reason"] as const, + }), + makeGaugeProvider({ + name: "clickhouseBufferRows", + help: "Current number of rows queued in the ClickHouse buffer", + }), + makeCounterProvider({ + name: "clickhouseRowsInsertedTotal", + help: "Rows successfully written to ClickHouse, by table", + labelNames: ["table"] as const, + }), + ClickhouseService, + ], + exports: [ClickhouseService], +}) +export class ClickhouseModule {} diff --git a/apps/backend/src/clickhouse/clickhouse.schema.ts b/apps/backend/src/clickhouse/clickhouse.schema.ts new file mode 100644 index 00000000..897d8c92 --- /dev/null +++ b/apps/backend/src/clickhouse/clickhouse.schema.ts @@ -0,0 +1,82 @@ +/** + * ClickHouse DDL statements executed on startup via CREATE DATABASE/TABLE IF NOT EXISTS. + * Order matters: database must be created before tables. + */ +export function buildMigrations(database: string): string[] { + return [ + `CREATE TABLE IF NOT EXISTS ${database}.data_storage_checks +( + timestamp DateTime64(3, 'UTC'), -- when deal entity was saved + + probe_location LowCardinality(String), -- dealbot location + sp_address String, -- storage provider address + sp_id Nullable(UInt64), -- storage provider numeric id + sp_name Nullable(String), -- storage provider name + + deal_id UUID, -- id assigned by dealbot + piece_cid Nullable(String), -- null if upload failed + piece_id Nullable(UInt64), -- on-chain piece id + file_size_bytes Nullable(UInt64), -- raw file size before CAR encoding + piece_size_bytes Nullable(UInt64), -- piece size after CAR encoding + + status LowCardinality(String), -- DealStatus: 'pending' | 'uploaded' | 'piece_added' | 'piece_confirmed' | 'deal_created' | 'failed' + error_code LowCardinality(Nullable(String)), + + upload_started_at Nullable(DateTime64(3, 'UTC')), -- when executeUpload() was called + upload_ended_at Nullable(DateTime64(3, 'UTC')), -- when onStored event fired + + pieces_added_at Nullable(DateTime64(3, 'UTC')), -- when onPiecesAdded event fired + pieces_confirmed_at Nullable(DateTime64(3, 'UTC')), -- when onPiecesConfirmed event fired + + ipni_status LowCardinality(Nullable(String)), -- 'pending' | 'sp_indexed' | 'sp_advertised' | 'verified' | 'failed' + ipni_indexed_at Nullable(DateTime64(3, 'UTC')), -- when dealbot first observed SP_INDEXED (accuracy limited to poll interval) + ipni_advertised_at Nullable(DateTime64(3, 'UTC')), -- when dealbot first observed SP_ADVERTISED (accuracy limited to poll interval) + ipni_verified_at Nullable(DateTime64(3, 'UTC')), -- when dealbot confirmed root CID findable via IPNI + ipni_verified_cids_count Nullable(UInt32), -- CIDs confirmed findable via IPNI + ipni_unverified_cids_count Nullable(UInt32) -- CIDs checked but not findable +) ENGINE MergeTree() + PRIMARY KEY (probe_location, sp_address, timestamp) + PARTITION BY toStartOfMonth(timestamp) + TTL toDateTime(timestamp) + INTERVAL 1 YEAR`, + + `CREATE TABLE IF NOT EXISTS ${database}.retrieval_checks +( + timestamp DateTime64(3, 'UTC'), -- when retrieval entity was saved + probe_location LowCardinality(String), -- dealbot location + sp_address String, -- storage provider address + sp_id Nullable(UInt64), -- storage provider numeric id + sp_name Nullable(String), -- storage provider name + + deal_id Nullable(UUID), -- id of deal assigned by dealbot + retrieval_id UUID, -- id of retrieval assigned by dealbot + service_type LowCardinality(String), -- 'direct_sp' | 'ipfs_pin' + + status LowCardinality(String), -- RetrievalStatus: 'pending' | 'in_progress' | 'success' | 'failed' | 'timeout' + http_response_code Nullable(UInt16), -- raw HTTP status; null on transport failure + + first_byte_ms Nullable(Float64), -- time from request start to first response byte + last_byte_ms Nullable(Float64), -- time from request start to last response byte + bytes_retrieved Nullable(UInt64) -- size of received data in bytes +) ENGINE MergeTree() + PRIMARY KEY (probe_location, sp_address, timestamp) + PARTITION BY toStartOfMonth(timestamp) + TTL toDateTime(timestamp) + INTERVAL 1 YEAR`, + + `CREATE TABLE IF NOT EXISTS ${database}.data_retention_challenges +( + timestamp DateTime64(3, 'UTC'), -- when the poll ran and detected these periods + probe_location LowCardinality(String), -- dealbot location + sp_address String, -- storage provider address + sp_id Nullable(UInt64), -- storage provider numeric id + sp_name Nullable(String), -- storage provider name + + total_periods_due UInt32, -- cumulative proving periods due (confirmed by subgraph) + total_faulted_periods UInt32, -- cumulative periods where proof was not submitted + total_success_periods UInt32, -- cumulative periods where proof was submitted (= due - faulted) + estimated_overdue_periods UInt32 -- estimated periods not yet recorded on-chain but past deadline +) ENGINE MergeTree() + PRIMARY KEY (probe_location, sp_address, timestamp) + PARTITION BY toStartOfMonth(timestamp) + TTL toDateTime(timestamp) + INTERVAL 1 YEAR`, + ]; +} diff --git a/apps/backend/src/clickhouse/clickhouse.service.ts b/apps/backend/src/clickhouse/clickhouse.service.ts new file mode 100644 index 00000000..2cdfc34a --- /dev/null +++ b/apps/backend/src/clickhouse/clickhouse.service.ts @@ -0,0 +1,158 @@ +import { type ClickHouseClient, createClient } from "@clickhouse/client"; +import { Injectable, Logger, OnApplicationShutdown, OnModuleInit } from "@nestjs/common"; +import { ConfigService } from "@nestjs/config"; +import { InjectMetric } from "@willsoto/nestjs-prometheus"; +import { Counter, Gauge, Histogram } from "prom-client"; +import type { IClickhouseConfig, IConfig } from "../config/app.config.js"; +import { buildMigrations } from "./clickhouse.schema.js"; + +interface BufferedRow { + table: string; + row: Record; +} + +@Injectable() +export class ClickhouseService implements OnModuleInit, OnApplicationShutdown { + private readonly logger = new Logger(ClickhouseService.name); + private readonly config: IClickhouseConfig; + private client: ClickHouseClient | null = null; + private buffer: BufferedRow[] = []; + private flushTimer: NodeJS.Timeout | null = null; + + constructor( + @InjectMetric("clickhouseFlushDurationSeconds") private readonly flushDuration: Histogram, + @InjectMetric("clickhouseFlushErrorsTotal") private readonly flushErrors: Counter, + @InjectMetric("clickhouseBufferRows") private readonly bufferRows: Gauge, + @InjectMetric("clickhouseRowsInsertedTotal") private readonly rowsInserted: Counter, + @InjectMetric("clickhouseDroppedRowsTotal") private readonly droppedRows: Counter, + private readonly configService: ConfigService, + ) { + this.config = this.configService.get("clickhouse", { infer: true }); + } + + async onModuleInit() { + if (!this.config.url) { + this.logger.log("CLICKHOUSE_URL not set, writes to ClickHouse disabled"); + return; + } + + this.client = createClient({ + url: this.config.url, + }); + + const parsedUrl = new URL(this.config.url); + const database = parsedUrl.pathname.replace(/^\//, ""); + try { + await this.migrate(database); + } catch (err) { + this.logger.error({ event: "clickhouse_migration_failed", database, error: String(err) }); + throw err; + } + + this.flushTimer = setInterval(() => { + this.flush().catch((err) => { + this.logger.error({ event: "flush_interval_error", error: String(err) }); + }); + }, this.config.flushIntervalMs); + + this.logger.log({ + event: "clickhouse_initialized", + host: parsedUrl.host, + database, + batchSize: this.config.batchSize, + flushIntervalMs: this.config.flushIntervalMs, + probeLocation: this.configService.get("app").probeLocation, + }); + } + + private async migrate(database: string): Promise { + if (!this.client) return; + const migrations = buildMigrations(database); + for (const sql of migrations) { + await this.client.command({ query: sql }); + } + this.logger.log({ event: "clickhouse_migrated", database }); + } + + async onApplicationShutdown() { + if (this.flushTimer) { + clearInterval(this.flushTimer); + this.flushTimer = null; + } + await this.flush(); + await this.client?.close(); + } + + /** + * Queue a row for insertion. Returns immediately; the flush happens in the background. + * Safe to call when ClickHouse is disabled: rows are silently dropped. + */ + insert(table: string, row: Record): void { + if (!this.client) return; + + if (this.buffer.length >= this.config.maxBufferSize) { + this.buffer.shift(); + this.droppedRows.inc({ reason: "buffer_full" }); + } + + this.buffer.push({ table, row }); + this.bufferRows.set(this.buffer.length); + + if (this.buffer.length >= this.config.batchSize) { + this.flush().catch((err) => { + this.logger.error({ event: "flush_batch_error", error: String(err) }); + }); + } + } + + private async flush(): Promise { + if (!this.client || this.buffer.length === 0) return; + + const n = this.buffer.length; + const batch = this.buffer.slice(0, n); + + // Group by table so we can do one insert call per table + const byTable = new Map[]>(); + for (const { table, row } of batch) { + let rows = byTable.get(table); + if (!rows) { + rows = []; + byTable.set(table, rows); + } + rows.push(row); + } + + const end = this.flushDuration.startTimer(); + try { + await Promise.all( + Array.from(byTable.entries()).map(async ([table, rows]) => { + await this.client!.insert({ + table, + values: rows, + format: "JSONEachRow", + }); + this.rowsInserted.inc({ table }, rows.length); + }), + ); + this.buffer.splice(0, n); + this.bufferRows.set(this.buffer.length); + } catch (err) { + this.flushErrors.inc(); + this.logger.error({ + event: "flush_failed", + error: String(err), + pendingRows: n, + }); + } finally { + end(); + } + } + + get probeLocation(): string { + return this.configService.get("app").probeLocation; + } + + get enabled(): boolean { + return this.client !== null; + } +} diff --git a/apps/backend/src/config/app.config.ts b/apps/backend/src/config/app.config.ts index a8010bab..b3b32a37 100644 --- a/apps/backend/src/config/app.config.ts +++ b/apps/backend/src/config/app.config.ts @@ -122,6 +122,13 @@ export const configValidationSchema = Joi.object({ DEALBOT_LOCAL_DATASETS_PATH: Joi.string().default(DEFAULT_LOCAL_DATASETS_PATH), RANDOM_PIECE_SIZES: Joi.string().default("10485760"), // 10 MiB + // ClickHouse + CLICKHOUSE_URL: Joi.string().uri().optional(), + CLICKHOUSE_BATCH_SIZE: Joi.number().integer().min(1).default(500), + CLICKHOUSE_FLUSH_INTERVAL_MS: Joi.number().integer().min(100).default(5000), + CLICKHOUSE_MAX_BUFFER_SIZE: Joi.number().integer().min(1).default(5000), + DEALBOT_PROBE_LOCATION: Joi.string().default("unknown"), + // Timeouts (in milliseconds) CONNECT_TIMEOUT_MS: Joi.number().min(1000).default(10000), // 10 seconds to establish connection/receive headers HTTP_REQUEST_TIMEOUT_MS: Joi.number().min(1000).default(240000), // 4 minutes total for HTTP requests (10MiB @ 170KB/s + overhead) @@ -144,6 +151,7 @@ export interface IAppConfig { enableDevMode: boolean; prometheusWalletBalanceTtlSeconds: number; prometheusWalletBalanceErrorCooldownSeconds: number; + probeLocation: string; } export interface IDatabaseConfig { @@ -301,6 +309,18 @@ export interface ISpBlocklistConfig { addresses: Set; } +export interface IClickhouseConfig { + /** + * ClickHouse connection URL. Must include the database in the path. + * Example: http://default:password@host:8123/dealbot + * If unset, ClickHouse emission is disabled. + */ + url: string | undefined; + batchSize: number; + flushIntervalMs: number; + maxBufferSize: number; +} + export interface IConfig { app: IAppConfig; database: IDatabaseConfig; @@ -310,6 +330,7 @@ export interface IConfig { dataset: IDatasetConfig; timeouts: ITimeoutConfig; retrieval: IRetrievalConfig; + clickhouse: IClickhouseConfig; pieceCleanup: IPieceCleanupConfig; spBlocklists: ISpBlocklistConfig; } @@ -337,6 +358,7 @@ export function loadConfig(): IConfig { process.env.PROMETHEUS_WALLET_BALANCE_ERROR_COOLDOWN_SECONDS || "60", 10, ), + probeLocation: process.env.DEALBOT_PROBE_LOCATION || "unknown", }, database: { host: process.env.DATABASE_HOST || "localhost", @@ -413,6 +435,12 @@ export function loadConfig(): IConfig { retrieval: { ipfsBlockFetchConcurrency: Number.parseInt(process.env.IPFS_BLOCK_FETCH_CONCURRENCY || "6", 10), }, + clickhouse: { + url: process.env.CLICKHOUSE_URL || undefined, + batchSize: Number.parseInt(process.env.CLICKHOUSE_BATCH_SIZE || "500", 10), + flushIntervalMs: Number.parseInt(process.env.CLICKHOUSE_FLUSH_INTERVAL_MS || "5000", 10), + maxBufferSize: Number.parseInt(process.env.CLICKHOUSE_MAX_BUFFER_SIZE || "5000", 10), + }, pieceCleanup: { maxDatasetStorageSizeBytes: Number.parseInt( process.env.MAX_DATASET_STORAGE_SIZE_BYTES || String(24 * 1024 * 1024 * 1024), diff --git a/apps/backend/src/data-retention/data-retention.service.spec.ts b/apps/backend/src/data-retention/data-retention.service.spec.ts index 17151bd1..daf8733d 100644 --- a/apps/backend/src/data-retention/data-retention.service.spec.ts +++ b/apps/backend/src/data-retention/data-retention.service.spec.ts @@ -2,6 +2,7 @@ import type { ConfigService } from "@nestjs/config"; import type { Counter, Gauge } from "prom-client"; import { Repository } from "typeorm"; import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import type { IConfig } from "../config/app.config.js"; import type { DataRetentionBaseline } from "../database/entities/data-retention-baseline.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; @@ -121,6 +122,7 @@ describe("DataRetentionService", () => { delete: vi.fn().mockResolvedValue(undefined), }; mockSPRepository = { find: vi.fn() }; + const clickhouseServiceMock = { insert: vi.fn(), probeLocation: "test" } as unknown as ClickhouseService; service = new DataRetentionService( configServiceMock, walletSdkServiceMock as unknown as WalletSdkService, @@ -129,6 +131,7 @@ describe("DataRetentionService", () => { mockSPRepository as unknown as Repository, counterMock as unknown as Counter, gaugeMock as unknown as Gauge, + clickhouseServiceMock, ); }); diff --git a/apps/backend/src/data-retention/data-retention.service.ts b/apps/backend/src/data-retention/data-retention.service.ts index f4d7ec6d..2922ab99 100644 --- a/apps/backend/src/data-retention/data-retention.service.ts +++ b/apps/backend/src/data-retention/data-retention.service.ts @@ -4,6 +4,7 @@ import { InjectRepository } from "@nestjs/typeorm"; import { InjectMetric } from "@willsoto/nestjs-prometheus"; import { Counter, Gauge } from "prom-client"; import { Raw, Repository } from "typeorm"; +import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import { toStructuredError } from "../common/logging.js"; import { isSpBlocked } from "../common/sp-blocklist.js"; import { IConfig } from "../config/app.config.js"; @@ -53,6 +54,7 @@ export class DataRetentionService { private readonly dataSetChallengeStatusCounter: Counter, @InjectMetric("pdp_provider_estimated_overdue_periods") private readonly estimatedOverduePeriodsGauge: Gauge, + private readonly clickhouseService: ClickhouseService, ) { this.providerCumulativeTotals = new Map(); } @@ -332,6 +334,18 @@ export class DataRetentionService { const confirmedTotalSuccess = totalProvingPeriods - totalFaultedPeriods; + this.clickhouseService.insert("data_retention_challenges", { + timestamp: Date.now(), + probe_location: this.clickhouseService.probeLocation, + sp_address: address, + sp_id: pdpProvider.id != null ? String(pdpProvider.id) : null, // pdpProvider.id is a BigInt + sp_name: pdpProvider.name ?? null, + total_periods_due: Number(totalProvingPeriods), + total_faulted_periods: Number(totalFaultedPeriods), + total_success_periods: Number(confirmedTotalSuccess), + estimated_overdue_periods: Number(estimatedOverduePeriods), + }); + const normalizedAddress = address.toLowerCase(); const previous = this.providerCumulativeTotals.get(normalizedAddress); diff --git a/apps/backend/src/deal/deal.service.spec.ts b/apps/backend/src/deal/deal.service.spec.ts index 77ad2eae..8980057c 100644 --- a/apps/backend/src/deal/deal.service.spec.ts +++ b/apps/backend/src/deal/deal.service.spec.ts @@ -6,6 +6,7 @@ import { executeUpload } from "filecoin-pin"; import { CID } from "multiformats/cid"; import { generatePrivateKey } from "viem/accounts"; import { afterEach, beforeEach, describe, expect, it, Mock, vi } from "vitest"; +import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import { Deal } from "../database/entities/deal.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { DealStatus } from "../database/types.js"; @@ -165,6 +166,7 @@ describe("DealService", () => { { provide: DataStorageCheckMetrics, useValue: mockDataStorageMetrics }, { provide: RetrievalCheckMetrics, useValue: mockRetrievalMetrics }, { provide: DataSetCreationCheckMetrics, useValue: mockDataSetCreationMetrics }, + { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, ], }).compile(); @@ -978,6 +980,7 @@ describe("DealService", () => { { provide: DataStorageCheckMetrics, useValue: mockDataStorageMetrics }, { provide: RetrievalCheckMetrics, useValue: mockRetrievalMetrics }, { provide: DataSetCreationCheckMetrics, useValue: mockDataSetCreationMetrics }, + { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, ], }).compile(); diff --git a/apps/backend/src/deal/deal.service.ts b/apps/backend/src/deal/deal.service.ts index ee1738d9..c5906797 100644 --- a/apps/backend/src/deal/deal.service.ts +++ b/apps/backend/src/deal/deal.service.ts @@ -6,6 +6,7 @@ import { InjectRepository } from "@nestjs/typeorm"; import { executeUpload } from "filecoin-pin"; import { CID } from "multiformats/cid"; import type { Repository } from "typeorm"; +import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import { awaitWithAbort } from "../common/abort-utils.js"; import { buildUnixfsCar } from "../common/car-utils.js"; import { createFilecoinPinLogger } from "../common/filecoin-pin-logger.js"; @@ -65,6 +66,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy { private readonly dataStorageMetrics: DataStorageCheckMetrics, private readonly retrievalMetrics: RetrievalCheckMetrics, private readonly dataSetCreationMetrics: DataSetCreationCheckMetrics, + private readonly clickhouseService: ClickhouseService, ) { this.blockchainConfig = this.configService.get("blockchain"); } @@ -799,6 +801,31 @@ export class DealService implements OnModuleInit, OnModuleDestroy { } private async saveDeal(deal: Deal, dealLogContext: DealLogContext): Promise { + this.clickhouseService.insert("data_storage_checks", { + timestamp: Date.now(), + probe_location: this.clickhouseService.probeLocation, + sp_address: deal.spAddress, + sp_id: deal.storageProvider?.providerId != null ? String(deal.storageProvider.providerId) : null, // providerId is a BigInt + sp_name: deal.storageProvider?.name ?? null, + deal_id: deal.id, + piece_cid: deal.pieceCid ?? null, + piece_id: deal.pieceId ?? null, + file_size_bytes: deal.fileSize ?? null, + piece_size_bytes: deal.pieceSize ?? null, + status: deal.status, + error_code: deal.errorCode ?? null, + upload_started_at: deal.uploadStartTime?.getTime() ?? null, + upload_ended_at: deal.uploadEndTime?.getTime() ?? null, + pieces_added_at: deal.piecesAddedTime?.getTime() ?? null, + pieces_confirmed_at: deal.piecesConfirmedTime?.getTime() ?? null, + ipni_status: deal.ipniStatus ?? null, + ipni_indexed_at: deal.ipniIndexedAt?.getTime() ?? null, + ipni_advertised_at: deal.ipniAdvertisedAt?.getTime() ?? null, + ipni_verified_at: deal.ipniVerifiedAt?.getTime() ?? null, + ipni_verified_cids_count: deal.ipniVerifiedCidsCount ?? null, + ipni_unverified_cids_count: deal.ipniUnverifiedCidsCount ?? null, + }); + try { await this.dealRepository.save(deal); } catch (error) { diff --git a/apps/backend/src/retrieval/retrieval.service.spec.ts b/apps/backend/src/retrieval/retrieval.service.spec.ts index 6e003212..232ae4e5 100644 --- a/apps/backend/src/retrieval/retrieval.service.spec.ts +++ b/apps/backend/src/retrieval/retrieval.service.spec.ts @@ -2,6 +2,7 @@ import { ConfigService } from "@nestjs/config"; import { Test, TestingModule } from "@nestjs/testing"; import { getRepositoryToken } from "@nestjs/typeorm"; import { afterEach, describe, expect, it, vi } from "vitest"; +import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import { Deal } from "../database/entities/deal.entity.js"; import { Retrieval } from "../database/entities/retrieval.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; @@ -32,6 +33,7 @@ describe("RetrievalService timeouts", () => { get: vi.fn((key: string) => { if (key === "app") return { runMode: "api" }; if (key === "jobs") return { pgbossSchedulerEnabled: false }; + if (key === "blockchain") return { network: "calibration" }; if (key === "dataset") return { randomDatasetSizes: [10] }; if (key === "timeouts") return { ipniVerificationTimeoutMs: 10_000, ipniVerificationPollingMs: 2_000 }; return undefined; @@ -93,6 +95,7 @@ describe("RetrievalService timeouts", () => { { provide: DiscoverabilityCheckMetrics, useValue: mockDiscoverabilityMetrics }, { provide: IpniVerificationService, useValue: mockIpniVerificationService }, { provide: ConfigService, useValue: mockConfigService }, + { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, ], }).compile(); @@ -361,6 +364,7 @@ describe("RetrievalService DB/provider drift", () => { { provide: DiscoverabilityCheckMetrics, useValue: {} }, { provide: IpniVerificationService, useValue: {} }, { provide: ConfigService, useValue: mockConfigService }, + { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, ], }).compile(); return module.get(RetrievalService); diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index 1bf2d517..aff8849d 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -3,6 +3,7 @@ import { ConfigService } from "@nestjs/config"; import { InjectRepository } from "@nestjs/typeorm"; import { CID } from "multiformats/cid"; import type { Repository } from "typeorm"; +import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import { type ProviderJobContext, type RetrievalLogContext, toStructuredError } from "../common/logging.js"; import type { Hex } from "../common/types.js"; import type { IConfig } from "../config/app.config.js"; @@ -40,6 +41,7 @@ export class RetrievalService { private readonly discoverabilityMetrics: DiscoverabilityCheckMetrics, private readonly ipniVerificationService: IpniVerificationService, private readonly configService: ConfigService, + private readonly clickhouseService: ClickhouseService, ) {} async performRandomRetrievalForProvider( @@ -255,7 +257,25 @@ export class RetrievalService { this.retrievalMetrics.recordHttpResponseCode(providerLabels, executionResult.metrics.statusCode); } - return this.saveRetrieval(retrieval); + const saved = await this.saveRetrieval(retrieval); + + this.clickhouseService.insert("retrieval_checks", { + timestamp: Date.now(), + probe_location: this.clickhouseService.probeLocation, + sp_address: deal.spAddress, + sp_id: providerLabels.providerId !== "unknown" ? providerLabels.providerId : null, + sp_name: providerLabels.providerName !== "unknown" ? providerLabels.providerName : null, + deal_id: deal.id, + retrieval_id: saved.id, + service_type: saved.serviceType, + status: saved.status, + http_response_code: executionResult.metrics.statusCode || null, + first_byte_ms: executionResult.success ? executionResult.metrics.ttfb : null, + last_byte_ms: executionResult.success ? executionResult.metrics.latency : null, + bytes_retrieved: executionResult.success ? executionResult.metrics.responseSize : null, + }); + + return saved; } private recordRetrievalEventMetrics( diff --git a/apps/backend/src/worker.module.ts b/apps/backend/src/worker.module.ts index 886e1db3..67bbd5da 100644 --- a/apps/backend/src/worker.module.ts +++ b/apps/backend/src/worker.module.ts @@ -1,6 +1,7 @@ import { Module } from "@nestjs/common"; import { ConfigModule } from "@nestjs/config"; import { LoggerModule } from "nestjs-pino"; +import { ClickhouseModule } from "./clickhouse/clickhouse.module.js"; import { buildLoggerModuleParams } from "./common/pino.config.js"; import { configValidationSchema, loadConfig } from "./config/app.config.js"; import { DatabaseModule } from "./database/database.module.js"; @@ -17,6 +18,7 @@ import { MetricsPrometheusModule } from "./metrics-prometheus/metrics-prometheus }), DatabaseModule, MetricsPrometheusModule, + ClickhouseModule, JobsModule, ], }) diff --git a/docs/architecture.md b/docs/architecture.md index 55bea880..0855a6f9 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -37,6 +37,12 @@ Postgres is the system-of-record for Dealbot state: - Retrieval lifecycle records in `retrievals`. - Scheduler state in `job_schedule_state` and queue execution state in `pgboss.job`. +ClickHouse is for long-term check result storage and analysis, it's optional: + +- Each completed check writes one row to the relevant table: `data_storage_checks`, `retrieval_checks`, or `data_retention_challenges`. +- Rows are buffered in memory and flushed in batches; failed flushes are logged and the batch is dropped (ClickHouse is a metrics sink, not a source of truth). +- ClickHouse is not a dependency for normal operation. The service starts and runs without it. + Prometheus is for runtime observability, not durable state: - Job health/performance metrics (for example `jobs_started_total`, `jobs_completed_total`, `jobs_queued`) are emitted at runtime on `/metrics`. diff --git a/docs/checks/events-and-metrics.md b/docs/checks/events-and-metrics.md index 8ededa34..1fc0f2fe 100644 --- a/docs/checks/events-and-metrics.md +++ b/docs/checks/events-and-metrics.md @@ -106,3 +106,19 @@ sequenceDiagram | `dataSetCreationStatus` | Data-Set Creation | Not tied to an [event above](#event-list) but rather to data-set creation start (`pending`) and completion (`success`/`failure.*`) | `pending`, `success`, `failure.timedout`, `failure.other` | [`deal.service.ts`](../../apps/backend/src/deal/deal.service.ts) | | `dataSetChallengeStatus` | Data Retention | Not tied to an [event above](#event-list) but rather to the periodic chain-checking done in the [Data Retention Check](./data-retention.md) | `success`, `failure` | [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts) | | `pdp_provider_overdue_periods` | Data Retention | Emitted on every poll | Gauge value (estimated overdue periods) | [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts) | + +## ClickHouse Tables + +When `CLICKHOUSE_URL` is configured, dealbot writes one row per check result to ClickHouse for long-term storage and analysis. All tables are partitioned by month with a 1-year TTL. + +> **Source of truth**: the DDL and column-level comments in [`clickhouse.schema.ts`](../../apps/backend/src/clickhouse/clickhouse.schema.ts) are authoritative. The summary below is for orientation only. + +- **`data_storage_checks`** — one row written each time a deal is saved (on every status transition). Populated by [`deal.service.ts`](../../apps/backend/src/deal/deal.service.ts). +- **`retrieval_checks`** — one row per retrieval attempt. Populated by [`retrieval.service.ts`](../../apps/backend/src/retrieval/retrieval.service.ts). +- **`data_retention_challenges`** — one row per provider per poll cycle. Populated by [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts). + +All tables share the primary key `(probe_location, sp_address, timestamp)`: + +- `probe_location` - identifies which dealbot instance produced the row, allowing multiple deployments to be distinguished in queries (set via `DEALBOT_PROBE_LOCATION`) +- `sp_address` - the Ethereum/FEVM address of the storage provider under test +- `timestamp` - when the row was written (milliseconds UTC) diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 9aff8c1e..359d86da 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -13,6 +13,7 @@ This document provides a comprehensive guide to all environment variables used b | [Scheduling](#scheduling-configuration) | `PROVIDERS_REFRESH_INTERVAL_SECONDS`, `DATA_RETENTION_POLL_INTERVAL_SECONDS`, `DEALBOT_MAINTENANCE_WINDOWS_UTC`, `DEALBOT_MAINTENANCE_WINDOW_MINUTES` | | [Jobs (pg-boss)](#jobs-pg-boss) | `DEALBOT_PGBOSS_SCHEDULER_ENABLED`, `DEALBOT_PGBOSS_POOL_MAX`, `DEALS_PER_SP_PER_HOUR`, `DATASET_CREATIONS_PER_SP_PER_HOUR`, `RETRIEVALS_PER_SP_PER_HOUR`, `JOB_SCHEDULER_POLL_SECONDS`, `JOB_WORKER_POLL_SECONDS`, `PG_BOSS_LOCAL_CONCURRENCY`, `JOB_CATCHUP_MAX_ENQUEUE`, `JOB_SCHEDULE_PHASE_SECONDS`, `JOB_ENQUEUE_JITTER_SECONDS`, `DEAL_JOB_TIMEOUT_SECONDS`, `RETRIEVAL_JOB_TIMEOUT_SECONDS`, `IPFS_BLOCK_FETCH_CONCURRENCY` | | [Dataset](#dataset-configuration) | `DEALBOT_LOCAL_DATASETS_PATH`, `RANDOM_PIECE_SIZES` | +| [ClickHouse](#clickhouse-configuration) | `CLICKHOUSE_URL`, `CLICKHOUSE_BATCH_SIZE`, `CLICKHOUSE_FLUSH_INTERVAL_MS`, `DEALBOT_PROBE_LOCATION` | | [Timeouts](#timeout-configuration) | `CONNECT_TIMEOUT_MS`, `HTTP_REQUEST_TIMEOUT_MS`, `HTTP2_REQUEST_TIMEOUT_MS`, `IPNI_VERIFICATION_TIMEOUT_MS`, `IPNI_VERIFICATION_POLLING_MS` | | [Piece Cleanup](#piece-cleanup) | `MAX_DATASET_STORAGE_SIZE_BYTES`, `TARGET_DATASET_STORAGE_SIZE_BYTES`, `JOB_PIECE_CLEANUP_PER_SP_PER_HOUR`, `MAX_PIECE_CLEANUP_RUNTIME_SECONDS` | | [SP Blocklist](#sp-blocklist-configuration) | `BLOCKED_SP_IDS`, `BLOCKED_SP_ADDRESSES` | @@ -946,6 +947,78 @@ RANDOM_PIECE_SIZES=1024,10240,102400 --- +## ClickHouse Configuration + +Dealbot optionally writes check results to ClickHouse for long-term storage and analysis. All ClickHouse writes are disabled when `CLICKHOUSE_URL` is unset. + +### `CLICKHOUSE_URL` + +- **Type**: `string` (HTTP/HTTPS URL) +- **Required**: No +- **Default**: Not set (ClickHouse writes disabled) + +**Role**: ClickHouse connection URL. Must include the database name in the path. When unset, all ClickHouse inserts are silently dropped and no connection is made. + +**Example**: + +```bash +CLICKHOUSE_URL=http://default:password@clickhouse-host:8123/dealbot +``` + +--- + +### `CLICKHOUSE_BATCH_SIZE` + +- **Type**: `number` +- **Required**: No +- **Default**: `500` +- **Minimum**: `1` + +**Role**: Maximum number of rows to accumulate in the in-memory buffer before triggering a flush to ClickHouse. Rows are also flushed on the interval defined by `CLICKHOUSE_FLUSH_INTERVAL_MS`. + +**When to update**: + +- Decrease for lower-throughput deployments where you want more frequent writes +- Increase to reduce write frequency under high load + +--- + +### `CLICKHOUSE_FLUSH_INTERVAL_MS` + +- **Type**: `number` (milliseconds) +- **Required**: No +- **Default**: `5000` (5 seconds) +- **Minimum**: `100` + +**Role**: How often the ClickHouse buffer is flushed, regardless of batch size. + +**When to update**: + +- Decrease for more real-time data visibility +- Increase to reduce write pressure on the ClickHouse server + +--- + +### `DEALBOT_PROBE_LOCATION` + +- **Type**: `string` +- **Required**: No +- **Default**: `unknown` + +**Role**: A label identifying where this dealbot instance is running (e.g. `aws-us-east-1`, `local`). Written to ClickHouse as `probe_location` on every row, allowing multi-region deployments to be distinguished in queries. + +**When to update**: + +- Set to a meaningful geographic or deployment identifier for each dealbot instance + +**Example**: + +```bash +DEALBOT_PROBE_LOCATION=aws-us-east-1 +``` + +--- + ## Timeout Configuration ### `CONNECT_TIMEOUT_MS` diff --git a/kustomize/overlays/local/backend-configmap-local.yaml b/kustomize/overlays/local/backend-configmap-local.yaml index 704d444f..9226d24e 100644 --- a/kustomize/overlays/local/backend-configmap-local.yaml +++ b/kustomize/overlays/local/backend-configmap-local.yaml @@ -28,3 +28,5 @@ data: RANDOM_PIECE_SIZES: "10485760" PDP_SUBGRAPH_ENDPOINT: "https://api.goldsky.com/api/public/project_cmdfaaxeuz6us01u359yjdctw/subgraphs/pdp-explorer/calibration311a/gn" JOB_SCHEDULER_POLL_SECONDS: "60" + CLICKHOUSE_URL: "http://default:@dealbot-clickhouse:8123/dealbot" + DEALBOT_PROBE_LOCATION: "local" diff --git a/kustomize/overlays/local/backend-resources-local.yaml b/kustomize/overlays/local/backend-resources-local.yaml new file mode 100644 index 00000000..8ea7a65b --- /dev/null +++ b/kustomize/overlays/local/backend-resources-local.yaml @@ -0,0 +1,16 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dealbot +spec: + template: + spec: + containers: + - name: dealbot + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: 100m + memory: 256Mi diff --git a/kustomize/overlays/local/clickhouse/deployment.yaml b/kustomize/overlays/local/clickhouse/deployment.yaml new file mode 100644 index 00000000..b3a353e3 --- /dev/null +++ b/kustomize/overlays/local/clickhouse/deployment.yaml @@ -0,0 +1,61 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dealbot-clickhouse + labels: + app.kubernetes.io/name: dealbot-clickhouse +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: dealbot-clickhouse + template: + metadata: + labels: + app.kubernetes.io/name: dealbot-clickhouse + spec: + containers: + - name: clickhouse + image: clickhouse/clickhouse-server:24-alpine + ports: + - name: http + containerPort: 8123 + protocol: TCP + - name: native + containerPort: 9000 + protocol: TCP + startupProbe: + httpGet: + path: /ping + port: http + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 30 + readinessProbe: + httpGet: + path: /ping + port: http + initialDelaySeconds: 5 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 6 + livenessProbe: + httpGet: + path: /ping + port: http + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 3 + failureThreshold: 6 + volumeMounts: + - name: clickhouse-storage + mountPath: /var/lib/clickhouse + - name: initdb + mountPath: /docker-entrypoint-initdb.d + volumes: + - name: clickhouse-storage + persistentVolumeClaim: + claimName: dealbot-clickhouse + - name: initdb + configMap: + name: dealbot-clickhouse-initdb diff --git a/kustomize/overlays/local/clickhouse/initdb-configmap.yaml b/kustomize/overlays/local/clickhouse/initdb-configmap.yaml new file mode 100644 index 00000000..3bd07310 --- /dev/null +++ b/kustomize/overlays/local/clickhouse/initdb-configmap.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: dealbot-clickhouse-initdb +data: + 00-create-database.sql: | + CREATE DATABASE IF NOT EXISTS dealbot; diff --git a/kustomize/overlays/local/clickhouse/kustomization.yaml b/kustomize/overlays/local/clickhouse/kustomization.yaml new file mode 100644 index 00000000..48ea3f84 --- /dev/null +++ b/kustomize/overlays/local/clickhouse/kustomization.yaml @@ -0,0 +1,8 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +resources: + - deployment.yaml + - service.yaml + - pvc.yaml + - initdb-configmap.yaml diff --git a/kustomize/overlays/local/clickhouse/pvc.yaml b/kustomize/overlays/local/clickhouse/pvc.yaml new file mode 100644 index 00000000..b24ce359 --- /dev/null +++ b/kustomize/overlays/local/clickhouse/pvc.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: dealbot-clickhouse + labels: + app.kubernetes.io/name: dealbot-clickhouse +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 5Gi diff --git a/kustomize/overlays/local/clickhouse/service.yaml b/kustomize/overlays/local/clickhouse/service.yaml new file mode 100644 index 00000000..f697ed8d --- /dev/null +++ b/kustomize/overlays/local/clickhouse/service.yaml @@ -0,0 +1,19 @@ +apiVersion: v1 +kind: Service +metadata: + name: dealbot-clickhouse + labels: + app.kubernetes.io/name: dealbot-clickhouse +spec: + type: ClusterIP + ports: + - port: 8123 + targetPort: 8123 + protocol: TCP + name: http + - port: 9000 + targetPort: 9000 + protocol: TCP + name: native + selector: + app.kubernetes.io/name: dealbot-clickhouse diff --git a/kustomize/overlays/local/kustomization.yaml b/kustomize/overlays/local/kustomization.yaml index 1a413626..dd3f0e01 100644 --- a/kustomize/overlays/local/kustomization.yaml +++ b/kustomize/overlays/local/kustomization.yaml @@ -7,6 +7,7 @@ resources: - ../../base/backend - ../../base/web - ./postgres + - ./clickhouse - dealbot-worker-deployment.yaml - dealbot-worker-service.yaml diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 617eed44..a5bd57a6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -41,6 +41,9 @@ importers: apps/backend: dependencies: + '@clickhouse/client': + specifier: ^1.11.0 + version: 1.18.2 '@filoz/synapse-core': specifier: 0.3.3 version: 0.3.3(typescript@5.9.3)(viem@2.47.5(typescript@5.9.3)(zod@4.3.6)) @@ -601,6 +604,13 @@ packages: '@clack/prompts@1.1.0': resolution: {integrity: sha512-pkqbPGtohJAvm4Dphs2M8xE29ggupihHdy1x84HNojZuMtFsHiUlRvqD24tM2+XmI+61LlfNceM3Wr7U5QES5g==} + '@clickhouse/client-common@1.18.2': + resolution: {integrity: sha512-J0SG6q9V31ydxonglpj9xhNRsUxCsF71iEZ784yldqMYwsHixj/9xHFDgBDX3DuMiDx/kPDfXnf+pimp08wIBA==} + + '@clickhouse/client@1.18.2': + resolution: {integrity: sha512-fuquQswRSHWM6D079ZeuGqkMOsqtcUPL06UdTnowmoeeYjVrqisfVmvnw8pc3OeKS4kVb91oygb/MfLDiMs0TQ==} + engines: {node: '>=16'} + '@colors/colors@1.5.0': resolution: {integrity: sha512-ooWCrlZP11i8GImSjTHYHLkvFDP48nS4+204nGb1RiX/WXYHmJA2III9/e2DWVabCESdW7hBAEzHRqUn9OUVvQ==} engines: {node: '>=0.1.90'} @@ -6458,6 +6468,12 @@ snapshots: '@clack/core': 1.1.0 sisteransi: 1.0.5 + '@clickhouse/client-common@1.18.2': {} + + '@clickhouse/client@1.18.2': + dependencies: + '@clickhouse/client-common': 1.18.2 + '@colors/colors@1.5.0': optional: true