Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
f0855e2
chore: upgrade synapse to latest
silent-cipher Apr 29, 2026
a51bd33
feat: initial pull flow testing job
silent-cipher Apr 30, 2026
e08929e
fix: double pull check trigger
silent-cipher Apr 30, 2026
720e227
chore: remove pull check from dev-tools
silent-cipher May 2, 2026
a98e8e0
remove pull_checks table from db
silent-cipher May 2, 2026
a3c7aec
refactor: add piece validation and improve logging
silent-cipher May 2, 2026
2875737
fix: logging
silent-cipher May 2, 2026
07d21b1
fix: prometheus metric
silent-cipher May 4, 2026
9e57235
feat: add more prometheus metrics
silent-cipher May 4, 2026
f9b0e8d
add tests
silent-cipher May 4, 2026
f23911a
Merge branch 'main' into feat/pull-flow-testing
silent-cipher May 5, 2026
079bc60
add docs
silent-cipher May 5, 2026
5f4e729
refactor: remove commit from pull check
silent-cipher May 6, 2026
e0a4cb7
fix tests after commit removal
silent-cipher May 6, 2026
5b5e2b4
chore: remove unused mock
silent-cipher May 6, 2026
2c8e57b
fix: observe pull check completion latency once
silent-cipher May 6, 2026
ded9267
fix: metric help text
silent-cipher May 6, 2026
4267cae
fix: job calculations
silent-cipher May 6, 2026
ee8a0fd
Merge branch 'main' into feat/pull-flow-testing
silent-cipher May 7, 2026
200dbe9
chore: revert back to waitForPullStatus
silent-cipher May 7, 2026
f02e4b4
chore: fix lint
silent-cipher May 7, 2026
540f03d
feat: add deterministic random data generation and pull piece persist…
silent-cipher May 8, 2026
229bc7a
chore: cleanup
silent-cipher May 8, 2026
a0e13b8
chore: cleanup
silent-cipher May 8, 2026
4f6d2d0
docs: update to latest
silent-cipher May 8, 2026
c3d87a6
chore: remove MAX_BYTES limit
silent-cipher May 8, 2026
23d5ee7
chore: address pr comments
silent-cipher May 8, 2026
30f2379
chore: fix lint plus docs
silent-cipher May 8, 2026
b395c98
doc: remove ttl leftovers
silent-cipher May 9, 2026
b9873cd
refactor: worker doesn't expose /api/piece/:pieceCid
silent-cipher May 9, 2026
1a420f6
refactor: fire and forget pull piece deletion
silent-cipher May 9, 2026
da5e7b9
chore: address doc comments
silent-cipher May 11, 2026
2d462c0
refactor: move pull-piece config to dedicated section and add stream …
silent-cipher May 11, 2026
93d6179
feat: add rate limiting and stream tracking
silent-cipher May 11, 2026
863cd28
chore: fix docs
silent-cipher May 11, 2026
df54476
chore: fix faqs
silent-cipher May 11, 2026
445a0d6
refactor: rename metrics and arrange
silent-cipher May 11, 2026
c164add
feat: add pull_checks table to clickhouse
silent-cipher May 11, 2026
b1de0e5
doc: remove pull check results storage note
silent-cipher May 11, 2026
9b0513b
chore: update first_byte_ms comment
silent-cipher May 11, 2026
34a92a6
doc: clarify metric description
silent-cipher May 11, 2026
02f7ab9
refactor: remove custom throttler guard
silent-cipher May 11, 2026
65e8a3a
feat: add expiresAt field to pull piece entity
silent-cipher May 12, 2026
1ba5cd6
refactor: stream piece validation and add size checks
silent-cipher May 12, 2026
dfcefc8
Merge branch 'main' into feat/pull-flow-testing
SgtPooki May 12, 2026
577c2d0
Merge branch 'main' into feat/pull-flow-testing
silent-cipher May 13, 2026
629be23
chore: update waitForPullStatus -> waitForPullPieces
silent-cipher May 13, 2026
07309e8
Merge branch 'feat/pull-flow-testing' into feat/clickhouse-pull-check
silent-cipher May 13, 2026
f6b93c6
chore: address pr comments
silent-cipher May 13, 2026
c6f2ca8
Merge branch 'main' into feat/clickhouse-pull-check
silent-cipher May 20, 2026
7d54990
Merge branch 'main' into feat/clickhouse-pull-check
silent-cipher May 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions apps/backend/src/clickhouse/clickhouse.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,28 @@ export function buildMigrations(database: string): string[] {
PRIMARY KEY (probe_location, sp_address, timestamp)
PARTITION BY toStartOfMonth(timestamp)
TTL toDateTime(timestamp) + INTERVAL 1 YEAR`,

`CREATE TABLE IF NOT EXISTS ${database}.pull_checks
(
timestamp DateTime64(3, 'UTC'), -- when the pull check terminated
probe_location LowCardinality(String), -- dealbot location
sp_address String, -- storage provider address
sp_id Nullable(UInt64), -- storage provider numeric id
sp_name Nullable(String), -- storage provider name

piece_cid Nullable(String), -- piece CID of the synthetic test piece; null if preparation failed
piece_size_bytes Nullable(UInt64), -- size of the synthetic piece in bytes; null if preparation failed

status LowCardinality(String), -- 'success' | 'failure.timedout' | 'failure.other'
provider_status LowCardinality(Nullable(String)), -- raw SP-reported terminal pull status (e.g. 'complete', 'failed'); null if the request was never acknowledged or if waiting for pull status errored or timed out

acknowledgement_latency_ms Nullable(Float64), -- time from pullPieces submission to SP acknowledgement (ms)
completion_latency_ms Nullable(Float64), -- time from pullPieces submission to terminal SP pull status (ms)
first_byte_ms Nullable(Float64), -- time from pullPieces submission to SP reading first byte of hosted piece (ms); null when check failed before first byte
Comment thread
silent-cipher marked this conversation as resolved.
throughput_bps Nullable(Float64) -- approx bytes/sec = piece_size_bytes / completion_latency_ms * 1000; null on failure
) ENGINE MergeTree()
PRIMARY KEY (probe_location, sp_address, timestamp)
PARTITION BY toStartOfMonth(timestamp)
TTL toDateTime(timestamp) + INTERVAL 1 YEAR`,
];
Comment thread
silent-cipher marked this conversation as resolved.
}
52 changes: 52 additions & 0 deletions apps/backend/src/pull-check/pull-check.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Readable } from "node:stream";
import { ConfigService } from "@nestjs/config";
import { Test, type TestingModule } from "@nestjs/testing";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { ClickhouseService } from "../clickhouse/clickhouse.service.js";
import type { IConfig } from "../config/app.config.js";
import { DataSourceService } from "../dataSource/dataSource.service.js";
import { HttpClientService } from "../http-client/http-client.service.js";
Expand Down Expand Up @@ -66,6 +67,7 @@ describe("PullCheckService", () => {
observeThroughputBps: ReturnType<typeof vi.fn>;
recordStatus: ReturnType<typeof vi.fn>;
};
let clickhouseServiceMock: { insert: ReturnType<typeof vi.fn>; probeLocation: string };
let configValues: Partial<IConfig>;

beforeEach(async () => {
Expand Down Expand Up @@ -96,6 +98,7 @@ describe("PullCheckService", () => {
observeThroughputBps: vi.fn(),
recordStatus: vi.fn(),
};
clickhouseServiceMock = { insert: vi.fn(), probeLocation: "test" };

configValues = {
app: { host: "localhost", port: 3000, apiPublicUrl: "https://dealbot.example" } as IConfig["app"],
Expand Down Expand Up @@ -125,6 +128,7 @@ describe("PullCheckService", () => {
{ provide: PullPieceRepository, useValue: registryMock },
{ provide: PullCheckCheckMetrics, useValue: metricsMock },
{ provide: HttpClientService, useValue: httpClientServiceMock },
{ provide: ClickhouseService, useValue: clickhouseServiceMock },
],
}).compile();

Expand Down Expand Up @@ -332,6 +336,18 @@ describe("PullCheckService", () => {

// Pieces expire via TTL rather than being deleted at job end.
expect(registryMock.forget).not.toHaveBeenCalled();
// ClickHouse row written with the check result.
expect(clickhouseServiceMock.insert).toHaveBeenCalledWith(
"pull_checks",
expect.objectContaining({
probe_location: "test",
sp_address: "0xsp",
piece_cid: "bafk-test-piece",
piece_size_bytes: 1024,
status: "success",
provider_status: "complete",
}),
);
});

it("does not observe firstByte when the SP never read from /api/piece (cached pull)", async () => {
Expand Down Expand Up @@ -360,6 +376,15 @@ describe("PullCheckService", () => {
expect(metricsMock.recordProviderStatus).toHaveBeenCalledWith(expect.any(Object), "failed");
expect(metricsMock.recordStatus).toHaveBeenLastCalledWith(expect.any(Object), "failure.other");
expect(registryMock.forget).not.toHaveBeenCalled();
// ClickHouse row written with the failure outcome.
expect(clickhouseServiceMock.insert).toHaveBeenCalledWith(
"pull_checks",
expect.objectContaining({
sp_address: "0xsp",
status: "failure.other",
provider_status: "failed",
}),
);
});

it("classifies timeouts as failure.timedout", async () => {
Expand All @@ -368,6 +393,13 @@ describe("PullCheckService", () => {

await expect(service.runPullCheck("0xsp", undefined, logContext)).rejects.toThrow();
expect(metricsMock.recordStatus).toHaveBeenLastCalledWith(expect.any(Object), "failure.timedout");
expect(clickhouseServiceMock.insert).toHaveBeenCalledWith(
"pull_checks",
expect.objectContaining({
sp_address: "0xsp",
status: "failure.timedout",
}),
);
});

it("re-throws and runs cleanup when the validation step fails", async () => {
Expand Down Expand Up @@ -405,6 +437,26 @@ describe("PullCheckService", () => {
await expect(service.runPullCheck("0xsp", undefined, logContext)).rejects.toThrow(/Synapse client unavailable/);
expect(metricsMock.recordStatus).toHaveBeenLastCalledWith(expect.any(Object), "failure.other");
});

it("writes a ClickHouse row with null sp fields when the provider is unknown", async () => {
walletSdkServiceMock.getProviderInfo.mockReturnValue(undefined);

await expect(service.runPullCheck("0xsp", undefined, logContext)).rejects.toThrow(/not found/);
// No metrics recorded (labels could not be built).
expect(metricsMock.recordStatus).not.toHaveBeenCalled();
// ClickHouse row still written: sp_address is always available,
// sp_id and sp_name are null since providerInfo was never resolved.
expect(clickhouseServiceMock.insert).toHaveBeenCalledWith(
"pull_checks",
expect.objectContaining({
sp_address: "0xsp",
sp_id: null,
sp_name: null,
piece_cid: null,
status: "failure.other",
}),
);
});
});

describe("deleteExpiredPullPieces", () => {
Expand Down
59 changes: 45 additions & 14 deletions apps/backend/src/pull-check/pull-check.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { pullPieces, waitForPullPieces } from "@filoz/synapse-core/sp";
import { Injectable, Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import type { Address } from "viem";
import { ClickhouseService } from "../clickhouse/clickhouse.service.js";
import { type ProviderJobContext, toStructuredError } from "../common/logging.js";
import type { IAppConfig, IConfig, IPullPieceConfig } from "../config/app.config.js";
import { DataSourceService } from "../dataSource/dataSource.service.js";
Expand All @@ -26,6 +27,7 @@ export class PullCheckService {
private readonly pullPieceRepository: PullPieceRepository,
private readonly pullCheckMetrics: PullCheckCheckMetrics,
private readonly httpClientService: HttpClientService,
private readonly clickhouseService: ClickhouseService,
) {}

/**
Expand Down Expand Up @@ -65,18 +67,26 @@ export class PullCheckService {
signal: AbortSignal | undefined,
logContext: ProviderJobContext,
): Promise<void> {
const providerInfo = this.validateProviderInfo(spAddress);
const labels = buildCheckMetricLabels({
checkType: "pullCheck",
providerId: providerInfo.id,
providerName: providerInfo.name,
providerIsApproved: providerInfo.isApproved,
});
let providerInfo: PDPProviderEx | null = null;
let labels: ReturnType<typeof buildCheckMetricLabels> | null = null;

let prepared: PullPiecePrepared | null = null;
let requestSubmittedAt: Date | null = null;
let requestLatencyMs: number | null = null;
let completionLatencyMs: number | null = null;
let firstByteMs: number | null = null;
let throughputBps: number | null = null;
let finalProviderStatus: string | null = null;
let checkStatus: string | null = null;

try {
providerInfo = this.validateProviderInfo(spAddress);
labels = buildCheckMetricLabels({
checkType: "pullCheck",
providerId: providerInfo.id,
providerName: providerInfo.name,
providerIsApproved: providerInfo.isApproved,
});
signal?.throwIfAborted();
prepared = await this.preparePullPiece(spAddress);
const pieceCidStr = prepared.registration.pieceCid;
Expand All @@ -101,7 +111,7 @@ export class PullCheckService {
await this.pullPieceRepository.markPullSubmitted(pieceCidStr, requestSubmittedAt);
const pullResponse = await pullPieces(synapseClient, pullPiecesOptions);
signal?.throwIfAborted();
const requestLatencyMs = Date.now() - requestSubmittedAt.getTime();
requestLatencyMs = Date.now() - requestSubmittedAt.getTime();
this.pullCheckMetrics.observeAcknowledgementLatencyMs(labels, requestLatencyMs);
this.logger.log({
...logContext,
Expand All @@ -120,10 +130,11 @@ export class PullCheckService {
pollInterval: pullPieceConfig.pullCheckPollIntervalSeconds * 1000,
});
signal?.throwIfAborted();
const completionLatencyMs = Date.now() - requestSubmittedAt.getTime();
completionLatencyMs = Date.now() - requestSubmittedAt.getTime();
this.pullCheckMetrics.observeCompletionLatencyMs(labels, completionLatencyMs);
// Record the SP-reported terminal pull status (one increment per check)
this.pullCheckMetrics.recordProviderStatus(labels, finalResponse.status);
finalProviderStatus = finalResponse.status;
this.pullCheckMetrics.recordProviderStatus(labels, finalProviderStatus);

if (finalResponse.status !== "complete") {
throw new Error(`Storage provider failed to pull piece: status=${finalResponse.status}`);
Expand All @@ -142,7 +153,7 @@ export class PullCheckService {
}

const firstByteEntry = await this.pullPieceRepository.resolve(pieceCidStr);
const firstByteMs =
firstByteMs =
firstByteEntry?.firstByteAt && firstByteEntry?.pullSubmittedAt
? firstByteEntry.firstByteAt.getTime() - firstByteEntry.pullSubmittedAt.getTime()
: null;
Expand All @@ -152,10 +163,11 @@ export class PullCheckService {
// Throughput approximated as pieceSize / completionLatency. This is an
// upper-bound on actual transfer time because completionLatency includes
// SP-side scheduling/queuing and our polling cadence.
const throughputBps = Math.round((prepared.registration.size * 1000) / Math.max(completionLatencyMs, 1));
throughputBps = Math.round((prepared.registration.size * 1000) / Math.max(completionLatencyMs ?? 1, 1));
this.pullCheckMetrics.observeThroughputBps(labels, throughputBps);

this.pullCheckMetrics.recordStatus(labels, "success");
checkStatus = "success";
this.pullCheckMetrics.recordStatus(labels, checkStatus);
this.logger.log({
...logContext,
event: "pull_check_completed",
Expand All @@ -168,8 +180,27 @@ export class PullCheckService {
pieceSizeBytes: prepared.registration.size,
});
} catch (error) {
this.pullCheckMetrics.recordStatus(labels, classifyFailureStatus(error));
checkStatus = classifyFailureStatus(error);
if (labels !== null) this.pullCheckMetrics.recordStatus(labels, checkStatus);
throw error;
} finally {
if (checkStatus !== null) {
this.clickhouseService.insert("pull_checks", {
timestamp: Date.now(),
probe_location: this.clickhouseService.probeLocation,
sp_address: spAddress,
sp_id: providerInfo?.id != null ? String(providerInfo.id) : null,
sp_name: providerInfo?.name ?? null,
piece_cid: prepared?.registration.pieceCid ?? null,
piece_size_bytes: prepared?.registration.size ?? null,
status: checkStatus,
provider_status: finalProviderStatus,
acknowledgement_latency_ms: requestLatencyMs,
completion_latency_ms: completionLatencyMs,
first_byte_ms: firstByteMs,
throughput_bps: throughputBps,
});
Comment thread
silent-cipher marked this conversation as resolved.
}
}
// Pieces are not eagerly deleted here; they remain active (200) until their
// TTL expires so that SPs polling after job end are not spuriously told 404.
Expand Down
2 changes: 0 additions & 2 deletions docs/checks/pull-check.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ The pull check answers a different question than the [Data Storage check](./data

A successful pull check requires all [assertions in the table below](#what-gets-asserted) to pass. Failure occurs if any step fails or the job exceeds its max allowed time. Operational timeouts exist to prevent jobs from running indefinitely, but they are not quality assertions.

Comment thread
silent-cipher marked this conversation as resolved.
> **Where results live:** Pull check results are exported to Prometheus and structured logs only. They are **not** persisted in Postgres or written to ClickHouse.

## What Gets Asserted

Each pull check asserts the following for every SP:
Expand Down