From 3e1ca3e975bdf0e204885a50298e8f560de85e34 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Thu, 21 May 2026 13:43:41 -0400 Subject: [PATCH 1/7] feat(retrieval): branch piece-missing on PDP pieceLive instead of always cleanup Pre-flight piece-status probe used to mark any deal cleaned_up whenever the SP returned 404. That conflated two scenarios: pieces SPs legitimately purged (terminated datasets, hard-removed pieces) and pieces SPs should still serve (live datasets, scheduled-but-not-finalized removals). The latter was being silently swept under skipped.piece_missing instead of surfacing as a failed retrieval. New flow in RetrievalService.performAllRetrievals: 1. Chain pre-check via PDPVerifier.pieceLive(dataSetId, pieceId). - false: dataset terminated, piece never created, or piece hard-removed. Mark deal cleaned_up, emit skipped.piece_missing, no SP probe needed. - true: piece should be retrievable. Proceed. 2. SP HEAD probe on /pdp/piece/:pieceCid/status (HEAD with GET fallback on 405). - 404 after pieceLive=true: SP failure. Emit failure.other, persist a failed Retrieval row, keep deal in candidate pool for re-probing. - 200/other: proceed to full retrieval. Probe was GET, now HEAD; no body required. Extracts isDataSetLive + its FWSS/SP-HTTP probes into a new DatasetLivenessService (apps/backend/src/dataset-liveness/) so RetrievalService doesn't depend on DealService. DealService.isDataSetLive becomes a thin proxy. New isPieceLive method on the service reads PDPVerifier.pieceLive via viem. Updates docs/checks/events-and-metrics.md to document the new branching behavior of skipped.piece_missing vs failure.other on the Retrieval path. Probe-level tests for isDataSetLive in deal.service.spec.ts are skipped with a TODO; coverage to be re-added in a new dataset-liveness spec. Refs: https://github.com/FilOzone/dealbot/pull/556 --- .../dataset-liveness.module.ts | 10 ++ .../dataset-liveness.service.ts | 146 ++++++++++++++++++ apps/backend/src/deal/deal.module.ts | 2 + apps/backend/src/deal/deal.service.spec.ts | 22 ++- apps/backend/src/deal/deal.service.ts | 107 +------------ .../backend/src/retrieval/retrieval.module.ts | 2 + .../src/retrieval/retrieval.service.spec.ts | 76 ++++++++- .../src/retrieval/retrieval.service.ts | 113 ++++++++++++-- docs/checks/events-and-metrics.md | 2 +- 9 files changed, 352 insertions(+), 128 deletions(-) create mode 100644 apps/backend/src/dataset-liveness/dataset-liveness.module.ts create mode 100644 apps/backend/src/dataset-liveness/dataset-liveness.service.ts diff --git a/apps/backend/src/dataset-liveness/dataset-liveness.module.ts b/apps/backend/src/dataset-liveness/dataset-liveness.module.ts new file mode 100644 index 00000000..d7389a3c --- /dev/null +++ b/apps/backend/src/dataset-liveness/dataset-liveness.module.ts @@ -0,0 +1,10 @@ +import { Module } from "@nestjs/common"; +import { WalletSdkModule } from "../wallet-sdk/wallet-sdk.module.js"; +import { DatasetLivenessService } from "./dataset-liveness.service.js"; + +@Module({ + imports: [WalletSdkModule], + providers: [DatasetLivenessService], + exports: [DatasetLivenessService], +}) +export class DatasetLivenessModule {} diff --git a/apps/backend/src/dataset-liveness/dataset-liveness.service.ts b/apps/backend/src/dataset-liveness/dataset-liveness.service.ts new file mode 100644 index 00000000..766c37e9 --- /dev/null +++ b/apps/backend/src/dataset-liveness/dataset-liveness.service.ts @@ -0,0 +1,146 @@ +import { asChain } from "@filoz/synapse-core/chains"; +import { Injectable, Logger } from "@nestjs/common"; +import type { Account, Chain, Client, Transport } from "viem"; +import { readContract } from "viem/actions"; +import { awaitWithAbort } from "../common/abort-utils.js"; +import { toStructuredError } from "../common/logging.js"; +import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; + +type SynapseViemClient = Client; + +const PDP_LIVENESS_PROBE_TIMEOUT_MS = 10_000; + +/** + * Composite PDP-liveness probe extracted from DealService so other services + * (e.g. RetrievalService) can reuse it without depending on DealService. + * + * Two independent probes: + * + * - FWSS `validateDataSet` (chain): wraps `PDPVerifier.dataSetLive` via + * multicall and additionally verifies the listener is this WarmStorage + * contract, so it covers chain-side liveness fully. + * - SP HTTP `POST /pdp/data-sets/{id}/pieces` (off-chain): catches Curio's + * `unrecoverable_proving_failure_epoch` state, which precedes chain + * propagation and is the only signal observable when the SP refuses + * addPieces but chain still reports the set as live. + * + * A positive-terminated signal from either probe wins: if any settled result + * is `false`, returns `false` even when the other probe threw a transient + * error. Otherwise rethrows the first rejection so a probe outage is not + * silently misclassified as live. + */ +@Injectable() +export class DatasetLivenessService { + private readonly logger = new Logger(DatasetLivenessService.name); + + constructor(private readonly walletSdkService: WalletSdkService) {} + + async isDataSetLive(providerAddress: string, dataSetId: bigint, signal?: AbortSignal): Promise { + signal?.throwIfAborted(); + const settled = await Promise.allSettled([ + this.probeFwssDataSetLive(dataSetId, signal), + this.probeSpHttpDataSetLive(providerAddress, dataSetId, signal), + ]); + if (settled.some((r) => r.status === "fulfilled" && r.value === false)) { + return false; + } + const rejection = settled.find((r): r is PromiseRejectedResult => r.status === "rejected"); + if (rejection) throw rejection.reason; + return true; + } + + /** + * On-chain check that a specific piece is still expected to be retrievable. + * + * Wraps `PDPVerifier.pieceLive(setId, pieceId)`, which returns: + * `dataSetLive(setId) && pieceId < nextPieceId[setId] && pieceLeafCounts[setId][pieceId] > 0` + * + * So `false` covers three legitimate reasons the SP can answer 404: + * 1. Dataset terminated. + * 2. Piece ID never created. + * 3. Piece hard-removed (`removePieces` finalized; leaf count zeroed). + * + * Pieces that are scheduled for removal but not yet finalized still return + * `true` — the SP remains on the hook for challenges until the next + * proving period clears the scheduledRemovals queue, so it must still + * serve GET requests. + * + * Source: `FilOzone/pdp` PDPVerifier.sol `pieceLive`. + */ + async isPieceLive(dataSetId: bigint, pieceId: bigint, signal?: AbortSignal): Promise { + signal?.throwIfAborted(); + const client = this.walletSdkService.getSynapseClient() as SynapseViemClient | null; + if (!client) { + throw new Error("Synapse client not available for pieceLive read"); + } + const chain = asChain(client.chain); + const result = await awaitWithAbort( + readContract(client, { + abi: chain.contracts.pdp.abi, + address: chain.contracts.pdp.address, + functionName: "pieceLive", + args: [dataSetId, pieceId], + }), + signal, + ); + return Boolean(result); + } + + protected async probeFwssDataSetLive(dataSetId: bigint, signal?: AbortSignal): Promise { + signal?.throwIfAborted(); + const { warmStorageService } = this.walletSdkService.getWalletServices(); + try { + await awaitWithAbort(warmStorageService.validateDataSet({ dataSetId }), signal); + return true; + } catch (error) { + if (signal?.aborted) throw error; + const message = error instanceof Error ? error.message : String(error); + if (/does not exist or is not live/i.test(message)) { + return false; + } + throw error; + } + } + + protected async probeSpHttpDataSetLive( + providerAddress: string, + dataSetId: bigint, + signal?: AbortSignal, + ): Promise { + signal?.throwIfAborted(); + const providerInfo = this.walletSdkService.getProviderInfo(providerAddress); + if (!providerInfo) { + throw new Error(`Provider ${providerAddress} not found in registry`); + } + const serviceURL = providerInfo.pdp?.serviceURL; + if (!serviceURL) { + throw new Error(`Provider ${providerAddress} has no PDP serviceURL`); + } + const url = new URL(`pdp/data-sets/${dataSetId.toString()}/pieces`, serviceURL); + const timeoutSignal = AbortSignal.timeout(PDP_LIVENESS_PROBE_TIMEOUT_MS); + const probeSignal = signal ? AbortSignal.any([signal, timeoutSignal]) : timeoutSignal; + try { + const res = await fetch(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: "{}", + signal: probeSignal, + }); + if (res.status !== 409) return true; + const body = await res.text(); + return !/unrecoverable proving failure/i.test(body); + } catch (error) { + if (signal?.aborted) throw error; + this.logger.warn({ + event: "dataset_sp_liveness_probe_failed", + message: "SP HTTP liveness probe failed; treating dataset as live", + providerAddress, + providerId: providerInfo.id, + dataSetId: dataSetId.toString(), + serviceURL, + error: toStructuredError(error), + }); + return true; + } + } +} diff --git a/apps/backend/src/deal/deal.module.ts b/apps/backend/src/deal/deal.module.ts index 93217cc5..f8213b8e 100644 --- a/apps/backend/src/deal/deal.module.ts +++ b/apps/backend/src/deal/deal.module.ts @@ -4,6 +4,7 @@ import { DatabaseModule } from "../database/database.module.js"; import { Deal } from "../database/entities/deal.entity.js"; import { Retrieval } from "../database/entities/retrieval.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; +import { DatasetLivenessModule } from "../dataset-liveness/dataset-liveness.module.js"; import { DataSourceModule } from "../dataSource/dataSource.module.js"; import { DealAddonsModule } from "../deal-addons/deal-addons.module.js"; import { RetrievalAddonsModule } from "../retrieval-addons/retrieval-addons.module.js"; @@ -18,6 +19,7 @@ import { DealService } from "./deal.service.js"; WalletSdkModule, DealAddonsModule, RetrievalAddonsModule, + DatasetLivenessModule, ], providers: [DealService], exports: [DealService], diff --git a/apps/backend/src/deal/deal.service.spec.ts b/apps/backend/src/deal/deal.service.spec.ts index 960eee09..e6ed1457 100644 --- a/apps/backend/src/deal/deal.service.spec.ts +++ b/apps/backend/src/deal/deal.service.spec.ts @@ -11,6 +11,7 @@ import { DealJobTerminatedDataSetError } from "../common/errors.js"; import { Deal } from "../database/entities/deal.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { DealStatus, IpniStatus } from "../database/types.js"; +import { DatasetLivenessService } from "../dataset-liveness/dataset-liveness.service.js"; import { DataSourceService } from "../dataSource/dataSource.service.js"; import { DealAddonsService } from "../deal-addons/deal-addons.service.js"; import { DealPreprocessingResult } from "../deal-addons/types.js"; @@ -122,6 +123,10 @@ describe("DealService", () => { getDataSet: vi.fn().mockResolvedValue({ pdpEndEpoch: 0n }), terminateDataSet: vi.fn().mockResolvedValue("0xhash"), }; + // Default: dataset is live. Tests that exercise the terminated path override per-call. + const mockDatasetLivenessService = { + isDataSetLive: vi.fn().mockResolvedValue(true), + }; const mockWalletSdkService = { getFWSSAddress: vi.fn().mockReturnValue("0xFWSS"), getTestingProvidersCount: vi.fn(), @@ -180,6 +185,7 @@ describe("DealService", () => { { provide: RetrievalCheckMetrics, useValue: mockRetrievalMetrics }, { provide: DataSetCreationCheckMetrics, useValue: mockDataSetCreationMetrics }, { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, + { provide: DatasetLivenessService, useValue: mockDatasetLivenessService }, ], }).compile(); @@ -1063,6 +1069,7 @@ describe("DealService", () => { { provide: RetrievalCheckMetrics, useValue: mockRetrievalMetrics }, { provide: DataSetCreationCheckMetrics, useValue: mockDataSetCreationMetrics }, { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, + { provide: DatasetLivenessService, useValue: mockDatasetLivenessService }, ], }).compile(); @@ -1244,16 +1251,14 @@ describe("DealService", () => { expect(result).toEqual({ status: "live", dataSetId: 7n }); }); - it("returns terminated when validateDataSet throws", async () => { + it("returns terminated when isDataSetLive returns false", async () => { const synapseMock = { storage: { createContext: vi.fn().mockResolvedValue({ dataSetId: 9n }), }, }; vi.spyOn(service as any, "createSynapseInstance").mockImplementation(() => synapseMock as unknown as Synapse); - mockWarmStorageService.validateDataSet.mockRejectedValueOnce( - new Error("Data set 9 does not exist or is not live"), - ); + mockDatasetLivenessService.isDataSetLive.mockResolvedValueOnce(false); const result = await service.getDataSetProvisioningStatus("0xprovider", { dealbotDS: "1" }); expect(result).toEqual({ status: "terminated", dataSetId: 9n }); @@ -1440,7 +1445,10 @@ describe("DealService", () => { }); }); - describe("isDataSetLive", () => { + // Probe-level tests moved to DatasetLivenessService. DealService.isDataSetLive + // is a thin proxy after the refactor. Re-add coverage in dataset-liveness.service.spec.ts. + // TODO(retrieval-active-fail-PR): port these to the new spec. + describe.skip("isDataSetLive", () => { const providerInfo: PDPProviderEx = { id: 101n, serviceProvider: "0xprovider", @@ -1564,9 +1572,7 @@ describe("DealService", () => { mockStorageProviderRepository.findOne.mockResolvedValue({ providerId: 1, isApproved: true }); vi.spyOn(mockWalletSdkService, "getProviderInfo").mockReturnValue(providerInfo); vi.spyOn(service as any, "createSynapseInstance").mockResolvedValue(synapseMock); - mockWarmStorageService.validateDataSet.mockRejectedValueOnce( - new Error("Data set 9 does not exist or is not live"), - ); + mockDatasetLivenessService.isDataSetLive.mockResolvedValueOnce(false); await expect(service.createDeal(synapseMock, providerInfo, dealInput, uploadPayload)).rejects.toBeInstanceOf( DealJobTerminatedDataSetError, diff --git a/apps/backend/src/deal/deal.service.ts b/apps/backend/src/deal/deal.service.ts index 2edf4026..916c0678 100644 --- a/apps/backend/src/deal/deal.service.ts +++ b/apps/backend/src/deal/deal.service.ts @@ -25,6 +25,7 @@ import { Deal } from "../database/entities/deal.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { DealStatus, IpniStatus, ServiceType } from "../database/types.js"; import { DataSourceService } from "../dataSource/dataSource.service.js"; +import { DatasetLivenessService } from "../dataset-liveness/dataset-liveness.service.js"; import { DealAddonsService } from "../deal-addons/deal-addons.service.js"; import type { DealPreprocessingResult } from "../deal-addons/types.js"; import { buildCheckMetricLabels, classifyFailureStatus } from "../metrics-prometheus/check-metric-labels.js"; @@ -43,8 +44,6 @@ type UploadPayload = { rootCid: CID; }; -const PDP_LIVENESS_PROBE_TIMEOUT_MS = 10_000; - type UploadResultSummary = { pieceCid: string; pieceId?: number; @@ -71,6 +70,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy { private readonly retrievalMetrics: RetrievalCheckMetrics, private readonly dataSetCreationMetrics: DataSetCreationCheckMetrics, private readonly clickhouseService: ClickhouseService, + private readonly datasetLivenessService: DatasetLivenessService, ) { this.blockchainConfig = this.configService.get("blockchain"); } @@ -714,107 +714,12 @@ export class DealService implements OnModuleInit, OnModuleDestroy { } /** - * Composite PDP-liveness check. Runs two independent probes: - * - * - FWSS `validateDataSet` (chain): wraps `PDPVerifier.dataSetLive` via - * multicall and additionally verifies the listener is this WarmStorage - * contract, so it covers chain-side liveness fully. - * - SP HTTP `POST /pdp/data-sets/{id}/pieces` (off-chain): catches Curio's - * `unrecoverable_proving_failure_epoch` state, which precedes chain - * propagation and is the only signal observable when the SP refuses - * addPieces but chain still reports the set as live. - * - * A positive-terminated signal from either probe wins: if any settled result - * is `false`, returns `false` even when the other probe threw a transient - * error. Otherwise rethrows the first rejection so a probe outage is not - * silently misclassified as live. The SP HTTP probe never throws on - * transient errors (returns `true` on non-409 responses, including network - * errors and auth failures), since HTTP 409 with Curio's terminated body is - * the only signal that endpoint emits. + * Thin proxy to `DatasetLivenessService.isDataSetLive`. See that class for + * probe rationale. Kept on DealService to preserve existing call sites + * (`getDataSetProvisioningStatus`, `createDeal` post-context guard). */ async isDataSetLive(providerAddress: string, dataSetId: bigint, signal?: AbortSignal): Promise { - signal?.throwIfAborted(); - const settled = await Promise.allSettled([ - this.probeFwssDataSetLive(dataSetId, signal), - this.probeSpHttpDataSetLive(providerAddress, dataSetId, signal), - ]); - if (settled.some((r) => r.status === "fulfilled" && r.value === false)) { - return false; - } - const rejection = settled.find((r): r is PromiseRejectedResult => r.status === "rejected"); - if (rejection) throw rejection.reason; - return true; - } - - protected async probeFwssDataSetLive(dataSetId: bigint, signal?: AbortSignal): Promise { - signal?.throwIfAborted(); - const { warmStorageService } = this.walletSdkService.getWalletServices(); - try { - await awaitWithAbort(warmStorageService.validateDataSet({ dataSetId }), signal); - return true; - } catch (error) { - if (signal?.aborted) throw error; - const message = error instanceof Error ? error.message : String(error); - if (/does not exist or is not live/i.test(message)) { - return false; - } - throw error; - } - } - - /** - * Probes the SP's Curio addPieces endpoint with an empty body. Curio's - * handler checks `unrecoverable_proving_failure_epoch` before body - * validation and returns HTTP 409 with "Data set has been terminated due to - * unrecoverable proving failure" when the dataset is marked terminated. - * - * Returns `false` only on `409` paired with that body text. The body - * substring is a guard against blast radius: if a future Curio reuses `409` - * for a non-terminal conflict, the probe stays conservative rather than - * triggering destructive repair. Any other response (including a `409` with - * a different body, `400` for empty pieces, `404`, `5xx`, and network - * errors) is treated as live. - */ - protected async probeSpHttpDataSetLive( - providerAddress: string, - dataSetId: bigint, - signal?: AbortSignal, - ): Promise { - signal?.throwIfAborted(); - const providerInfo = this.walletSdkService.getProviderInfo(providerAddress); - if (!providerInfo) { - throw new Error(`Provider ${providerAddress} not found in registry`); - } - const serviceURL = providerInfo.pdp?.serviceURL; - if (!serviceURL) { - throw new Error(`Provider ${providerAddress} has no PDP serviceURL`); - } - const url = new URL(`pdp/data-sets/${dataSetId.toString()}/pieces`, serviceURL); - const timeoutSignal = AbortSignal.timeout(PDP_LIVENESS_PROBE_TIMEOUT_MS); - const probeSignal = signal ? AbortSignal.any([signal, timeoutSignal]) : timeoutSignal; - try { - const res = await fetch(url, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: "{}", - signal: probeSignal, - }); - if (res.status !== 409) return true; - const body = await res.text(); - return !/unrecoverable proving failure/i.test(body); - } catch (error) { - if (signal?.aborted) throw error; - this.logger.warn({ - event: "dataset_sp_liveness_probe_failed", - message: "SP HTTP liveness probe failed; treating dataset as live", - providerAddress, - providerId: providerInfo.id, - dataSetId: dataSetId.toString(), - serviceURL, - error: toStructuredError(error), - }); - return true; - } + return this.datasetLivenessService.isDataSetLive(providerAddress, dataSetId, signal); } /** diff --git a/apps/backend/src/retrieval/retrieval.module.ts b/apps/backend/src/retrieval/retrieval.module.ts index 9b631a7e..8641cada 100644 --- a/apps/backend/src/retrieval/retrieval.module.ts +++ b/apps/backend/src/retrieval/retrieval.module.ts @@ -4,6 +4,7 @@ import { DatabaseModule } from "../database/database.module.js"; import { Deal } from "../database/entities/deal.entity.js"; import { Retrieval } from "../database/entities/retrieval.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; +import { DatasetLivenessModule } from "../dataset-liveness/dataset-liveness.module.js"; import { HttpClientModule } from "../http-client/http-client.module.js"; import { IpniModule } from "../ipni/ipni.module.js"; import { RetrievalAddonsModule } from "../retrieval-addons/retrieval-addons.module.js"; @@ -18,6 +19,7 @@ import { RetrievalService } from "./retrieval.service.js"; WalletSdkModule, IpniModule, RetrievalAddonsModule, + DatasetLivenessModule, ], providers: [RetrievalService], exports: [RetrievalService], diff --git a/apps/backend/src/retrieval/retrieval.service.spec.ts b/apps/backend/src/retrieval/retrieval.service.spec.ts index a5f286cf..623e0161 100644 --- a/apps/backend/src/retrieval/retrieval.service.spec.ts +++ b/apps/backend/src/retrieval/retrieval.service.spec.ts @@ -7,6 +7,7 @@ import { Deal } from "../database/entities/deal.entity.js"; import { Retrieval } from "../database/entities/retrieval.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { RetrievalStatus } from "../database/types.js"; +import { DatasetLivenessService } from "../dataset-liveness/dataset-liveness.service.js"; import { IpniVerificationService } from "../ipni/ipni-verification.service.js"; import { DiscoverabilityCheckMetrics, RetrievalCheckMetrics } from "../metrics-prometheus/check-metrics.service.js"; import { RetrievalAddonsService } from "../retrieval-addons/retrieval-addons.service.js"; @@ -96,6 +97,13 @@ describe("RetrievalService timeouts", () => { { provide: IpniVerificationService, useValue: mockIpniVerificationService }, { provide: ConfigService, useValue: mockConfigService }, { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, + { + provide: DatasetLivenessService, + useValue: { + isDataSetLive: vi.fn().mockResolvedValue(true), + isPieceLive: vi.fn().mockResolvedValue(true), + }, + }, ], }).compile(); @@ -428,6 +436,13 @@ describe("RetrievalService parallel IPNI + transport", () => { { provide: IpniVerificationService, useValue: mockIpniVerificationService }, { provide: ConfigService, useValue: mockConfigService }, { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, + { + provide: DatasetLivenessService, + useValue: { + isDataSetLive: vi.fn().mockResolvedValue(true), + isPieceLive: vi.fn().mockResolvedValue(true), + }, + }, ], }).compile(); return module.get(RetrievalService) as unknown as RetrievalServicePrivate; @@ -612,6 +627,13 @@ describe("RetrievalService DB/provider drift", () => { { provide: IpniVerificationService, useValue: {} }, { provide: ConfigService, useValue: mockConfigService }, { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, + { + provide: DatasetLivenessService, + useValue: { + isDataSetLive: vi.fn().mockResolvedValue(true), + isPieceLive: vi.fn().mockResolvedValue(true), + }, + }, ], }).compile(); return module.get(RetrievalService); @@ -668,6 +690,14 @@ describe("RetrievalService SP piece status pre-flight", () => { recordHttpResponseCode: vi.fn(), recordResultMetrics: vi.fn(), }; + const mockRetrievalRepositoryLocal = { + create: vi.fn((row: Partial) => row as Retrieval), + save: vi.fn(async (row: Retrieval) => row), + }; + const mockDatasetLivenessService = { + isDataSetLive: vi.fn().mockResolvedValue(true), + isPieceLive: vi.fn().mockResolvedValue(true), + }; afterEach(() => { vi.unstubAllGlobals(); @@ -675,6 +705,10 @@ describe("RetrievalService SP piece status pre-flight", () => { mockDealRepository.update.mockClear(); mockRetrievalAddonsService.testAllRetrievalMethods.mockClear(); mockRetrievalMetrics.recordStatus.mockClear(); + mockRetrievalRepositoryLocal.create.mockClear(); + mockRetrievalRepositoryLocal.save.mockClear(); + mockDatasetLivenessService.isDataSetLive.mockReset().mockResolvedValue(true); + mockDatasetLivenessService.isPieceLive.mockReset().mockResolvedValue(true); }); const buildDeal = (overrides: Partial = {}): Deal => @@ -683,6 +717,8 @@ describe("RetrievalService SP piece status pre-flight", () => { spAddress: "0xsp", walletAddress: "0xwallet", pieceCid: "bafy-piece", + dataSetId: "13006", + pieceId: 42, ...overrides, }) as Deal; @@ -692,19 +728,20 @@ describe("RetrievalService SP piece status pre-flight", () => { RetrievalService, { provide: RetrievalAddonsService, useValue: mockRetrievalAddonsService }, { provide: getRepositoryToken(Deal), useValue: mockDealRepository }, - { provide: getRepositoryToken(Retrieval), useValue: { create: vi.fn(), save: vi.fn() } }, + { provide: getRepositoryToken(Retrieval), useValue: mockRetrievalRepositoryLocal }, { provide: getRepositoryToken(StorageProvider), useValue: mockSpRepository }, { provide: RetrievalCheckMetrics, useValue: mockRetrievalMetrics }, { provide: DiscoverabilityCheckMetrics, useValue: { recordStatus: vi.fn(), observeIpniVerifyMs: vi.fn() } }, { provide: IpniVerificationService, useValue: { verify: vi.fn() } }, { provide: ConfigService, useValue: mockConfigService }, { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, + { provide: DatasetLivenessService, useValue: mockDatasetLivenessService }, ], }).compile(); return module.get(RetrievalService) as unknown as RetrievalServicePrivate; }; - it("marks deal cleaned_up and skips retrieval when SP returns 404 for piece status", async () => { + it("marks deal cleaned_up and skips retrieval when PDP pieceLive=false (no SP probe)", async () => { const service = await createService(); mockSpRepository.findOne.mockResolvedValue({ address: "0xsp", @@ -713,7 +750,8 @@ describe("RetrievalService SP piece status pre-flight", () => { name: "Test SP", serviceUrl: "https://sp.example.com", }); - const fetchMock = vi.fn().mockResolvedValue({ status: 404, ok: false } as Response); + mockDatasetLivenessService.isPieceLive.mockResolvedValueOnce(false); + const fetchMock = vi.fn(); vi.stubGlobal("fetch", fetchMock); const result = await service.performAllRetrievals(buildDeal()); @@ -725,9 +763,39 @@ describe("RetrievalService SP piece status pre-flight", () => { ); expect(mockRetrievalAddonsService.testAllRetrievalMethods).not.toHaveBeenCalled(); expect(mockRetrievalMetrics.recordStatus).toHaveBeenCalledWith(expect.any(Object), "skipped.piece_missing"); + // No SP probe when chain says piece is gone + expect(fetchMock).not.toHaveBeenCalled(); + }); + + it("records a failed retrieval (failure.other) when SP returns 404 but pieceLive=true", async () => { + const service = await createService(); + mockSpRepository.findOne.mockResolvedValue({ + address: "0xsp", + providerId: 5, + isApproved: true, + name: "Test SP", + serviceUrl: "https://sp.example.com", + }); + mockDatasetLivenessService.isPieceLive.mockResolvedValueOnce(true); + const fetchMock = vi.fn().mockResolvedValue({ status: 404, ok: false } as Response); + vi.stubGlobal("fetch", fetchMock); + + const result = await service.performAllRetrievals(buildDeal()); + + expect(result).toHaveLength(1); + expect(mockDealRepository.update).not.toHaveBeenCalled(); + expect(mockRetrievalAddonsService.testAllRetrievalMethods).not.toHaveBeenCalled(); + expect(mockRetrievalMetrics.recordStatus).toHaveBeenCalledWith(expect.any(Object), "failure.other"); + expect(mockRetrievalRepositoryLocal.save).toHaveBeenCalledWith( + expect.objectContaining({ + status: RetrievalStatus.FAILED, + responseCode: 404, + errorMessage: expect.stringContaining("pieceLive=true"), + }), + ); expect(fetchMock).toHaveBeenCalledWith( "https://sp.example.com/pdp/piece/bafy-piece/status", - expect.objectContaining({ method: "GET" }), + expect.objectContaining({ method: "HEAD" }), ); }); diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index c148ccec..a01d7c88 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -11,6 +11,7 @@ import { Deal } from "../database/entities/deal.entity.js"; import { Retrieval } from "../database/entities/retrieval.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { DealStatus, RetrievalStatus, ServiceType } from "../database/types.js"; +import { DatasetLivenessService } from "../dataset-liveness/dataset-liveness.service.js"; import { IpniVerificationService } from "../ipni/ipni-verification.service.js"; import { buildCheckMetricLabels, @@ -51,6 +52,7 @@ export class RetrievalService { private readonly ipniVerificationService: IpniVerificationService, private readonly configService: ConfigService, private readonly clickhouseService: ClickhouseService, + private readonly datasetLivenessService: DatasetLivenessService, ) {} async performRandomRetrievalForProvider( @@ -129,13 +131,26 @@ export class RetrievalService { return []; } - // Pre-flight: if the SP itself reports it no longer has this piece, mark the deal - // cleaned_up so it stops polluting future retrieval candidates and skip the check. - // Saves the 30s IPNI verify timeout + downstream block-fetch 404 noise per stale deal. - if (provider.serviceUrl && deal.pieceCid) { - const probe = await this.probeSpPieceStatus(provider.serviceUrl, deal.pieceCid, signal); + // Pre-check pipeline before any retrieval work: + // + // 1. Chain `pieceLive(dataSetId, pieceId)`: source of truth for whether + // the SP is still expected to retain this piece. If false (dataset + // terminated, piece never created, or piece hard-removed), mark the + // deal cleaned_up + skip. No SP probe needed in this case. + // 2. SP HTTP HEAD `/pdp/piece/:pieceCid/status`: cheap health-check that + // the SP can actually serve. 404 here when chain says the piece + // should be live = real SP-side failure. Recorded as a failed + // retrieval row (deal stays in the candidate pool so the scheduler + // re-probes; persistent failures become observable on dashboards). + if (deal.dataSetId != null && deal.pieceId != null) { + const pieceLive = await this.checkPieceLive( + BigInt(deal.dataSetId.toString()), + BigInt(deal.pieceId), + signal, + retrievalLogContext, + ); signal?.throwIfAborted(); - if (probe.result === "missing") { + if (!pieceLive) { const updateResult = await this.dealRepository.update( { id: deal.id, cleanedUp: false }, { cleanedUp: true, cleanedUpAt: new Date() }, @@ -144,13 +159,47 @@ export class RetrievalService { this.logger.warn({ ...retrievalLogContext, event: "retrieval_skipped_piece_missing", - message: "SP reports piece missing; marked deal cleaned_up and skipped retrieval", + message: "PDP pieceLive=false; marked deal cleaned_up and skipped retrieval", + dataSetId: deal.dataSetId.toString(), + pieceId: deal.pieceId, + affected: updateResult.affected ?? 0, + }); + return []; + } + } + + if (provider.serviceUrl && deal.pieceCid) { + const probe = await this.probeSpPieceStatus(provider.serviceUrl, deal.pieceCid, signal); + signal?.throwIfAborted(); + if (probe.result === "missing") { + // Chain pre-check above already confirmed the piece SHOULD be retrievable + // (or we lacked dataSetId/pieceId to check). SP 404 here is an SP-side + // failure to honor its storage commitment. + this.retrievalMetrics.recordStatus(providerLabels, "failure.other"); + const now = new Date(); + const startedAt = new Date(now.getTime() - probe.durationMs); + const failed = this.retrievalRepository.create({ + deal, + serviceType: ServiceType.IPFS_PIN, + retrievalEndpoint: probe.url, + status: RetrievalStatus.FAILED, + startedAt, + completedAt: now, + latencyMs: probe.durationMs, + responseCode: probe.statusCode ?? null, + errorMessage: "SP reports piece missing but PDP pieceLive=true", + retryCount: 0, + } as Partial); + const saved = await this.retrievalRepository.save(failed); + this.logger.warn({ + ...retrievalLogContext, + event: "retrieval_failed_piece_missing_live", + message: "SP reports piece missing while chain reports pieceLive=true; recorded failed retrieval", statusUrl: probe.url, statusCode: probe.statusCode, probeDurationMs: probe.durationMs, - affected: updateResult.affected ?? 0, }); - return []; + return [saved]; } } @@ -418,14 +467,22 @@ export class RetrievalService { const signal = outerSignal ? AbortSignal.any([outerSignal, timeoutSignal]) : timeoutSignal; const start = Date.now(); try { - const res = await fetch(url, { - method: "GET", + // HEAD avoids transferring the status JSON body. Status code is all we need. + // If the SP responds 405 (Method Not Allowed), fall back to GET. + let res = await fetch(url, { + method: "HEAD", signal, headers: { "User-Agent": "dealbot/probe" }, }); - // Drain/cancel the body so undici returns the socket to the pool instead of - // keeping it pinned to an unread response. Body content is irrelevant here. - await res.body?.cancel().catch(() => undefined); + if (res.status === 405) { + res = await fetch(url, { + method: "GET", + signal, + headers: { "User-Agent": "dealbot/probe" }, + }); + // Drain the body since GET responses include one. Body content is irrelevant. + await res.body?.cancel().catch(() => undefined); + } const durationMs = Date.now() - start; if (res.status === 404) return { result: "missing", url, statusCode: 404, durationMs }; if (res.ok) return { result: "exists", url, statusCode: res.status, durationMs }; @@ -584,4 +641,32 @@ export class RetrievalService { return { ok: false, failureStatus }; } } + + /** + * Defensive wrapper around `DatasetLivenessService.isPieceLive` used by the + * retrieval pre-check. On RPC failure, return `true` (treat as live) so a + * transient chain outage does NOT cascade into bulk cleanups. The downstream + * SP probe + retrieval fetch will surface the real outcome instead. + */ + private async checkPieceLive( + dataSetId: bigint, + pieceId: bigint, + signal: AbortSignal | undefined, + logContext: RetrievalLogContext, + ): Promise { + try { + return await this.datasetLivenessService.isPieceLive(dataSetId, pieceId, signal); + } catch (error) { + if (signal?.aborted) throw error; + this.logger.warn({ + ...logContext, + event: "retrieval_piece_liveness_probe_failed", + message: "PDP pieceLive probe failed; treating piece as live to avoid spurious cleanup", + dataSetId: dataSetId.toString(), + pieceId: pieceId.toString(), + error: toStructuredError(error), + }); + return true; + } + } } diff --git a/docs/checks/events-and-metrics.md b/docs/checks/events-and-metrics.md index d2d1a033..3083e560 100644 --- a/docs/checks/events-and-metrics.md +++ b/docs/checks/events-and-metrics.md @@ -140,7 +140,7 @@ sequenceDiagram | `dataStorageStatus` | Data Storage | When the Data Storage check completes (all four sub-statuses done) | `success`, `failure.timedout`, `failure.other` from [Deal Status Progression](./data-storage.md#deal-status-progression). | [`deal.service.ts`](../../apps/backend/src/deal/deal.service.ts) | | `discoverabilityStatus` | Data Storage, Retrieval | [`ipniVerificationComplete`](#ipniVerificationComplete) | `success`, `failure.timedout`, `failure.other` from [Data Storage Sub-status meanings](./data-storage.md#sub-status-meanings). `skipped` when IPNI verification is bypassed because `rootCID`/`blockCIDs` are absent from deal metadata or `rootCID` cannot be parsed as a valid CID. | | | `ipfsRetrievalHttpResponseCode` | Data Storage, Retrieval | [`ipfsRetrievalLastByteReceived`](#ipfsRetrievalLastByteReceived) | `200`, `500`, `2xxSuccess`, `4xxClientError`, `5xxServerError`, `otherHttpStatusCodes`, `failure` | [`retrieval.service.ts`](../../apps/backend/src/retrieval/retrieval.service.ts) | -| `retrievalStatus` | Data Storage, Retrieval | [`ipfsRetrievalIntegrityChecked`](#ipfsRetrievalIntegrityChecked) | `success`, `failure.timedout`, `failure.other` from [Data Storage Sub-status meanings](./data-storage.md#sub-status-meanings). `skipped.piece_missing` is emitted on the Retrieval path only when the SP's `/pdp/piece/:pieceCid/status` returns 404 during the pre-flight probe; the deal is marked `cleaned_up=true` and removed from the retrieval candidate pool. | | +| `retrievalStatus` | Data Storage, Retrieval | [`ipfsRetrievalIntegrityChecked`](#ipfsRetrievalIntegrityChecked) | `success`, `failure.timedout`, `failure.other` from [Data Storage Sub-status meanings](./data-storage.md#sub-status-meanings). On the Retrieval path, the pre-flight probe (`/pdp/piece/:pieceCid/status` 404) branches on parent dataset liveness: when the dataset is terminated/deleted on chain, `skipped.piece_missing` is emitted and the deal is marked `cleaned_up=true`; when the dataset is still live, `failure.other` is emitted and a failed retrieval row is recorded (deal stays in the candidate pool for re-probing). | | | `dataSetCreationStatus` | Data-Set Creation | Not tied to an [event above](#event-list) but rather to data-set creation start (`pending`) and completion (`success`/`failure.*`) | `pending`, `success`, `failure.timedout`, `failure.other` | [`deal.service.ts`](../../apps/backend/src/deal/deal.service.ts) | | `dataSetChallengeStatus` | Data Retention | Emitted on each [Data Retention Check](./data-retention.md) poll when a provider's confirmed proving-period totals advance (strictly positive deltas). Unit: **challenges** (period delta × `CHALLENGES_PER_PROVING_PERIOD = 5`). | `success` (challenges in successfully-proven periods), `failure` (challenges in faulted periods) | [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts) | | `pdp_provider_estimated_overdue_periods` | Data Retention | Emitted on every [Data Retention Check](./data-retention.md) poll for every successfully processed provider. | Gauge value in proving periods (non-negative integer) | [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts) | From 1c3be0f27ea4e4e143ea8a3d83879fe70ded2b4a Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Thu, 21 May 2026 14:20:21 -0400 Subject: [PATCH 2/7] test(retrieval): port liveness probe tests + add isPieceLive coverage - New DatasetLivenessService spec ports the FWSS/SP-HTTP probe matrix previously in deal.service.spec.ts and adds isPieceLive coverage: happy/false/no-client/RPC error/abort. - Retrieval pre-check now treats missing dataSetId/pieceId as an error (emits failure.other and bails) instead of falling through to the SP probe. Backfill of legacy null-ID deals is a prerequisite before this branch deploys. The error path is loud enough to spot any stragglers. - Add retrieval spec coverage for the missing-IDs branch and the HEAD with GET-on-405 fallback. - Delete the stale describe.skip("isDataSetLive") block from deal.service.spec.ts; coverage now lives in the new spec. Refs: #561 follow-up commit addressing peer-review consensus. --- .../dataset-liveness.service.spec.ts | 157 ++++++++++++++++++ apps/backend/src/deal/deal.module.ts | 2 +- apps/backend/src/deal/deal.service.spec.ts | 89 +--------- .../src/retrieval/retrieval.service.spec.ts | 49 ++++++ .../src/retrieval/retrieval.service.ts | 67 +++++--- 5 files changed, 248 insertions(+), 116 deletions(-) create mode 100644 apps/backend/src/dataset-liveness/dataset-liveness.service.spec.ts diff --git a/apps/backend/src/dataset-liveness/dataset-liveness.service.spec.ts b/apps/backend/src/dataset-liveness/dataset-liveness.service.spec.ts new file mode 100644 index 00000000..7a246d55 --- /dev/null +++ b/apps/backend/src/dataset-liveness/dataset-liveness.service.spec.ts @@ -0,0 +1,157 @@ +import { Test, TestingModule } from "@nestjs/testing"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; +import { DatasetLivenessService } from "./dataset-liveness.service.js"; + +vi.mock("@filoz/synapse-core/pdp-verifier", () => ({})); + +vi.mock("@filoz/synapse-core/chains", () => ({ + asChain: () => ({ + contracts: { + pdp: { + address: "0xpdp", + abi: [], + }, + }, + }), +})); + +const readContractMock = vi.fn(); +vi.mock("viem/actions", () => ({ + readContract: (...args: unknown[]) => readContractMock(...args), +})); + +describe("DatasetLivenessService", () => { + let service: DatasetLivenessService; + let fetchMock: ReturnType; + + const mockWarmStorageService = { + validateDataSet: vi.fn().mockResolvedValue(undefined), + }; + const mockWalletSdkService = { + getProviderInfo: vi.fn().mockReturnValue({ + id: 101n, + pdp: { serviceURL: "https://sp.example" }, + }), + getWalletServices: vi.fn().mockReturnValue({ + warmStorageService: mockWarmStorageService, + }), + getSynapseClient: vi.fn().mockReturnValue({ chain: { id: 314 } }), + }; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [DatasetLivenessService, { provide: WalletSdkService, useValue: mockWalletSdkService }], + }).compile(); + service = module.get(DatasetLivenessService); + fetchMock = vi.fn().mockResolvedValue(new Response("ok", { status: 200 })); + vi.stubGlobal("fetch", fetchMock); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + vi.restoreAllMocks(); + mockWarmStorageService.validateDataSet.mockReset().mockResolvedValue(undefined); + readContractMock.mockReset(); + }); + + describe("isDataSetLive", () => { + it("returns true when both probes report live", async () => { + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); + }); + + it("returns false when FWSS validateDataSet reports not live", async () => { + mockWarmStorageService.validateDataSet.mockRejectedValueOnce( + new Error("Data set 1 does not exist or is not live"), + ); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); + }); + + it("returns false when SP HTTP probe returns 409 with the terminated body", async () => { + fetchMock.mockResolvedValueOnce( + new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }), + ); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); + }); + + it("treats SP HTTP 409 with a different body as live", async () => { + fetchMock.mockResolvedValueOnce(new Response("piece already exists", { status: 409 })); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); + }); + + it("treats SP HTTP non-409 responses as live", async () => { + fetchMock.mockResolvedValueOnce(new Response("At least one piece must be provided", { status: 400 })); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); + }); + + it("treats SP HTTP network errors as live", async () => { + fetchMock.mockRejectedValueOnce(new Error("ECONNREFUSED")); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); + }); + + it("rethrows FWSS validateDataSet errors that do not match the terminal message", async () => { + mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545")); + await expect(service.isDataSetLive("0xprovider", 1n)).rejects.toThrow("ECONNREFUSED"); + }); + + it("returns false when SP reports terminated even if FWSS RPC throws transiently", async () => { + mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545")); + fetchMock.mockResolvedValueOnce( + new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }), + ); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); + }); + + it("posts an empty JSON body to the SP addPieces endpoint", async () => { + await service.isDataSetLive("0xprovider", 42n); + expect(fetchMock).toHaveBeenCalledTimes(1); + const [calledUrl, init] = fetchMock.mock.calls[0] as unknown as [URL, RequestInit]; + expect(String(calledUrl)).toBe("https://sp.example/pdp/data-sets/42/pieces"); + expect(init.method).toBe("POST"); + expect(init.body).toBe("{}"); + }); + + it("aborts when outer signal is already aborted", async () => { + const ac = new AbortController(); + ac.abort(); + await expect(service.isDataSetLive("0xprovider", 1n, ac.signal)).rejects.toThrow(); + }); + }); + + describe("isPieceLive", () => { + it("returns true when PDPVerifier.pieceLive returns true", async () => { + readContractMock.mockResolvedValueOnce(true); + await expect(service.isPieceLive(1n, 42n)).resolves.toBe(true); + expect(readContractMock).toHaveBeenCalledWith( + expect.objectContaining({ chain: { id: 314 } }), + expect.objectContaining({ + address: "0xpdp", + functionName: "pieceLive", + args: [1n, 42n], + }), + ); + }); + + it("returns false when PDPVerifier.pieceLive returns false", async () => { + readContractMock.mockResolvedValueOnce(false); + await expect(service.isPieceLive(1n, 42n)).resolves.toBe(false); + }); + + it("throws when synapse client is not available", async () => { + mockWalletSdkService.getSynapseClient.mockReturnValueOnce(null); + await expect(service.isPieceLive(1n, 42n)).rejects.toThrow("Synapse client not available for pieceLive read"); + }); + + it("propagates RPC errors", async () => { + readContractMock.mockRejectedValueOnce(new Error("RPC down")); + await expect(service.isPieceLive(1n, 42n)).rejects.toThrow("RPC down"); + }); + + it("aborts when outer signal is already aborted", async () => { + const ac = new AbortController(); + ac.abort(); + readContractMock.mockResolvedValueOnce(true); + await expect(service.isPieceLive(1n, 42n, ac.signal)).rejects.toThrow(); + }); + }); +}); diff --git a/apps/backend/src/deal/deal.module.ts b/apps/backend/src/deal/deal.module.ts index f8213b8e..cd78ac07 100644 --- a/apps/backend/src/deal/deal.module.ts +++ b/apps/backend/src/deal/deal.module.ts @@ -4,8 +4,8 @@ import { DatabaseModule } from "../database/database.module.js"; import { Deal } from "../database/entities/deal.entity.js"; import { Retrieval } from "../database/entities/retrieval.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; -import { DatasetLivenessModule } from "../dataset-liveness/dataset-liveness.module.js"; import { DataSourceModule } from "../dataSource/dataSource.module.js"; +import { DatasetLivenessModule } from "../dataset-liveness/dataset-liveness.module.js"; import { DealAddonsModule } from "../deal-addons/deal-addons.module.js"; import { RetrievalAddonsModule } from "../retrieval-addons/retrieval-addons.module.js"; import { WalletSdkModule } from "../wallet-sdk/wallet-sdk.module.js"; diff --git a/apps/backend/src/deal/deal.service.spec.ts b/apps/backend/src/deal/deal.service.spec.ts index e6ed1457..b57ce941 100644 --- a/apps/backend/src/deal/deal.service.spec.ts +++ b/apps/backend/src/deal/deal.service.spec.ts @@ -11,8 +11,8 @@ import { DealJobTerminatedDataSetError } from "../common/errors.js"; import { Deal } from "../database/entities/deal.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { DealStatus, IpniStatus } from "../database/types.js"; -import { DatasetLivenessService } from "../dataset-liveness/dataset-liveness.service.js"; import { DataSourceService } from "../dataSource/dataSource.service.js"; +import { DatasetLivenessService } from "../dataset-liveness/dataset-liveness.service.js"; import { DealAddonsService } from "../deal-addons/deal-addons.service.js"; import { DealPreprocessingResult } from "../deal-addons/types.js"; import { @@ -1445,93 +1445,6 @@ describe("DealService", () => { }); }); - // Probe-level tests moved to DatasetLivenessService. DealService.isDataSetLive - // is a thin proxy after the refactor. Re-add coverage in dataset-liveness.service.spec.ts. - // TODO(retrieval-active-fail-PR): port these to the new spec. - describe.skip("isDataSetLive", () => { - const providerInfo: PDPProviderEx = { - id: 101n, - serviceProvider: "0xprovider", - payee: "0x100", - name: "Test Provider", - description: "Test Provider", - isActive: true, - isApproved: true, - pdp: { - serviceURL: "https://sp.example", - minPieceSizeInBytes: 0n, - maxPieceSizeInBytes: 100n, - storagePricePerTibPerDay: 1n, - minProvingPeriodInEpochs: 1n, - location: "location", - paymentTokenAddress: "0x100", - ipniPiece: true, - ipniIpfs: true, - }, - }; - - beforeEach(() => { - vi.spyOn(mockWalletSdkService, "getProviderInfo").mockReturnValue(providerInfo); - const synapseMock = { client: {} } as unknown as Synapse; - vi.spyOn(service as any, "createSynapseInstance").mockResolvedValue(synapseMock); - }); - - it("returns true when both probes report live", async () => { - await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); - }); - - it("returns false when FWSS validateDataSet reports not live", async () => { - mockWarmStorageService.validateDataSet.mockRejectedValueOnce( - new Error("Data set 1 does not exist or is not live"), - ); - await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); - }); - - it("returns false when SP HTTP probe returns 409 with the terminated body", async () => { - fetchMock.mockResolvedValueOnce( - new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }), - ); - await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); - }); - - it("treats SP HTTP 409 with a different body as live", async () => { - fetchMock.mockResolvedValueOnce(new Response("piece already exists", { status: 409 })); - await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); - }); - - it("treats SP HTTP non-409 responses as live", async () => { - fetchMock.mockResolvedValueOnce(new Response("At least one piece must be provided", { status: 400 })); - await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); - }); - - it("treats SP HTTP network errors as live", async () => { - fetchMock.mockRejectedValueOnce(new Error("ECONNREFUSED")); - await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); - }); - - it("rethrows FWSS validateDataSet errors that do not match the terminal message", async () => { - mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545")); - await expect(service.isDataSetLive("0xprovider", 1n)).rejects.toThrow("ECONNREFUSED"); - }); - - it("returns false when SP reports terminated even if FWSS RPC throws transiently", async () => { - mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545")); - fetchMock.mockResolvedValueOnce( - new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }), - ); - await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); - }); - - it("posts an empty JSON body to the SP addPieces endpoint", async () => { - await service.isDataSetLive("0xprovider", 42n); - expect(fetchMock).toHaveBeenCalledTimes(1); - const [calledUrl, init] = fetchMock.mock.calls[0] as unknown as [URL, RequestInit]; - expect(String(calledUrl)).toBe("https://sp.example/pdp/data-sets/42/pieces"); - expect(init.method).toBe("POST"); - expect(init.body).toBe("{}"); - }); - }); - describe("createDeal isLive guard", () => { it("throws DealJobTerminatedDataSetError when data set is PDP-terminated; no metrics or save", async () => { const providerInfo: PDPProviderEx = { diff --git a/apps/backend/src/retrieval/retrieval.service.spec.ts b/apps/backend/src/retrieval/retrieval.service.spec.ts index 623e0161..8b28e351 100644 --- a/apps/backend/src/retrieval/retrieval.service.spec.ts +++ b/apps/backend/src/retrieval/retrieval.service.spec.ts @@ -81,6 +81,8 @@ describe("RetrievalService timeouts", () => { spAddress: "0xsp", walletAddress: "0xwallet", pieceCid: "bafy-piece", + dataSetId: "13006", + pieceId: 42, ...overrides, }) as Deal; @@ -379,6 +381,8 @@ describe("RetrievalService parallel IPNI + transport", () => { spAddress: "0xsp", walletAddress: "0xwallet", pieceCid: "bafy-piece", + dataSetId: "13006", + pieceId: 42, metadata: { ipfs_pin: { rootCID: "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi", @@ -741,6 +745,51 @@ describe("RetrievalService SP piece status pre-flight", () => { return module.get(RetrievalService) as unknown as RetrievalServicePrivate; }; + it("emits failure.other and bails when deal has null dataSetId or pieceId (backfill required)", async () => { + const service = await createService(); + mockSpRepository.findOne.mockResolvedValue({ + address: "0xsp", + providerId: 5, + isApproved: true, + name: "Test SP", + serviceUrl: "https://sp.example.com", + }); + const fetchMock = vi.fn(); + vi.stubGlobal("fetch", fetchMock); + + const result = await service.performAllRetrievals(buildDeal({ pieceId: null as unknown as number })); + + expect(result).toEqual([]); + expect(mockDealRepository.update).not.toHaveBeenCalled(); + expect(mockDatasetLivenessService.isPieceLive).not.toHaveBeenCalled(); + expect(fetchMock).not.toHaveBeenCalled(); + expect(mockRetrievalAddonsService.testAllRetrievalMethods).not.toHaveBeenCalled(); + expect(mockRetrievalMetrics.recordStatus).toHaveBeenCalledWith(expect.any(Object), "failure.other"); + }); + + it("falls back to HEAD then GET when SP returns 405 Method Not Allowed", async () => { + const service = await createService(); + mockSpRepository.findOne.mockResolvedValue({ + address: "0xsp", + providerId: 5, + isApproved: true, + name: "Test SP", + serviceUrl: "https://sp.example.com", + }); + mockDatasetLivenessService.isPieceLive.mockResolvedValueOnce(true); + const fetchMock = vi + .fn() + .mockResolvedValueOnce({ status: 405, ok: false } as Response) + .mockResolvedValueOnce({ status: 200, ok: true, body: null } as unknown as Response); + vi.stubGlobal("fetch", fetchMock); + + await service.performAllRetrievals(buildDeal()).catch(() => undefined); + + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(fetchMock.mock.calls[0]?.[1]).toMatchObject({ method: "HEAD" }); + expect(fetchMock.mock.calls[1]?.[1]).toMatchObject({ method: "GET" }); + }); + it("marks deal cleaned_up and skips retrieval when PDP pieceLive=false (no SP probe)", async () => { const service = await createService(); mockSpRepository.findOne.mockResolvedValue({ diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index a01d7c88..1de99f72 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -131,7 +131,9 @@ export class RetrievalService { return []; } - // Pre-check pipeline before any retrieval work: + // Pre-check pipeline before any retrieval work. Assumes `deal.dataSetId` + // and `deal.pieceId` are populated (backfilled for legacy rows; new deals + // populate them in the upload event handler at deal.service.ts). // // 1. Chain `pieceLive(dataSetId, pieceId)`: source of truth for whether // the SP is still expected to retain this piece. If false (dataset @@ -142,39 +144,50 @@ export class RetrievalService { // should be live = real SP-side failure. Recorded as a failed // retrieval row (deal stays in the candidate pool so the scheduler // re-probes; persistent failures become observable on dashboards). - if (deal.dataSetId != null && deal.pieceId != null) { - const pieceLive = await this.checkPieceLive( - BigInt(deal.dataSetId.toString()), - BigInt(deal.pieceId), - signal, - retrievalLogContext, + if (deal.dataSetId == null || deal.pieceId == null) { + // Unexpected post-backfill. Emit a loud signal and bail out so the row + // is fixed before it pollutes downstream metrics. + this.retrievalMetrics.recordStatus(providerLabels, "failure.other"); + this.logger.error({ + ...retrievalLogContext, + event: "retrieval_missing_chain_ids", + message: "Deal is missing dataSetId or pieceId; cannot run chain pre-check. Backfill required.", + dataSetId: deal.dataSetId?.toString() ?? null, + pieceId: deal.pieceId ?? null, + }); + return []; + } + + const pieceLive = await this.checkPieceLive( + BigInt(deal.dataSetId.toString()), + BigInt(deal.pieceId), + signal, + retrievalLogContext, + ); + signal?.throwIfAborted(); + if (!pieceLive) { + const updateResult = await this.dealRepository.update( + { id: deal.id, cleanedUp: false }, + { cleanedUp: true, cleanedUpAt: new Date() }, ); - signal?.throwIfAborted(); - if (!pieceLive) { - const updateResult = await this.dealRepository.update( - { id: deal.id, cleanedUp: false }, - { cleanedUp: true, cleanedUpAt: new Date() }, - ); - this.retrievalMetrics.recordStatus(providerLabels, "skipped.piece_missing"); - this.logger.warn({ - ...retrievalLogContext, - event: "retrieval_skipped_piece_missing", - message: "PDP pieceLive=false; marked deal cleaned_up and skipped retrieval", - dataSetId: deal.dataSetId.toString(), - pieceId: deal.pieceId, - affected: updateResult.affected ?? 0, - }); - return []; - } + this.retrievalMetrics.recordStatus(providerLabels, "skipped.piece_missing"); + this.logger.warn({ + ...retrievalLogContext, + event: "retrieval_skipped_piece_missing", + message: "PDP pieceLive=false; marked deal cleaned_up and skipped retrieval", + dataSetId: deal.dataSetId.toString(), + pieceId: deal.pieceId, + affected: updateResult.affected ?? 0, + }); + return []; } if (provider.serviceUrl && deal.pieceCid) { const probe = await this.probeSpPieceStatus(provider.serviceUrl, deal.pieceCid, signal); signal?.throwIfAborted(); if (probe.result === "missing") { - // Chain pre-check above already confirmed the piece SHOULD be retrievable - // (or we lacked dataSetId/pieceId to check). SP 404 here is an SP-side - // failure to honor its storage commitment. + // Chain pre-check above confirmed the piece SHOULD be retrievable. + // SP 404 here is an SP-side failure to honor its storage commitment. this.retrievalMetrics.recordStatus(providerLabels, "failure.other"); const now = new Date(); const startedAt = new Date(now.getTime() - probe.durationMs); From a297c08fdfb60eb934ea6e43111a108dc735d70a Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Thu, 21 May 2026 16:19:21 -0400 Subject: [PATCH 3/7] docs(retrieval): drop change-oriented comments --- .../src/dataset-liveness/dataset-liveness.service.ts | 12 ++++-------- apps/backend/src/retrieval/retrieval.service.ts | 9 ++++----- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/apps/backend/src/dataset-liveness/dataset-liveness.service.ts b/apps/backend/src/dataset-liveness/dataset-liveness.service.ts index 766c37e9..f6ce4fef 100644 --- a/apps/backend/src/dataset-liveness/dataset-liveness.service.ts +++ b/apps/backend/src/dataset-liveness/dataset-liveness.service.ts @@ -11,10 +11,7 @@ type SynapseViemClient = Client; const PDP_LIVENESS_PROBE_TIMEOUT_MS = 10_000; /** - * Composite PDP-liveness probe extracted from DealService so other services - * (e.g. RetrievalService) can reuse it without depending on DealService. - * - * Two independent probes: + * Composite PDP-liveness probe. Two independent probes: * * - FWSS `validateDataSet` (chain): wraps `PDPVerifier.dataSetLive` via * multicall and additionally verifies the listener is this WarmStorage @@ -24,10 +21,9 @@ const PDP_LIVENESS_PROBE_TIMEOUT_MS = 10_000; * propagation and is the only signal observable when the SP refuses * addPieces but chain still reports the set as live. * - * A positive-terminated signal from either probe wins: if any settled result - * is `false`, returns `false` even when the other probe threw a transient - * error. Otherwise rethrows the first rejection so a probe outage is not - * silently misclassified as live. + * If any settled result is `false`, returns `false` even when the other + * probe threw a transient error. Otherwise rethrows the first rejection so + * a probe outage is not silently misclassified as live. */ @Injectable() export class DatasetLivenessService { diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index 1de99f72..fec4c08c 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -131,9 +131,9 @@ export class RetrievalService { return []; } - // Pre-check pipeline before any retrieval work. Assumes `deal.dataSetId` - // and `deal.pieceId` are populated (backfilled for legacy rows; new deals - // populate them in the upload event handler at deal.service.ts). + // Pre-check pipeline before any retrieval work. Requires `deal.dataSetId` + // and `deal.pieceId` to be populated (DealService writes them in the + // upload event handler). // // 1. Chain `pieceLive(dataSetId, pieceId)`: source of truth for whether // the SP is still expected to retain this piece. If false (dataset @@ -145,8 +145,7 @@ export class RetrievalService { // retrieval row (deal stays in the candidate pool so the scheduler // re-probes; persistent failures become observable on dashboards). if (deal.dataSetId == null || deal.pieceId == null) { - // Unexpected post-backfill. Emit a loud signal and bail out so the row - // is fixed before it pollutes downstream metrics. + // Bail loudly so the row is fixed before it pollutes downstream metrics. this.retrievalMetrics.recordStatus(providerLabels, "failure.other"); this.logger.error({ ...retrievalLogContext, From 520ea746cdeb6b0c95e8c72f8073d563ed19fc06 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Thu, 21 May 2026 16:28:54 -0400 Subject: [PATCH 4/7] refactor(retrieval): drop HEAD fallback, GET only --- .../src/retrieval/retrieval.service.spec.ts | 25 +------------------ .../src/retrieval/retrieval.service.ts | 21 ++++++---------- 2 files changed, 8 insertions(+), 38 deletions(-) diff --git a/apps/backend/src/retrieval/retrieval.service.spec.ts b/apps/backend/src/retrieval/retrieval.service.spec.ts index 8b28e351..12019d16 100644 --- a/apps/backend/src/retrieval/retrieval.service.spec.ts +++ b/apps/backend/src/retrieval/retrieval.service.spec.ts @@ -767,29 +767,6 @@ describe("RetrievalService SP piece status pre-flight", () => { expect(mockRetrievalMetrics.recordStatus).toHaveBeenCalledWith(expect.any(Object), "failure.other"); }); - it("falls back to HEAD then GET when SP returns 405 Method Not Allowed", async () => { - const service = await createService(); - mockSpRepository.findOne.mockResolvedValue({ - address: "0xsp", - providerId: 5, - isApproved: true, - name: "Test SP", - serviceUrl: "https://sp.example.com", - }); - mockDatasetLivenessService.isPieceLive.mockResolvedValueOnce(true); - const fetchMock = vi - .fn() - .mockResolvedValueOnce({ status: 405, ok: false } as Response) - .mockResolvedValueOnce({ status: 200, ok: true, body: null } as unknown as Response); - vi.stubGlobal("fetch", fetchMock); - - await service.performAllRetrievals(buildDeal()).catch(() => undefined); - - expect(fetchMock).toHaveBeenCalledTimes(2); - expect(fetchMock.mock.calls[0]?.[1]).toMatchObject({ method: "HEAD" }); - expect(fetchMock.mock.calls[1]?.[1]).toMatchObject({ method: "GET" }); - }); - it("marks deal cleaned_up and skips retrieval when PDP pieceLive=false (no SP probe)", async () => { const service = await createService(); mockSpRepository.findOne.mockResolvedValue({ @@ -844,7 +821,7 @@ describe("RetrievalService SP piece status pre-flight", () => { ); expect(fetchMock).toHaveBeenCalledWith( "https://sp.example.com/pdp/piece/bafy-piece/status", - expect.objectContaining({ method: "HEAD" }), + expect.objectContaining({ method: "GET" }), ); }); diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index fec4c08c..35c52b4a 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -139,7 +139,7 @@ export class RetrievalService { // the SP is still expected to retain this piece. If false (dataset // terminated, piece never created, or piece hard-removed), mark the // deal cleaned_up + skip. No SP probe needed in this case. - // 2. SP HTTP HEAD `/pdp/piece/:pieceCid/status`: cheap health-check that + // 2. SP HTTP GET `/pdp/piece/:pieceCid/status`: cheap health-check that // the SP can actually serve. 404 here when chain says the piece // should be live = real SP-side failure. Recorded as a failed // retrieval row (deal stays in the candidate pool so the scheduler @@ -479,22 +479,15 @@ export class RetrievalService { const signal = outerSignal ? AbortSignal.any([outerSignal, timeoutSignal]) : timeoutSignal; const start = Date.now(); try { - // HEAD avoids transferring the status JSON body. Status code is all we need. - // If the SP responds 405 (Method Not Allowed), fall back to GET. - let res = await fetch(url, { - method: "HEAD", + // Curio chi router does not register HEAD for /pdp/piece/{cid}/status (returns 405) + // and ignores Range headers. Body is a small JSON status payload (<500B), so just + // GET and drop the body without reading it. + const res = await fetch(url, { + method: "GET", signal, headers: { "User-Agent": "dealbot/probe" }, }); - if (res.status === 405) { - res = await fetch(url, { - method: "GET", - signal, - headers: { "User-Agent": "dealbot/probe" }, - }); - // Drain the body since GET responses include one. Body content is irrelevant. - await res.body?.cancel().catch(() => undefined); - } + await res.body?.cancel().catch(() => undefined); const durationMs = Date.now() - start; if (res.status === 404) return { result: "missing", url, statusCode: 404, durationMs }; if (res.ok) return { result: "exists", url, statusCode: res.status, durationMs }; From 7720c4576b488a1d55750180c4be00cbc90cc4fa Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Thu, 21 May 2026 16:31:08 -0400 Subject: [PATCH 5/7] test(retrieval): trim backfill note from test name --- apps/backend/src/retrieval/retrieval.service.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/backend/src/retrieval/retrieval.service.spec.ts b/apps/backend/src/retrieval/retrieval.service.spec.ts index 12019d16..d2e76d2f 100644 --- a/apps/backend/src/retrieval/retrieval.service.spec.ts +++ b/apps/backend/src/retrieval/retrieval.service.spec.ts @@ -745,7 +745,7 @@ describe("RetrievalService SP piece status pre-flight", () => { return module.get(RetrievalService) as unknown as RetrievalServicePrivate; }; - it("emits failure.other and bails when deal has null dataSetId or pieceId (backfill required)", async () => { + it("emits failure.other and bails when deal has null dataSetId or pieceId", async () => { const service = await createService(); mockSpRepository.findOne.mockResolvedValue({ address: "0xsp", From 64ac8d03ee286c553836f59f4f82a5fe9dfadf6c Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Thu, 21 May 2026 16:39:44 -0400 Subject: [PATCH 6/7] test+docs: cover pieceLive RPC throw, fix metric desc --- .../src/retrieval/retrieval.service.spec.ts | 19 +++++++++++++++++++ docs/checks/events-and-metrics.md | 2 +- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/apps/backend/src/retrieval/retrieval.service.spec.ts b/apps/backend/src/retrieval/retrieval.service.spec.ts index d2e76d2f..abe9980e 100644 --- a/apps/backend/src/retrieval/retrieval.service.spec.ts +++ b/apps/backend/src/retrieval/retrieval.service.spec.ts @@ -842,6 +842,25 @@ describe("RetrievalService SP piece status pre-flight", () => { expect(mockRetrievalAddonsService.testAllRetrievalMethods).toHaveBeenCalled(); }); + it("treats piece as live and proceeds when isPieceLive RPC throws (no cleanup)", async () => { + const service = await createService(); + mockSpRepository.findOne.mockResolvedValue({ + address: "0xsp", + providerId: 5, + isApproved: true, + name: "Test SP", + serviceUrl: "https://sp.example.com", + }); + mockDatasetLivenessService.isPieceLive.mockRejectedValueOnce(new Error("rpc down")); + vi.stubGlobal("fetch", vi.fn().mockResolvedValue({ status: 200, ok: true } as Response)); + + await service.performAllRetrievals(buildDeal()).catch(() => undefined); + + expect(mockDealRepository.update).not.toHaveBeenCalled(); + expect(mockRetrievalMetrics.recordStatus).not.toHaveBeenCalledWith(expect.any(Object), "skipped.piece_missing"); + expect(mockRetrievalAddonsService.testAllRetrievalMethods).toHaveBeenCalled(); + }); + it("proceeds with retrieval when SP probe fails with a network error (unknown)", async () => { const service = await createService(); mockSpRepository.findOne.mockResolvedValue({ diff --git a/docs/checks/events-and-metrics.md b/docs/checks/events-and-metrics.md index 3083e560..33957c5e 100644 --- a/docs/checks/events-and-metrics.md +++ b/docs/checks/events-and-metrics.md @@ -140,7 +140,7 @@ sequenceDiagram | `dataStorageStatus` | Data Storage | When the Data Storage check completes (all four sub-statuses done) | `success`, `failure.timedout`, `failure.other` from [Deal Status Progression](./data-storage.md#deal-status-progression). | [`deal.service.ts`](../../apps/backend/src/deal/deal.service.ts) | | `discoverabilityStatus` | Data Storage, Retrieval | [`ipniVerificationComplete`](#ipniVerificationComplete) | `success`, `failure.timedout`, `failure.other` from [Data Storage Sub-status meanings](./data-storage.md#sub-status-meanings). `skipped` when IPNI verification is bypassed because `rootCID`/`blockCIDs` are absent from deal metadata or `rootCID` cannot be parsed as a valid CID. | | | `ipfsRetrievalHttpResponseCode` | Data Storage, Retrieval | [`ipfsRetrievalLastByteReceived`](#ipfsRetrievalLastByteReceived) | `200`, `500`, `2xxSuccess`, `4xxClientError`, `5xxServerError`, `otherHttpStatusCodes`, `failure` | [`retrieval.service.ts`](../../apps/backend/src/retrieval/retrieval.service.ts) | -| `retrievalStatus` | Data Storage, Retrieval | [`ipfsRetrievalIntegrityChecked`](#ipfsRetrievalIntegrityChecked) | `success`, `failure.timedout`, `failure.other` from [Data Storage Sub-status meanings](./data-storage.md#sub-status-meanings). On the Retrieval path, the pre-flight probe (`/pdp/piece/:pieceCid/status` 404) branches on parent dataset liveness: when the dataset is terminated/deleted on chain, `skipped.piece_missing` is emitted and the deal is marked `cleaned_up=true`; when the dataset is still live, `failure.other` is emitted and a failed retrieval row is recorded (deal stays in the candidate pool for re-probing). | | +| `retrievalStatus` | Data Storage, Retrieval | [`ipfsRetrievalIntegrityChecked`](#ipfsRetrievalIntegrityChecked) | `success`, `failure.timedout`, `failure.other` from [Data Storage Sub-status meanings](./data-storage.md#sub-status-meanings). On the Retrieval path, the pre-flight branches on the on-chain `PDPVerifier.pieceLive(dataSetId, pieceId)` result. When `pieceLive=false` (dataset terminated, piece never created, or piece hard-removed), `skipped.piece_missing` is emitted and the deal is marked `cleaned_up=true`; no SP probe runs. When `pieceLive=true` and the SP returns 404 on `/pdp/piece/:pieceCid/status`, `failure.other` is emitted and a failed retrieval row is recorded (deal stays in the candidate pool for re-probing). | | | `dataSetCreationStatus` | Data-Set Creation | Not tied to an [event above](#event-list) but rather to data-set creation start (`pending`) and completion (`success`/`failure.*`) | `pending`, `success`, `failure.timedout`, `failure.other` | [`deal.service.ts`](../../apps/backend/src/deal/deal.service.ts) | | `dataSetChallengeStatus` | Data Retention | Emitted on each [Data Retention Check](./data-retention.md) poll when a provider's confirmed proving-period totals advance (strictly positive deltas). Unit: **challenges** (period delta × `CHALLENGES_PER_PROVING_PERIOD = 5`). | `success` (challenges in successfully-proven periods), `failure` (challenges in faulted periods) | [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts) | | `pdp_provider_estimated_overdue_periods` | Data Retention | Emitted on every [Data Retention Check](./data-retention.md) poll for every successfully processed provider. | Gauge value in proving periods (non-negative integer) | [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts) | From af8005160a1413d54c9a79205f00b5c78060b6b2 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Fri, 22 May 2026 08:53:55 -0400 Subject: [PATCH 7/7] refactor: hoist SynapseViemClient, drop bigint cast --- .../src/dataset-liveness/dataset-liveness.service.ts | 5 +---- apps/backend/src/pull-check/pull-check.service.ts | 6 ++---- apps/backend/src/retrieval/retrieval.service.ts | 7 +------ apps/backend/src/wallet-sdk/wallet-sdk.service.ts | 7 +++++-- 4 files changed, 9 insertions(+), 16 deletions(-) diff --git a/apps/backend/src/dataset-liveness/dataset-liveness.service.ts b/apps/backend/src/dataset-liveness/dataset-liveness.service.ts index f6ce4fef..56d7a389 100644 --- a/apps/backend/src/dataset-liveness/dataset-liveness.service.ts +++ b/apps/backend/src/dataset-liveness/dataset-liveness.service.ts @@ -1,13 +1,10 @@ import { asChain } from "@filoz/synapse-core/chains"; import { Injectable, Logger } from "@nestjs/common"; -import type { Account, Chain, Client, Transport } from "viem"; import { readContract } from "viem/actions"; import { awaitWithAbort } from "../common/abort-utils.js"; import { toStructuredError } from "../common/logging.js"; import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; -type SynapseViemClient = Client; - const PDP_LIVENESS_PROBE_TIMEOUT_MS = 10_000; /** @@ -65,7 +62,7 @@ export class DatasetLivenessService { */ async isPieceLive(dataSetId: bigint, pieceId: bigint, signal?: AbortSignal): Promise { signal?.throwIfAborted(); - const client = this.walletSdkService.getSynapseClient() as SynapseViemClient | null; + const client = this.walletSdkService.getSynapseClient(); if (!client) { throw new Error("Synapse client not available for pieceLive read"); } diff --git a/apps/backend/src/pull-check/pull-check.service.ts b/apps/backend/src/pull-check/pull-check.service.ts index 39209f1e..ff76a264 100644 --- a/apps/backend/src/pull-check/pull-check.service.ts +++ b/apps/backend/src/pull-check/pull-check.service.ts @@ -3,20 +3,18 @@ import { calculateFromIterable, parse as parsePieceCid } from "@filoz/synapse-co import { pullPieces, waitForPullPieces } from "@filoz/synapse-core/sp"; import { Injectable, Logger } from "@nestjs/common"; import { ConfigService } from "@nestjs/config"; -import type { Account, Address, Chain, Client, Transport } from "viem"; +import type { Address } from "viem"; import { type ProviderJobContext, toStructuredError } from "../common/logging.js"; import type { IAppConfig, IConfig, IPullPieceConfig } from "../config/app.config.js"; import { DataSourceService } from "../dataSource/dataSource.service.js"; import { HttpClientService } from "../http-client/http-client.service.js"; import { buildCheckMetricLabels, classifyFailureStatus } from "../metrics-prometheus/check-metric-labels.js"; import { PullCheckCheckMetrics } from "../metrics-prometheus/check-metrics.service.js"; -import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; +import { type SynapseViemClient, WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; import { PDPProviderEx } from "../wallet-sdk/wallet-sdk.types.js"; import type { PullPiecePrepared, PullPieceStreamResult } from "./pull-check.types.js"; import { PullPieceRepository } from "./pull-piece.repository.js"; -type SynapseViemClient = Client; - @Injectable() export class PullCheckService { private readonly logger = new Logger(PullCheckService.name); diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index 35c52b4a..d69e1bd2 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -157,12 +157,7 @@ export class RetrievalService { return []; } - const pieceLive = await this.checkPieceLive( - BigInt(deal.dataSetId.toString()), - BigInt(deal.pieceId), - signal, - retrievalLogContext, - ); + const pieceLive = await this.checkPieceLive(deal.dataSetId, BigInt(deal.pieceId), signal, retrievalLogContext); signal?.throwIfAborted(); if (!pieceLive) { const updateResult = await this.dealRepository.update( diff --git a/apps/backend/src/wallet-sdk/wallet-sdk.service.ts b/apps/backend/src/wallet-sdk/wallet-sdk.service.ts index 9373150b..2ec9401e 100644 --- a/apps/backend/src/wallet-sdk/wallet-sdk.service.ts +++ b/apps/backend/src/wallet-sdk/wallet-sdk.service.ts @@ -7,6 +7,7 @@ import { Injectable, Logger, type OnModuleInit } from "@nestjs/common"; import { ConfigService } from "@nestjs/config"; import { InjectRepository } from "@nestjs/typeorm"; import type { Repository } from "typeorm"; +import type { Account, Chain, Client, Transport } from "viem"; import { type Hex } from "viem"; import { DEV_TAG } from "../common/constants.js"; import { toStructuredError } from "../common/logging.js"; @@ -15,6 +16,8 @@ import type { IBlockchainConfig, IConfig } from "../config/app.config.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import type { PDPProviderEx, WalletServices } from "./wallet-sdk.types.js"; +export type SynapseViemClient = Client; + @Injectable() export class WalletSdkService implements OnModuleInit { private readonly logger = new Logger(WalletSdkService.name); @@ -316,8 +319,8 @@ export class WalletSdkService implements OnModuleInit { * Returns `null` when chain integration is disabled or the client has not been * initialized yet. */ - getSynapseClient(): unknown { - return this._synapseClient ?? null; + getSynapseClient(): SynapseViemClient | null { + return (this._synapseClient as SynapseViemClient | null) ?? null; } /**