diff --git a/apps/backend/src/clickhouse/clickhouse.schema.ts b/apps/backend/src/clickhouse/clickhouse.schema.ts
index 85d91052..7afbab57 100644
--- a/apps/backend/src/clickhouse/clickhouse.schema.ts
+++ b/apps/backend/src/clickhouse/clickhouse.schema.ts
@@ -78,5 +78,28 @@ export function buildMigrations(database: string): string[] {
   PRIMARY KEY (probe_location, sp_address, timestamp)
   PARTITION BY toStartOfMonth(timestamp)
   TTL toDateTime(timestamp) + INTERVAL 1 YEAR`,
+
+    `CREATE TABLE IF NOT EXISTS ${database}.pull_checks
+(
+  timestamp                   DateTime64(3, 'UTC'),             -- when the pull check terminated
+  probe_location              LowCardinality(String),           -- dealbot location
+  sp_address                  String,                           -- storage provider address
+  sp_id                       Nullable(UInt64),                 -- storage provider numeric id
+  sp_name                     Nullable(String),                 -- storage provider name
+
+  piece_cid                   Nullable(String),                 -- piece CID of the synthetic test piece; null if preparation failed
+  piece_size_bytes            Nullable(UInt64),                 -- size of the synthetic piece in bytes; null if preparation failed
+
+  status                      LowCardinality(String),           -- 'success' | 'failure.timedout' | 'failure.other'
+  provider_status             LowCardinality(Nullable(String)), -- raw SP-reported terminal pull status (e.g. 'complete', 'failed'); null if the request was never acknowledged or if waiting for pull status errored or timed out
+
+  acknowledgement_latency_ms  Nullable(Float64),                -- time from pullPieces submission to SP acknowledgement (ms)
+  completion_latency_ms       Nullable(Float64),                -- time from pullPieces submission to terminal SP pull status (ms)
+  first_byte_ms               Nullable(Float64),                -- time from pullPieces submission to SP reading first byte of hosted piece (ms); null when check failed before first byte
+  throughput_bps              Nullable(Float64)                 -- approx bytes/sec = piece_size_bytes / completion_latency_ms * 1000; null on failure
+) ENGINE MergeTree()
+  PRIMARY KEY (probe_location, sp_address, timestamp)
+  PARTITION BY toStartOfMonth(timestamp)
+  TTL toDateTime(timestamp) + INTERVAL 1 YEAR`,
   ];
 }
diff --git a/apps/backend/src/pull-check/pull-check.service.spec.ts b/apps/backend/src/pull-check/pull-check.service.spec.ts
index 86bf4e0c..1e8e4999 100644
--- a/apps/backend/src/pull-check/pull-check.service.spec.ts
+++ b/apps/backend/src/pull-check/pull-check.service.spec.ts
@@ -2,6 +2,7 @@ import { Readable } from "node:stream";
 import { ConfigService } from "@nestjs/config";
 import { Test, type TestingModule } from "@nestjs/testing";
 import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+import { ClickhouseService } from "../clickhouse/clickhouse.service.js";
 import type { IConfig } from "../config/app.config.js";
 import { DataSourceService } from "../dataSource/dataSource.service.js";
 import { HttpClientService } from "../http-client/http-client.service.js";
@@ -66,6 +67,7 @@ describe("PullCheckService", () => {
     observeThroughputBps: ReturnType<typeof vi.fn>;
     recordStatus: ReturnType<typeof vi.fn>;
   };
+  let clickhouseServiceMock: { insert: ReturnType<typeof vi.fn>; probeLocation: string };
   let configValues: Partial<IConfig>;
 
   beforeEach(async () => {
@@ -96,6 +98,7 @@ describe("PullCheckService", () => {
       observeThroughputBps: vi.fn(),
       recordStatus: vi.fn(),
     };
+    clickhouseServiceMock = { insert: vi.fn(), probeLocation: "test" };
 
     configValues = {
       app: { host: "localhost", port: 3000, apiPublicUrl: "https://dealbot.example" } as IConfig["app"],
@@ -125,6 +128,7 @@ describe("PullCheckService", () => {
         { provide: PullPieceRepository, useValue: registryMock },
         { provide: PullCheckCheckMetrics, useValue: metricsMock },
         { provide: HttpClientService, useValue: httpClientServiceMock },
+        { provide: ClickhouseService, useValue: clickhouseServiceMock },
       ],
     }).compile();
 
@@ -332,6 +336,18 @@ describe("PullCheckService", () => {
 
       // Pieces expire via TTL rather than being deleted at job end.
       expect(registryMock.forget).not.toHaveBeenCalled();
+      // ClickHouse row written with the check result.
+      expect(clickhouseServiceMock.insert).toHaveBeenCalledWith(
+        "pull_checks",
+        expect.objectContaining({
+          probe_location: "test",
+          sp_address: "0xsp",
+          piece_cid: "bafk-test-piece",
+          piece_size_bytes: 1024,
+          status: "success",
+          provider_status: "complete",
+        }),
+      );
     });
 
     it("does not observe firstByte when the SP never read from /api/piece (cached pull)", async () => {
@@ -360,6 +376,15 @@ describe("PullCheckService", () => {
       expect(metricsMock.recordProviderStatus).toHaveBeenCalledWith(expect.any(Object), "failed");
       expect(metricsMock.recordStatus).toHaveBeenLastCalledWith(expect.any(Object), "failure.other");
       expect(registryMock.forget).not.toHaveBeenCalled();
+      // ClickHouse row written with the failure outcome.
+      expect(clickhouseServiceMock.insert).toHaveBeenCalledWith(
+        "pull_checks",
+        expect.objectContaining({
+          sp_address: "0xsp",
+          status: "failure.other",
+          provider_status: "failed",
+        }),
+      );
     });
 
     it("classifies timeouts as failure.timedout", async () => {
@@ -368,6 +393,13 @@ describe("PullCheckService", () => {
 
       await expect(service.runPullCheck("0xsp", undefined, logContext)).rejects.toThrow();
       expect(metricsMock.recordStatus).toHaveBeenLastCalledWith(expect.any(Object), "failure.timedout");
+      expect(clickhouseServiceMock.insert).toHaveBeenCalledWith(
+        "pull_checks",
+        expect.objectContaining({
+          sp_address: "0xsp",
+          status: "failure.timedout",
+        }),
+      );
     });
 
     it("re-throws and runs cleanup when the validation step fails", async () => {
@@ -405,6 +437,26 @@ describe("PullCheckService", () => {
       await expect(service.runPullCheck("0xsp", undefined, logContext)).rejects.toThrow(/Synapse client unavailable/);
       expect(metricsMock.recordStatus).toHaveBeenLastCalledWith(expect.any(Object), "failure.other");
     });
+
+    it("writes a ClickHouse row with null sp fields when the provider is unknown", async () => {
+      walletSdkServiceMock.getProviderInfo.mockReturnValue(undefined);
+
+      await expect(service.runPullCheck("0xsp", undefined, logContext)).rejects.toThrow(/not found/);
+      // No metrics recorded (labels could not be built).
+      expect(metricsMock.recordStatus).not.toHaveBeenCalled();
+      // ClickHouse row still written: sp_address is always available,
+      // sp_id and sp_name are null since providerInfo was never resolved.
+      expect(clickhouseServiceMock.insert).toHaveBeenCalledWith(
+        "pull_checks",
+        expect.objectContaining({
+          sp_address: "0xsp",
+          sp_id: null,
+          sp_name: null,
+          piece_cid: null,
+          status: "failure.other",
+        }),
+      );
+    });
   });
 
   describe("deleteExpiredPullPieces", () => {
diff --git a/apps/backend/src/pull-check/pull-check.service.ts b/apps/backend/src/pull-check/pull-check.service.ts
index ff76a264..6e77b1b7 100644
--- a/apps/backend/src/pull-check/pull-check.service.ts
+++ b/apps/backend/src/pull-check/pull-check.service.ts
@@ -4,6 +4,7 @@ import { pullPieces, waitForPullPieces } from "@filoz/synapse-core/sp";
 import { Injectable, Logger } from "@nestjs/common";
 import { ConfigService } from "@nestjs/config";
 import type { Address } from "viem";
+import { ClickhouseService } from "../clickhouse/clickhouse.service.js";
 import { type ProviderJobContext, toStructuredError } from "../common/logging.js";
 import type { IAppConfig, IConfig, IPullPieceConfig } from "../config/app.config.js";
 import { DataSourceService } from "../dataSource/dataSource.service.js";
@@ -26,6 +27,7 @@ export class PullCheckService {
     private readonly pullPieceRepository: PullPieceRepository,
     private readonly pullCheckMetrics: PullCheckCheckMetrics,
     private readonly httpClientService: HttpClientService,
+    private readonly clickhouseService: ClickhouseService,
   ) {}
 
   /**
@@ -65,18 +67,28 @@ export class PullCheckService {
     signal: AbortSignal | undefined,
     logContext: ProviderJobContext,
   ): Promise<void> {
-    const providerInfo = this.validateProviderInfo(spAddress);
-    const labels = buildCheckMetricLabels({
-      checkType: "pullCheck",
-      providerId: providerInfo.id,
-      providerName: providerInfo.name,
-      providerIsApproved: providerInfo.isApproved,
-    });
+    // Everything the ClickHouse row reports is hoisted out of the try block so
+    // the finally clause can still emit whatever was measured before a failure.
+    let providerInfo: PDPProviderEx | null = null;
+    let labels: ReturnType<typeof buildCheckMetricLabels> | null = null;
 
     let prepared: PullPiecePrepared | null = null;
     let requestSubmittedAt: Date | null = null;
+    let requestLatencyMs: number | null = null;
+    let completionLatencyMs: number | null = null;
+    let firstByteMs: number | null = null;
+    let throughputBps: number | null = null;
+    let finalProviderStatus: string | null = null;
+    let checkStatus: string | null = null;
 
     try {
+      providerInfo = this.validateProviderInfo(spAddress);
+      labels = buildCheckMetricLabels({
+        checkType: "pullCheck",
+        providerId: providerInfo.id,
+        providerName: providerInfo.name,
+        providerIsApproved: providerInfo.isApproved,
+      });
       signal?.throwIfAborted();
       prepared = await this.preparePullPiece(spAddress);
       const pieceCidStr = prepared.registration.pieceCid;
@@ -101,7 +113,7 @@ export class PullCheckService {
       await this.pullPieceRepository.markPullSubmitted(pieceCidStr, requestSubmittedAt);
       const pullResponse = await pullPieces(synapseClient, pullPiecesOptions);
       signal?.throwIfAborted();
-      const requestLatencyMs = Date.now() - requestSubmittedAt.getTime();
+      requestLatencyMs = Date.now() - requestSubmittedAt.getTime();
       this.pullCheckMetrics.observeAcknowledgementLatencyMs(labels, requestLatencyMs);
       this.logger.log({
         ...logContext,
@@ -120,10 +132,11 @@ export class PullCheckService {
         pollInterval: pullPieceConfig.pullCheckPollIntervalSeconds * 1000,
       });
       signal?.throwIfAborted();
-      const completionLatencyMs = Date.now() - requestSubmittedAt.getTime();
+      completionLatencyMs = Date.now() - requestSubmittedAt.getTime();
       this.pullCheckMetrics.observeCompletionLatencyMs(labels, completionLatencyMs);
       // Record the SP-reported terminal pull status (one increment per check)
-      this.pullCheckMetrics.recordProviderStatus(labels, finalResponse.status);
+      finalProviderStatus = finalResponse.status;
+      this.pullCheckMetrics.recordProviderStatus(labels, finalProviderStatus);
 
       if (finalResponse.status !== "complete") {
         throw new Error(`Storage provider failed to pull piece: status=${finalResponse.status}`);
@@ -142,7 +155,7 @@ export class PullCheckService {
       }
 
       const firstByteEntry = await this.pullPieceRepository.resolve(pieceCidStr);
-      const firstByteMs =
+      firstByteMs =
         firstByteEntry?.firstByteAt && firstByteEntry?.pullSubmittedAt
           ? firstByteEntry.firstByteAt.getTime() - firstByteEntry.pullSubmittedAt.getTime()
           : null;
@@ -152,10 +165,11 @@ export class PullCheckService {
       // Throughput approximated as pieceSize / completionLatency. This is an
       // upper-bound on actual transfer time because completionLatency includes
       // SP-side scheduling/queuing and our polling cadence.
-      const throughputBps = Math.round((prepared.registration.size * 1000) / Math.max(completionLatencyMs, 1));
+      throughputBps = Math.round((prepared.registration.size * 1000) / Math.max(completionLatencyMs ?? 1, 1));
       this.pullCheckMetrics.observeThroughputBps(labels, throughputBps);
 
-      this.pullCheckMetrics.recordStatus(labels, "success");
+      checkStatus = "success";
+      this.pullCheckMetrics.recordStatus(labels, checkStatus);
       this.logger.log({
         ...logContext,
         event: "pull_check_completed",
@@ -168,8 +182,29 @@ export class PullCheckService {
         pieceSizeBytes: prepared.registration.size,
       });
     } catch (error) {
-      this.pullCheckMetrics.recordStatus(labels, classifyFailureStatus(error));
+      checkStatus = classifyFailureStatus(error);
+      if (labels !== null) this.pullCheckMetrics.recordStatus(labels, checkStatus);
       throw error;
+    } finally {
+      // checkStatus is set on both the success and the failure path, so one row
+      // is written per terminated check; fields that were never measured stay null.
+      if (checkStatus !== null) {
+        this.clickhouseService.insert("pull_checks", {
+          timestamp: Date.now(),
+          probe_location: this.clickhouseService.probeLocation,
+          sp_address: spAddress,
+          sp_id: providerInfo?.id != null ? String(providerInfo.id) : null,
+          sp_name: providerInfo?.name ?? null,
+          piece_cid: prepared?.registration.pieceCid ?? null,
+          piece_size_bytes: prepared?.registration.size ?? null,
+          status: checkStatus,
+          provider_status: finalProviderStatus,
+          acknowledgement_latency_ms: requestLatencyMs,
+          completion_latency_ms: completionLatencyMs,
+          first_byte_ms: firstByteMs,
+          throughput_bps: throughputBps,
+        });
+      }
     }
     // Pieces are not eagerly deleted here; they remain active (200) until their
     // TTL expires so that SPs polling after job end are not spuriously told 404.
diff --git a/docs/checks/pull-check.md b/docs/checks/pull-check.md
index 0dfbfdb7..7b04b1ce 100644
--- a/docs/checks/pull-check.md
+++ b/docs/checks/pull-check.md
@@ -14,8 +14,8 @@ The pull check answers a different question than the [Data Storage check](./data
 
 A successful pull check requires all [assertions in the table below](#what-gets-asserted) to pass. Failure occurs if any step fails or the job exceeds its max allowed time. Operational timeouts exist to prevent jobs from running indefinitely, but they are not quality assertions.
 
-> **Where results live:** Pull check results are exported to Prometheus and structured logs only. They are **not** persisted in Postgres or written to ClickHouse.
+> **Where results live:** Pull check results are exported to Prometheus and structured logs, and every terminated check is also written as one row to the ClickHouse `pull_checks` table. They are **not** persisted in Postgres.
 
 ## What Gets Asserted
 
 Each pull check asserts the following for every SP: