diff --git a/apps/backend/src/dataset-liveness/dataset-liveness.module.ts b/apps/backend/src/dataset-liveness/dataset-liveness.module.ts new file mode 100644 index 00000000..d7389a3c --- /dev/null +++ b/apps/backend/src/dataset-liveness/dataset-liveness.module.ts @@ -0,0 +1,10 @@ +import { Module } from "@nestjs/common"; +import { WalletSdkModule } from "../wallet-sdk/wallet-sdk.module.js"; +import { DatasetLivenessService } from "./dataset-liveness.service.js"; + +@Module({ + imports: [WalletSdkModule], + providers: [DatasetLivenessService], + exports: [DatasetLivenessService], +}) +export class DatasetLivenessModule {} diff --git a/apps/backend/src/dataset-liveness/dataset-liveness.service.spec.ts b/apps/backend/src/dataset-liveness/dataset-liveness.service.spec.ts new file mode 100644 index 00000000..7a246d55 --- /dev/null +++ b/apps/backend/src/dataset-liveness/dataset-liveness.service.spec.ts @@ -0,0 +1,157 @@ +import { Test, TestingModule } from "@nestjs/testing"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; +import { DatasetLivenessService } from "./dataset-liveness.service.js"; + +vi.mock("@filoz/synapse-core/pdp-verifier", () => ({})); + +vi.mock("@filoz/synapse-core/chains", () => ({ + asChain: () => ({ + contracts: { + pdp: { + address: "0xpdp", + abi: [], + }, + }, + }), +})); + +const readContractMock = vi.fn(); +vi.mock("viem/actions", () => ({ + readContract: (...args: unknown[]) => readContractMock(...args), +})); + +describe("DatasetLivenessService", () => { + let service: DatasetLivenessService; + let fetchMock: ReturnType; + + const mockWarmStorageService = { + validateDataSet: vi.fn().mockResolvedValue(undefined), + }; + const mockWalletSdkService = { + getProviderInfo: vi.fn().mockReturnValue({ + id: 101n, + pdp: { serviceURL: "https://sp.example" }, + }), + getWalletServices: vi.fn().mockReturnValue({ + warmStorageService: mockWarmStorageService, + }), + getSynapseClient: vi.fn().mockReturnValue({ chain: { id: 314 } }), + }; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [DatasetLivenessService, { provide: WalletSdkService, useValue: mockWalletSdkService }], + }).compile(); + service = module.get(DatasetLivenessService); + fetchMock = vi.fn().mockResolvedValue(new Response("ok", { status: 200 })); + vi.stubGlobal("fetch", fetchMock); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + vi.restoreAllMocks(); + mockWarmStorageService.validateDataSet.mockReset().mockResolvedValue(undefined); + readContractMock.mockReset(); + }); + + describe("isDataSetLive", () => { + it("returns true when both probes report live", async () => { + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); + }); + + it("returns false when FWSS validateDataSet reports not live", async () => { + mockWarmStorageService.validateDataSet.mockRejectedValueOnce( + new Error("Data set 1 does not exist or is not live"), + ); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); + }); + + it("returns false when SP HTTP probe returns 409 with the terminated body", async () => { + fetchMock.mockResolvedValueOnce( + new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }), + ); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); + }); + + it("treats SP HTTP 409 with a different body as live", async () => { + fetchMock.mockResolvedValueOnce(new Response("piece already exists", { status: 409 })); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); + }); + + it("treats SP HTTP non-409 responses as live", async () => { + fetchMock.mockResolvedValueOnce(new Response("At least one piece must be provided", { status: 400 })); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); + }); + + it("treats SP HTTP network errors as live", async () => { + fetchMock.mockRejectedValueOnce(new Error("ECONNREFUSED")); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); + }); + + it("rethrows FWSS validateDataSet errors that do not match the terminal message", async () => { + mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545")); + await expect(service.isDataSetLive("0xprovider", 1n)).rejects.toThrow("ECONNREFUSED"); + }); + + it("returns false when SP reports terminated even if FWSS RPC throws transiently", async () => { + mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545")); + fetchMock.mockResolvedValueOnce( + new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }), + ); + await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); + }); + + it("posts an empty JSON body to the SP addPieces endpoint", async () => { + await service.isDataSetLive("0xprovider", 42n); + expect(fetchMock).toHaveBeenCalledTimes(1); + const [calledUrl, init] = fetchMock.mock.calls[0] as unknown as [URL, RequestInit]; + expect(String(calledUrl)).toBe("https://sp.example/pdp/data-sets/42/pieces"); + expect(init.method).toBe("POST"); + expect(init.body).toBe("{}"); + }); + + it("aborts when outer signal is already aborted", async () => { + const ac = new AbortController(); + ac.abort(); + await expect(service.isDataSetLive("0xprovider", 1n, ac.signal)).rejects.toThrow(); + }); + }); + + describe("isPieceLive", () => { + it("returns true when PDPVerifier.pieceLive returns true", async () => { + readContractMock.mockResolvedValueOnce(true); + await expect(service.isPieceLive(1n, 42n)).resolves.toBe(true); + expect(readContractMock).toHaveBeenCalledWith( + expect.objectContaining({ chain: { id: 314 } }), + expect.objectContaining({ + address: "0xpdp", + functionName: "pieceLive", + args: [1n, 42n], + }), + ); + }); + + it("returns false when PDPVerifier.pieceLive returns false", async () => { + readContractMock.mockResolvedValueOnce(false); + await expect(service.isPieceLive(1n, 42n)).resolves.toBe(false); + }); + + it("throws when synapse client is not available", async () => { + mockWalletSdkService.getSynapseClient.mockReturnValueOnce(null); + await expect(service.isPieceLive(1n, 42n)).rejects.toThrow("Synapse client not available for pieceLive read"); + }); + + it("propagates RPC errors", async () => { + readContractMock.mockRejectedValueOnce(new Error("RPC down")); + await expect(service.isPieceLive(1n, 42n)).rejects.toThrow("RPC down"); + }); + + it("aborts when outer signal is already aborted", async () => { + const ac = new AbortController(); + ac.abort(); + readContractMock.mockResolvedValueOnce(true); + await expect(service.isPieceLive(1n, 42n, ac.signal)).rejects.toThrow(); + }); + }); +}); diff --git a/apps/backend/src/dataset-liveness/dataset-liveness.service.ts b/apps/backend/src/dataset-liveness/dataset-liveness.service.ts new file mode 100644 index 00000000..56d7a389 --- /dev/null +++ b/apps/backend/src/dataset-liveness/dataset-liveness.service.ts @@ -0,0 +1,139 @@ +import { asChain } from "@filoz/synapse-core/chains"; +import { Injectable, Logger } from "@nestjs/common"; +import { readContract } from "viem/actions"; +import { awaitWithAbort } from "../common/abort-utils.js"; +import { toStructuredError } from "../common/logging.js"; +import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; + +const PDP_LIVENESS_PROBE_TIMEOUT_MS = 10_000; + +/** + * Composite PDP-liveness probe. Two independent probes: + * + * - FWSS `validateDataSet` (chain): wraps `PDPVerifier.dataSetLive` via + * multicall and additionally verifies the listener is this WarmStorage + * contract, so it covers chain-side liveness fully. + * - SP HTTP `POST /pdp/data-sets/{id}/pieces` (off-chain): catches Curio's + * `unrecoverable_proving_failure_epoch` state, which precedes chain + * propagation and is the only signal observable when the SP refuses + * addPieces but chain still reports the set as live. + * + * If any settled result is `false`, returns `false` even when the other + * probe threw a transient error. Otherwise rethrows the first rejection so + * a probe outage is not silently misclassified as live. + */ +@Injectable() +export class DatasetLivenessService { + private readonly logger = new Logger(DatasetLivenessService.name); + + constructor(private readonly walletSdkService: WalletSdkService) {} + + async isDataSetLive(providerAddress: string, dataSetId: bigint, signal?: AbortSignal): Promise { + signal?.throwIfAborted(); + const settled = await Promise.allSettled([ + this.probeFwssDataSetLive(dataSetId, signal), + this.probeSpHttpDataSetLive(providerAddress, dataSetId, signal), + ]); + if (settled.some((r) => r.status === "fulfilled" && r.value === false)) { + return false; + } + const rejection = settled.find((r): r is PromiseRejectedResult => r.status === "rejected"); + if (rejection) throw rejection.reason; + return true; + } + + /** + * On-chain check that a specific piece is still expected to be retrievable. + * + * Wraps `PDPVerifier.pieceLive(setId, pieceId)`, which returns: + * `dataSetLive(setId) && pieceId < nextPieceId[setId] && pieceLeafCounts[setId][pieceId] > 0` + * + * So `false` covers three legitimate reasons the SP can answer 404: + * 1. Dataset terminated. + * 2. Piece ID never created. + * 3. Piece hard-removed (`removePieces` finalized; leaf count zeroed). + * + * Pieces that are scheduled for removal but not yet finalized still return + * `true` — the SP remains on the hook for challenges until the next + * proving period clears the scheduledRemovals queue, so it must still + * serve GET requests. + * + * Source: `FilOzone/pdp` PDPVerifier.sol `pieceLive`. + */ + async isPieceLive(dataSetId: bigint, pieceId: bigint, signal?: AbortSignal): Promise { + signal?.throwIfAborted(); + const client = this.walletSdkService.getSynapseClient(); + if (!client) { + throw new Error("Synapse client not available for pieceLive read"); + } + const chain = asChain(client.chain); + const result = await awaitWithAbort( + readContract(client, { + abi: chain.contracts.pdp.abi, + address: chain.contracts.pdp.address, + functionName: "pieceLive", + args: [dataSetId, pieceId], + }), + signal, + ); + return Boolean(result); + } + + protected async probeFwssDataSetLive(dataSetId: bigint, signal?: AbortSignal): Promise { + signal?.throwIfAborted(); + const { warmStorageService } = this.walletSdkService.getWalletServices(); + try { + await awaitWithAbort(warmStorageService.validateDataSet({ dataSetId }), signal); + return true; + } catch (error) { + if (signal?.aborted) throw error; + const message = error instanceof Error ? error.message : String(error); + if (/does not exist or is not live/i.test(message)) { + return false; + } + throw error; + } + } + + protected async probeSpHttpDataSetLive( + providerAddress: string, + dataSetId: bigint, + signal?: AbortSignal, + ): Promise { + signal?.throwIfAborted(); + const providerInfo = this.walletSdkService.getProviderInfo(providerAddress); + if (!providerInfo) { + throw new Error(`Provider ${providerAddress} not found in registry`); + } + const serviceURL = providerInfo.pdp?.serviceURL; + if (!serviceURL) { + throw new Error(`Provider ${providerAddress} has no PDP serviceURL`); + } + const url = new URL(`pdp/data-sets/${dataSetId.toString()}/pieces`, serviceURL); + const timeoutSignal = AbortSignal.timeout(PDP_LIVENESS_PROBE_TIMEOUT_MS); + const probeSignal = signal ? AbortSignal.any([signal, timeoutSignal]) : timeoutSignal; + try { + const res = await fetch(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: "{}", + signal: probeSignal, + }); + if (res.status !== 409) return true; + const body = await res.text(); + return !/unrecoverable proving failure/i.test(body); + } catch (error) { + if (signal?.aborted) throw error; + this.logger.warn({ + event: "dataset_sp_liveness_probe_failed", + message: "SP HTTP liveness probe failed; treating dataset as live", + providerAddress, + providerId: providerInfo.id, + dataSetId: dataSetId.toString(), + serviceURL, + error: toStructuredError(error), + }); + return true; + } + } +} diff --git a/apps/backend/src/deal/deal.module.ts b/apps/backend/src/deal/deal.module.ts index 93217cc5..cd78ac07 100644 --- a/apps/backend/src/deal/deal.module.ts +++ b/apps/backend/src/deal/deal.module.ts @@ -5,6 +5,7 @@ import { Deal } from "../database/entities/deal.entity.js"; import { Retrieval } from "../database/entities/retrieval.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { DataSourceModule } from "../dataSource/dataSource.module.js"; +import { DatasetLivenessModule } from "../dataset-liveness/dataset-liveness.module.js"; import { DealAddonsModule } from "../deal-addons/deal-addons.module.js"; import { RetrievalAddonsModule } from "../retrieval-addons/retrieval-addons.module.js"; import { WalletSdkModule } from "../wallet-sdk/wallet-sdk.module.js"; @@ -18,6 +19,7 @@ import { DealService } from "./deal.service.js"; WalletSdkModule, DealAddonsModule, RetrievalAddonsModule, + DatasetLivenessModule, ], providers: [DealService], exports: [DealService], diff --git a/apps/backend/src/deal/deal.service.spec.ts b/apps/backend/src/deal/deal.service.spec.ts index 960eee09..b57ce941 100644 --- a/apps/backend/src/deal/deal.service.spec.ts +++ b/apps/backend/src/deal/deal.service.spec.ts @@ -12,6 +12,7 @@ import { Deal } from "../database/entities/deal.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { DealStatus, IpniStatus } from "../database/types.js"; import { DataSourceService } from "../dataSource/dataSource.service.js"; +import { DatasetLivenessService } from "../dataset-liveness/dataset-liveness.service.js"; import { DealAddonsService } from "../deal-addons/deal-addons.service.js"; import { DealPreprocessingResult } from "../deal-addons/types.js"; import { @@ -122,6 +123,10 @@ describe("DealService", () => { getDataSet: vi.fn().mockResolvedValue({ pdpEndEpoch: 0n }), terminateDataSet: vi.fn().mockResolvedValue("0xhash"), }; + // Default: dataset is live. Tests that exercise the terminated path override per-call. + const mockDatasetLivenessService = { + isDataSetLive: vi.fn().mockResolvedValue(true), + }; const mockWalletSdkService = { getFWSSAddress: vi.fn().mockReturnValue("0xFWSS"), getTestingProvidersCount: vi.fn(), @@ -180,6 +185,7 @@ describe("DealService", () => { { provide: RetrievalCheckMetrics, useValue: mockRetrievalMetrics }, { provide: DataSetCreationCheckMetrics, useValue: mockDataSetCreationMetrics }, { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, + { provide: DatasetLivenessService, useValue: mockDatasetLivenessService }, ], }).compile(); @@ -1063,6 +1069,7 @@ describe("DealService", () => { { provide: RetrievalCheckMetrics, useValue: mockRetrievalMetrics }, { provide: DataSetCreationCheckMetrics, useValue: mockDataSetCreationMetrics }, { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, + { provide: DatasetLivenessService, useValue: mockDatasetLivenessService }, ], }).compile(); @@ -1244,16 +1251,14 @@ describe("DealService", () => { expect(result).toEqual({ status: "live", dataSetId: 7n }); }); - it("returns terminated when validateDataSet throws", async () => { + it("returns terminated when isDataSetLive returns false", async () => { const synapseMock = { storage: { createContext: vi.fn().mockResolvedValue({ dataSetId: 9n }), }, }; vi.spyOn(service as any, "createSynapseInstance").mockImplementation(() => synapseMock as unknown as Synapse); - mockWarmStorageService.validateDataSet.mockRejectedValueOnce( - new Error("Data set 9 does not exist or is not live"), - ); + mockDatasetLivenessService.isDataSetLive.mockResolvedValueOnce(false); const result = await service.getDataSetProvisioningStatus("0xprovider", { dealbotDS: "1" }); expect(result).toEqual({ status: "terminated", dataSetId: 9n }); @@ -1440,90 +1445,6 @@ describe("DealService", () => { }); }); - describe("isDataSetLive", () => { - const providerInfo: PDPProviderEx = { - id: 101n, - serviceProvider: "0xprovider", - payee: "0x100", - name: "Test Provider", - description: "Test Provider", - isActive: true, - isApproved: true, - pdp: { - serviceURL: "https://sp.example", - minPieceSizeInBytes: 0n, - maxPieceSizeInBytes: 100n, - storagePricePerTibPerDay: 1n, - minProvingPeriodInEpochs: 1n, - location: "location", - paymentTokenAddress: "0x100", - ipniPiece: true, - ipniIpfs: true, - }, - }; - - beforeEach(() => { - vi.spyOn(mockWalletSdkService, "getProviderInfo").mockReturnValue(providerInfo); - const synapseMock = { client: {} } as unknown as Synapse; - vi.spyOn(service as any, "createSynapseInstance").mockResolvedValue(synapseMock); - }); - - it("returns true when both probes report live", async () => { - await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); - }); - - it("returns false when FWSS validateDataSet reports not live", async () => { - mockWarmStorageService.validateDataSet.mockRejectedValueOnce( - new Error("Data set 1 does not exist or is not live"), - ); - await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); - }); - - it("returns false when SP HTTP probe returns 409 with the terminated body", async () => { - fetchMock.mockResolvedValueOnce( - new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }), - ); - await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); - }); - - it("treats SP HTTP 409 with a different body as live", async () => { - fetchMock.mockResolvedValueOnce(new Response("piece already exists", { status: 409 })); - await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); - }); - - it("treats SP HTTP non-409 responses as live", async () => { - fetchMock.mockResolvedValueOnce(new Response("At least one piece must be provided", { status: 400 })); - await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); - }); - - it("treats SP HTTP network errors as live", async () => { - fetchMock.mockRejectedValueOnce(new Error("ECONNREFUSED")); - await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true); - }); - - it("rethrows FWSS validateDataSet errors that do not match the terminal message", async () => { - mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545")); - await expect(service.isDataSetLive("0xprovider", 1n)).rejects.toThrow("ECONNREFUSED"); - }); - - it("returns false when SP reports terminated even if FWSS RPC throws transiently", async () => { - mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545")); - fetchMock.mockResolvedValueOnce( - new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }), - ); - await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false); - }); - - it("posts an empty JSON body to the SP addPieces endpoint", async () => { - await service.isDataSetLive("0xprovider", 42n); - expect(fetchMock).toHaveBeenCalledTimes(1); - const [calledUrl, init] = fetchMock.mock.calls[0] as unknown as [URL, RequestInit]; - expect(String(calledUrl)).toBe("https://sp.example/pdp/data-sets/42/pieces"); - expect(init.method).toBe("POST"); - expect(init.body).toBe("{}"); - }); - }); - describe("createDeal isLive guard", () => { it("throws DealJobTerminatedDataSetError when data set is PDP-terminated; no metrics or save", async () => { const providerInfo: PDPProviderEx = { @@ -1564,9 +1485,7 @@ describe("DealService", () => { mockStorageProviderRepository.findOne.mockResolvedValue({ providerId: 1, isApproved: true }); vi.spyOn(mockWalletSdkService, "getProviderInfo").mockReturnValue(providerInfo); vi.spyOn(service as any, "createSynapseInstance").mockResolvedValue(synapseMock); - mockWarmStorageService.validateDataSet.mockRejectedValueOnce( - new Error("Data set 9 does not exist or is not live"), - ); + mockDatasetLivenessService.isDataSetLive.mockResolvedValueOnce(false); await expect(service.createDeal(synapseMock, providerInfo, dealInput, uploadPayload)).rejects.toBeInstanceOf( DealJobTerminatedDataSetError, diff --git a/apps/backend/src/deal/deal.service.ts b/apps/backend/src/deal/deal.service.ts index 2edf4026..916c0678 100644 --- a/apps/backend/src/deal/deal.service.ts +++ b/apps/backend/src/deal/deal.service.ts @@ -25,6 +25,7 @@ import { Deal } from "../database/entities/deal.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { DealStatus, IpniStatus, ServiceType } from "../database/types.js"; import { DataSourceService } from "../dataSource/dataSource.service.js"; +import { DatasetLivenessService } from "../dataset-liveness/dataset-liveness.service.js"; import { DealAddonsService } from "../deal-addons/deal-addons.service.js"; import type { DealPreprocessingResult } from "../deal-addons/types.js"; import { buildCheckMetricLabels, classifyFailureStatus } from "../metrics-prometheus/check-metric-labels.js"; @@ -43,8 +44,6 @@ type UploadPayload = { rootCid: CID; }; -const PDP_LIVENESS_PROBE_TIMEOUT_MS = 10_000; - type UploadResultSummary = { pieceCid: string; pieceId?: number; @@ -71,6 +70,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy { private readonly retrievalMetrics: RetrievalCheckMetrics, private readonly dataSetCreationMetrics: DataSetCreationCheckMetrics, private readonly clickhouseService: ClickhouseService, + private readonly datasetLivenessService: DatasetLivenessService, ) { this.blockchainConfig = this.configService.get("blockchain"); } @@ -714,107 +714,12 @@ export class DealService implements OnModuleInit, OnModuleDestroy { } /** - * Composite PDP-liveness check. Runs two independent probes: - * - * - FWSS `validateDataSet` (chain): wraps `PDPVerifier.dataSetLive` via - * multicall and additionally verifies the listener is this WarmStorage - * contract, so it covers chain-side liveness fully. - * - SP HTTP `POST /pdp/data-sets/{id}/pieces` (off-chain): catches Curio's - * `unrecoverable_proving_failure_epoch` state, which precedes chain - * propagation and is the only signal observable when the SP refuses - * addPieces but chain still reports the set as live. - * - * A positive-terminated signal from either probe wins: if any settled result - * is `false`, returns `false` even when the other probe threw a transient - * error. Otherwise rethrows the first rejection so a probe outage is not - * silently misclassified as live. The SP HTTP probe never throws on - * transient errors (returns `true` on non-409 responses, including network - * errors and auth failures), since HTTP 409 with Curio's terminated body is - * the only signal that endpoint emits. + * Thin proxy to `DatasetLivenessService.isDataSetLive`. See that class for + * probe rationale. Kept on DealService to preserve existing call sites + * (`getDataSetProvisioningStatus`, `createDeal` post-context guard). */ async isDataSetLive(providerAddress: string, dataSetId: bigint, signal?: AbortSignal): Promise { - signal?.throwIfAborted(); - const settled = await Promise.allSettled([ - this.probeFwssDataSetLive(dataSetId, signal), - this.probeSpHttpDataSetLive(providerAddress, dataSetId, signal), - ]); - if (settled.some((r) => r.status === "fulfilled" && r.value === false)) { - return false; - } - const rejection = settled.find((r): r is PromiseRejectedResult => r.status === "rejected"); - if (rejection) throw rejection.reason; - return true; - } - - protected async probeFwssDataSetLive(dataSetId: bigint, signal?: AbortSignal): Promise { - signal?.throwIfAborted(); - const { warmStorageService } = this.walletSdkService.getWalletServices(); - try { - await awaitWithAbort(warmStorageService.validateDataSet({ dataSetId }), signal); - return true; - } catch (error) { - if (signal?.aborted) throw error; - const message = error instanceof Error ? error.message : String(error); - if (/does not exist or is not live/i.test(message)) { - return false; - } - throw error; - } - } - - /** - * Probes the SP's Curio addPieces endpoint with an empty body. Curio's - * handler checks `unrecoverable_proving_failure_epoch` before body - * validation and returns HTTP 409 with "Data set has been terminated due to - * unrecoverable proving failure" when the dataset is marked terminated. - * - * Returns `false` only on `409` paired with that body text. The body - * substring is a guard against blast radius: if a future Curio reuses `409` - * for a non-terminal conflict, the probe stays conservative rather than - * triggering destructive repair. Any other response (including a `409` with - * a different body, `400` for empty pieces, `404`, `5xx`, and network - * errors) is treated as live. - */ - protected async probeSpHttpDataSetLive( - providerAddress: string, - dataSetId: bigint, - signal?: AbortSignal, - ): Promise { - signal?.throwIfAborted(); - const providerInfo = this.walletSdkService.getProviderInfo(providerAddress); - if (!providerInfo) { - throw new Error(`Provider ${providerAddress} not found in registry`); - } - const serviceURL = providerInfo.pdp?.serviceURL; - if (!serviceURL) { - throw new Error(`Provider ${providerAddress} has no PDP serviceURL`); - } - const url = new URL(`pdp/data-sets/${dataSetId.toString()}/pieces`, serviceURL); - const timeoutSignal = AbortSignal.timeout(PDP_LIVENESS_PROBE_TIMEOUT_MS); - const probeSignal = signal ? AbortSignal.any([signal, timeoutSignal]) : timeoutSignal; - try { - const res = await fetch(url, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: "{}", - signal: probeSignal, - }); - if (res.status !== 409) return true; - const body = await res.text(); - return !/unrecoverable proving failure/i.test(body); - } catch (error) { - if (signal?.aborted) throw error; - this.logger.warn({ - event: "dataset_sp_liveness_probe_failed", - message: "SP HTTP liveness probe failed; treating dataset as live", - providerAddress, - providerId: providerInfo.id, - dataSetId: dataSetId.toString(), - serviceURL, - error: toStructuredError(error), - }); - return true; - } + return this.datasetLivenessService.isDataSetLive(providerAddress, dataSetId, signal); } /** diff --git a/apps/backend/src/retrieval/retrieval.module.ts b/apps/backend/src/retrieval/retrieval.module.ts index 9b631a7e..8641cada 100644 --- a/apps/backend/src/retrieval/retrieval.module.ts +++ b/apps/backend/src/retrieval/retrieval.module.ts @@ -4,6 +4,7 @@ import { DatabaseModule } from "../database/database.module.js"; import { Deal } from "../database/entities/deal.entity.js"; import { Retrieval } from "../database/entities/retrieval.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; +import { DatasetLivenessModule } from "../dataset-liveness/dataset-liveness.module.js"; import { HttpClientModule } from "../http-client/http-client.module.js"; import { IpniModule } from "../ipni/ipni.module.js"; import { RetrievalAddonsModule } from "../retrieval-addons/retrieval-addons.module.js"; @@ -18,6 +19,7 @@ import { RetrievalService } from "./retrieval.service.js"; WalletSdkModule, IpniModule, RetrievalAddonsModule, + DatasetLivenessModule, ], providers: [RetrievalService], exports: [RetrievalService], diff --git a/apps/backend/src/retrieval/retrieval.service.spec.ts b/apps/backend/src/retrieval/retrieval.service.spec.ts index a5f286cf..abe9980e 100644 --- a/apps/backend/src/retrieval/retrieval.service.spec.ts +++ b/apps/backend/src/retrieval/retrieval.service.spec.ts @@ -7,6 +7,7 @@ import { Deal } from "../database/entities/deal.entity.js"; import { Retrieval } from "../database/entities/retrieval.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { RetrievalStatus } from "../database/types.js"; +import { DatasetLivenessService } from "../dataset-liveness/dataset-liveness.service.js"; import { IpniVerificationService } from "../ipni/ipni-verification.service.js"; import { DiscoverabilityCheckMetrics, RetrievalCheckMetrics } from "../metrics-prometheus/check-metrics.service.js"; import { RetrievalAddonsService } from "../retrieval-addons/retrieval-addons.service.js"; @@ -80,6 +81,8 @@ describe("RetrievalService timeouts", () => { spAddress: "0xsp", walletAddress: "0xwallet", pieceCid: "bafy-piece", + dataSetId: "13006", + pieceId: 42, ...overrides, }) as Deal; @@ -96,6 +99,13 @@ describe("RetrievalService timeouts", () => { { provide: IpniVerificationService, useValue: mockIpniVerificationService }, { provide: ConfigService, useValue: mockConfigService }, { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, + { + provide: DatasetLivenessService, + useValue: { + isDataSetLive: vi.fn().mockResolvedValue(true), + isPieceLive: vi.fn().mockResolvedValue(true), + }, + }, ], }).compile(); @@ -371,6 +381,8 @@ describe("RetrievalService parallel IPNI + transport", () => { spAddress: "0xsp", walletAddress: "0xwallet", pieceCid: "bafy-piece", + dataSetId: "13006", + pieceId: 42, metadata: { ipfs_pin: { rootCID: "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi", @@ -428,6 +440,13 @@ describe("RetrievalService parallel IPNI + transport", () => { { provide: IpniVerificationService, useValue: mockIpniVerificationService }, { provide: ConfigService, useValue: mockConfigService }, { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, + { + provide: DatasetLivenessService, + useValue: { + isDataSetLive: vi.fn().mockResolvedValue(true), + isPieceLive: vi.fn().mockResolvedValue(true), + }, + }, ], }).compile(); return module.get(RetrievalService) as unknown as RetrievalServicePrivate; @@ -612,6 +631,13 @@ describe("RetrievalService DB/provider drift", () => { { provide: IpniVerificationService, useValue: {} }, { provide: ConfigService, useValue: mockConfigService }, { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, + { + provide: DatasetLivenessService, + useValue: { + isDataSetLive: vi.fn().mockResolvedValue(true), + isPieceLive: vi.fn().mockResolvedValue(true), + }, + }, ], }).compile(); return module.get(RetrievalService); @@ -668,6 +694,14 @@ describe("RetrievalService SP piece status pre-flight", () => { recordHttpResponseCode: vi.fn(), recordResultMetrics: vi.fn(), }; + const mockRetrievalRepositoryLocal = { + create: vi.fn((row: Partial) => row as Retrieval), + save: vi.fn(async (row: Retrieval) => row), + }; + const mockDatasetLivenessService = { + isDataSetLive: vi.fn().mockResolvedValue(true), + isPieceLive: vi.fn().mockResolvedValue(true), + }; afterEach(() => { vi.unstubAllGlobals(); @@ -675,6 +709,10 @@ describe("RetrievalService SP piece status pre-flight", () => { mockDealRepository.update.mockClear(); mockRetrievalAddonsService.testAllRetrievalMethods.mockClear(); mockRetrievalMetrics.recordStatus.mockClear(); + mockRetrievalRepositoryLocal.create.mockClear(); + mockRetrievalRepositoryLocal.save.mockClear(); + mockDatasetLivenessService.isDataSetLive.mockReset().mockResolvedValue(true); + mockDatasetLivenessService.isPieceLive.mockReset().mockResolvedValue(true); }); const buildDeal = (overrides: Partial = {}): Deal => @@ -683,6 +721,8 @@ describe("RetrievalService SP piece status pre-flight", () => { spAddress: "0xsp", walletAddress: "0xwallet", pieceCid: "bafy-piece", + dataSetId: "13006", + pieceId: 42, ...overrides, }) as Deal; @@ -692,19 +732,20 @@ describe("RetrievalService SP piece status pre-flight", () => { RetrievalService, { provide: RetrievalAddonsService, useValue: mockRetrievalAddonsService }, { provide: getRepositoryToken(Deal), useValue: mockDealRepository }, - { provide: getRepositoryToken(Retrieval), useValue: { create: vi.fn(), save: vi.fn() } }, + { provide: getRepositoryToken(Retrieval), useValue: mockRetrievalRepositoryLocal }, { provide: getRepositoryToken(StorageProvider), useValue: mockSpRepository }, { provide: RetrievalCheckMetrics, useValue: mockRetrievalMetrics }, { provide: DiscoverabilityCheckMetrics, useValue: { recordStatus: vi.fn(), observeIpniVerifyMs: vi.fn() } }, { provide: IpniVerificationService, useValue: { verify: vi.fn() } }, { provide: ConfigService, useValue: mockConfigService }, { provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } }, + { provide: DatasetLivenessService, useValue: mockDatasetLivenessService }, ], }).compile(); return module.get(RetrievalService) as unknown as RetrievalServicePrivate; }; - it("marks deal cleaned_up and skips retrieval when SP returns 404 for piece status", async () => { + it("emits failure.other and bails when deal has null dataSetId or pieceId", async () => { const service = await createService(); mockSpRepository.findOne.mockResolvedValue({ address: "0xsp", @@ -713,7 +754,30 @@ describe("RetrievalService SP piece status pre-flight", () => { name: "Test SP", serviceUrl: "https://sp.example.com", }); - const fetchMock = vi.fn().mockResolvedValue({ status: 404, ok: false } as Response); + const fetchMock = vi.fn(); + vi.stubGlobal("fetch", fetchMock); + + const result = await service.performAllRetrievals(buildDeal({ pieceId: null as unknown as number })); + + expect(result).toEqual([]); + expect(mockDealRepository.update).not.toHaveBeenCalled(); + expect(mockDatasetLivenessService.isPieceLive).not.toHaveBeenCalled(); + expect(fetchMock).not.toHaveBeenCalled(); + expect(mockRetrievalAddonsService.testAllRetrievalMethods).not.toHaveBeenCalled(); + expect(mockRetrievalMetrics.recordStatus).toHaveBeenCalledWith(expect.any(Object), "failure.other"); + }); + + it("marks deal cleaned_up and skips retrieval when PDP pieceLive=false (no SP probe)", async () => { + const service = await createService(); + mockSpRepository.findOne.mockResolvedValue({ + address: "0xsp", + providerId: 5, + isApproved: true, + name: "Test SP", + serviceUrl: "https://sp.example.com", + }); + mockDatasetLivenessService.isPieceLive.mockResolvedValueOnce(false); + const fetchMock = vi.fn(); vi.stubGlobal("fetch", fetchMock); const result = await service.performAllRetrievals(buildDeal()); @@ -725,6 +789,36 @@ describe("RetrievalService SP piece status pre-flight", () => { ); expect(mockRetrievalAddonsService.testAllRetrievalMethods).not.toHaveBeenCalled(); expect(mockRetrievalMetrics.recordStatus).toHaveBeenCalledWith(expect.any(Object), "skipped.piece_missing"); + // No SP probe when chain says piece is gone + expect(fetchMock).not.toHaveBeenCalled(); + }); + + it("records a failed retrieval (failure.other) when SP returns 404 but pieceLive=true", async () => { + const service = await createService(); + mockSpRepository.findOne.mockResolvedValue({ + address: "0xsp", + providerId: 5, + isApproved: true, + name: "Test SP", + serviceUrl: "https://sp.example.com", + }); + mockDatasetLivenessService.isPieceLive.mockResolvedValueOnce(true); + const fetchMock = vi.fn().mockResolvedValue({ status: 404, ok: false } as Response); + vi.stubGlobal("fetch", fetchMock); + + const result = await service.performAllRetrievals(buildDeal()); + + expect(result).toHaveLength(1); + expect(mockDealRepository.update).not.toHaveBeenCalled(); + expect(mockRetrievalAddonsService.testAllRetrievalMethods).not.toHaveBeenCalled(); + expect(mockRetrievalMetrics.recordStatus).toHaveBeenCalledWith(expect.any(Object), "failure.other"); + expect(mockRetrievalRepositoryLocal.save).toHaveBeenCalledWith( + expect.objectContaining({ + status: RetrievalStatus.FAILED, + responseCode: 404, + errorMessage: expect.stringContaining("pieceLive=true"), + }), + ); expect(fetchMock).toHaveBeenCalledWith( "https://sp.example.com/pdp/piece/bafy-piece/status", expect.objectContaining({ method: "GET" }), @@ -748,6 +842,25 @@ describe("RetrievalService SP piece status pre-flight", () => { expect(mockRetrievalAddonsService.testAllRetrievalMethods).toHaveBeenCalled(); }); + it("treats piece as live and proceeds when isPieceLive RPC throws (no cleanup)", async () => { + const service = await createService(); + mockSpRepository.findOne.mockResolvedValue({ + address: "0xsp", + providerId: 5, + isApproved: true, + name: "Test SP", + serviceUrl: "https://sp.example.com", + }); + mockDatasetLivenessService.isPieceLive.mockRejectedValueOnce(new Error("rpc down")); + vi.stubGlobal("fetch", vi.fn().mockResolvedValue({ status: 200, ok: true } as Response)); + + await service.performAllRetrievals(buildDeal()).catch(() => undefined); + + expect(mockDealRepository.update).not.toHaveBeenCalled(); + expect(mockRetrievalMetrics.recordStatus).not.toHaveBeenCalledWith(expect.any(Object), "skipped.piece_missing"); + expect(mockRetrievalAddonsService.testAllRetrievalMethods).toHaveBeenCalled(); + }); + it("proceeds with retrieval when SP probe fails with a network error (unknown)", async () => { const service = await createService(); mockSpRepository.findOne.mockResolvedValue({ diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index c148ccec..d69e1bd2 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -11,6 +11,7 @@ import { Deal } from "../database/entities/deal.entity.js"; import { Retrieval } from "../database/entities/retrieval.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { DealStatus, RetrievalStatus, ServiceType } from "../database/types.js"; +import { DatasetLivenessService } from "../dataset-liveness/dataset-liveness.service.js"; import { IpniVerificationService } from "../ipni/ipni-verification.service.js"; import { buildCheckMetricLabels, @@ -51,6 +52,7 @@ export class RetrievalService { private readonly ipniVerificationService: IpniVerificationService, private readonly configService: ConfigService, private readonly clickhouseService: ClickhouseService, + private readonly datasetLivenessService: DatasetLivenessService, ) {} async performRandomRetrievalForProvider( @@ -129,28 +131,82 @@ export class RetrievalService { return []; } - // Pre-flight: if the SP itself reports it no longer has this piece, mark the deal - // cleaned_up so it stops polluting future retrieval candidates and skip the check. - // Saves the 30s IPNI verify timeout + downstream block-fetch 404 noise per stale deal. + // Pre-check pipeline before any retrieval work. Requires `deal.dataSetId` + // and `deal.pieceId` to be populated (DealService writes them in the + // upload event handler). + // + // 1. Chain `pieceLive(dataSetId, pieceId)`: source of truth for whether + // the SP is still expected to retain this piece. If false (dataset + // terminated, piece never created, or piece hard-removed), mark the + // deal cleaned_up + skip. No SP probe needed in this case. + // 2. SP HTTP GET `/pdp/piece/:pieceCid/status`: cheap health-check that + // the SP can actually serve. 404 here when chain says the piece + // should be live = real SP-side failure. Recorded as a failed + // retrieval row (deal stays in the candidate pool so the scheduler + // re-probes; persistent failures become observable on dashboards). + if (deal.dataSetId == null || deal.pieceId == null) { + // Bail loudly so the row is fixed before it pollutes downstream metrics. + this.retrievalMetrics.recordStatus(providerLabels, "failure.other"); + this.logger.error({ + ...retrievalLogContext, + event: "retrieval_missing_chain_ids", + message: "Deal is missing dataSetId or pieceId; cannot run chain pre-check. Backfill required.", + dataSetId: deal.dataSetId?.toString() ?? null, + pieceId: deal.pieceId ?? null, + }); + return []; + } + + const pieceLive = await this.checkPieceLive(deal.dataSetId, BigInt(deal.pieceId), signal, retrievalLogContext); + signal?.throwIfAborted(); + if (!pieceLive) { + const updateResult = await this.dealRepository.update( + { id: deal.id, cleanedUp: false }, + { cleanedUp: true, cleanedUpAt: new Date() }, + ); + this.retrievalMetrics.recordStatus(providerLabels, "skipped.piece_missing"); + this.logger.warn({ + ...retrievalLogContext, + event: "retrieval_skipped_piece_missing", + message: "PDP pieceLive=false; marked deal cleaned_up and skipped retrieval", + dataSetId: deal.dataSetId.toString(), + pieceId: deal.pieceId, + affected: updateResult.affected ?? 0, + }); + return []; + } + if (provider.serviceUrl && deal.pieceCid) { const probe = await this.probeSpPieceStatus(provider.serviceUrl, deal.pieceCid, signal); signal?.throwIfAborted(); if (probe.result === "missing") { - const updateResult = await this.dealRepository.update( - { id: deal.id, cleanedUp: false }, - { cleanedUp: true, cleanedUpAt: new Date() }, - ); - this.retrievalMetrics.recordStatus(providerLabels, "skipped.piece_missing"); + // Chain pre-check above confirmed the piece SHOULD be retrievable. + // SP 404 here is an SP-side failure to honor its storage commitment. + this.retrievalMetrics.recordStatus(providerLabels, "failure.other"); + const now = new Date(); + const startedAt = new Date(now.getTime() - probe.durationMs); + const failed = this.retrievalRepository.create({ + deal, + serviceType: ServiceType.IPFS_PIN, + retrievalEndpoint: probe.url, + status: RetrievalStatus.FAILED, + startedAt, + completedAt: now, + latencyMs: probe.durationMs, + responseCode: probe.statusCode ?? null, + errorMessage: "SP reports piece missing but PDP pieceLive=true", + retryCount: 0, + } as Partial); + const saved = await this.retrievalRepository.save(failed); this.logger.warn({ ...retrievalLogContext, - event: "retrieval_skipped_piece_missing", - message: "SP reports piece missing; marked deal cleaned_up and skipped retrieval", + event: "retrieval_failed_piece_missing_live", + message: "SP reports piece missing while chain reports pieceLive=true; recorded failed retrieval", statusUrl: probe.url, statusCode: probe.statusCode, probeDurationMs: probe.durationMs, - affected: updateResult.affected ?? 0, }); - return []; + return [saved]; } } @@ -418,13 +474,14 @@ export class RetrievalService { const signal = outerSignal ? AbortSignal.any([outerSignal, timeoutSignal]) : timeoutSignal; const start = Date.now(); try { + // Curio chi router does not register HEAD for /pdp/piece/{cid}/status (returns 405) + // and ignores Range headers. Body is a small JSON status payload (<500B), so just + // GET and drop the body without reading it. const res = await fetch(url, { method: "GET", signal, headers: { "User-Agent": "dealbot/probe" }, }); - // Drain/cancel the body so undici returns the socket to the pool instead of - // keeping it pinned to an unread response. Body content is irrelevant here. await res.body?.cancel().catch(() => undefined); const durationMs = Date.now() - start; if (res.status === 404) return { result: "missing", url, statusCode: 404, durationMs }; @@ -584,4 +641,32 @@ export class RetrievalService { return { ok: false, failureStatus }; } } + + /** + * Defensive wrapper around `DatasetLivenessService.isPieceLive` used by the + * retrieval pre-check. On RPC failure, return `true` (treat as live) so a + * transient chain outage does NOT cascade into bulk cleanups. The downstream + * SP probe + retrieval fetch will surface the real outcome instead. + */ + private async checkPieceLive( + dataSetId: bigint, + pieceId: bigint, + signal: AbortSignal | undefined, + logContext: RetrievalLogContext, + ): Promise { + try { + return await this.datasetLivenessService.isPieceLive(dataSetId, pieceId, signal); + } catch (error) { + if (signal?.aborted) throw error; + this.logger.warn({ + ...logContext, + event: "retrieval_piece_liveness_probe_failed", + message: "PDP pieceLive probe failed; treating piece as live to avoid spurious cleanup", + dataSetId: dataSetId.toString(), + pieceId: pieceId.toString(), + error: toStructuredError(error), + }); + return true; + } + } } diff --git a/apps/backend/src/wallet-sdk/wallet-sdk.service.ts b/apps/backend/src/wallet-sdk/wallet-sdk.service.ts index 9373150b..8f6074f4 100644 --- a/apps/backend/src/wallet-sdk/wallet-sdk.service.ts +++ b/apps/backend/src/wallet-sdk/wallet-sdk.service.ts @@ -7,7 +7,7 @@ import { Injectable, Logger, type OnModuleInit } from "@nestjs/common"; import { ConfigService } from "@nestjs/config"; import { InjectRepository } from "@nestjs/typeorm"; import type { Repository } from "typeorm"; -import { type Hex } from "viem"; +import type { Account, Chain, Client, Hex, Transport } from "viem"; import { DEV_TAG } from "../common/constants.js"; import { toStructuredError } from "../common/logging.js"; import { createSynapseFromConfig } from "../common/synapse-factory.js"; @@ -15,6 +15,8 @@ import type { IBlockchainConfig, IConfig } from "../config/app.config.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import type { PDPProviderEx, WalletServices } from "./wallet-sdk.types.js"; +type SynapseViemClient = Client; + @Injectable() export class WalletSdkService implements OnModuleInit { private readonly logger = new Logger(WalletSdkService.name); @@ -29,7 +31,7 @@ export class WalletSdkService implements OnModuleInit { private providersLoadPromise: Promise | null = null; private providersLoadedOnce = false; private _isSessionKeyMode = false; - private _synapseClient: any; + private _synapseClient: SynapseViemClient | null = null; constructor( private readonly configService: ConfigService, @@ -316,8 +318,8 @@ export class WalletSdkService implements OnModuleInit { * Returns `null` when chain integration is disabled or the client has not been * initialized yet. */ - getSynapseClient(): unknown { - return this._synapseClient ?? null; + getSynapseClient(): SynapseViemClient | null { + return this._synapseClient; } /** @@ -326,6 +328,20 @@ export class WalletSdkService implements OnModuleInit { * done separately via the Safe multisig UI. */ async ensureWalletAllowances(): Promise { + if (!this._synapseClient) { + const reason = + process.env.DEALBOT_DISABLE_CHAIN === "true" + ? "Chain integration is disabled (DEALBOT_DISABLE_CHAIN=true)" + : "WalletSdkService may not be initialized or initialization failed"; + this.logger.error({ + event: "synapse_client_unavailable", + message: "Synapse client not available for wallet allowances check", + chainDisabled: process.env.DEALBOT_DISABLE_CHAIN === "true", + isSessionKeyMode: this._isSessionKeyMode, + }); + throw new Error(`Synapse client not available. ${reason}`); + } + if (this._isSessionKeyMode) { const { getUploadCosts } = await import("@filoz/synapse-core/warm-storage"); const costs = await getUploadCosts(this._synapseClient, { diff --git a/docs/checks/events-and-metrics.md b/docs/checks/events-and-metrics.md index d2d1a033..33957c5e 100644 --- a/docs/checks/events-and-metrics.md +++ b/docs/checks/events-and-metrics.md @@ -140,7 +140,7 @@ sequenceDiagram | `dataStorageStatus` | Data Storage | When the Data Storage check completes (all four sub-statuses done) | `success`, `failure.timedout`, `failure.other` from [Deal Status Progression](./data-storage.md#deal-status-progression). | [`deal.service.ts`](../../apps/backend/src/deal/deal.service.ts) | | `discoverabilityStatus` | Data Storage, Retrieval | [`ipniVerificationComplete`](#ipniVerificationComplete) | `success`, `failure.timedout`, `failure.other` from [Data Storage Sub-status meanings](./data-storage.md#sub-status-meanings). `skipped` when IPNI verification is bypassed because `rootCID`/`blockCIDs` are absent from deal metadata or `rootCID` cannot be parsed as a valid CID. | | | `ipfsRetrievalHttpResponseCode` | Data Storage, Retrieval | [`ipfsRetrievalLastByteReceived`](#ipfsRetrievalLastByteReceived) | `200`, `500`, `2xxSuccess`, `4xxClientError`, `5xxServerError`, `otherHttpStatusCodes`, `failure` | [`retrieval.service.ts`](../../apps/backend/src/retrieval/retrieval.service.ts) | -| `retrievalStatus` | Data Storage, Retrieval | [`ipfsRetrievalIntegrityChecked`](#ipfsRetrievalIntegrityChecked) | `success`, `failure.timedout`, `failure.other` from [Data Storage Sub-status meanings](./data-storage.md#sub-status-meanings). `skipped.piece_missing` is emitted on the Retrieval path only when the SP's `/pdp/piece/:pieceCid/status` returns 404 during the pre-flight probe; the deal is marked `cleaned_up=true` and removed from the retrieval candidate pool. | | +| `retrievalStatus` | Data Storage, Retrieval | [`ipfsRetrievalIntegrityChecked`](#ipfsRetrievalIntegrityChecked) | `success`, `failure.timedout`, `failure.other` from [Data Storage Sub-status meanings](./data-storage.md#sub-status-meanings). On the Retrieval path, the pre-flight branches on the on-chain `PDPVerifier.pieceLive(dataSetId, pieceId)` result. When `pieceLive=false` (dataset terminated, piece never created, or piece hard-removed), `skipped.piece_missing` is emitted and the deal is marked `cleaned_up=true`; no SP probe runs. When `pieceLive=true` and the SP returns 404 on `/pdp/piece/:pieceCid/status`, `failure.other` is emitted and a failed retrieval row is recorded (deal stays in the candidate pool for re-probing). | | | `dataSetCreationStatus` | Data-Set Creation | Not tied to an [event above](#event-list) but rather to data-set creation start (`pending`) and completion (`success`/`failure.*`) | `pending`, `success`, `failure.timedout`, `failure.other` | [`deal.service.ts`](../../apps/backend/src/deal/deal.service.ts) | | `dataSetChallengeStatus` | Data Retention | Emitted on each [Data Retention Check](./data-retention.md) poll when a provider's confirmed proving-period totals advance (strictly positive deltas). Unit: **challenges** (period delta × `CHALLENGES_PER_PROVING_PERIOD = 5`). | `success` (challenges in successfully-proven periods), `failure` (challenges in faulted periods) | [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts) | | `pdp_provider_estimated_overdue_periods` | Data Retention | Emitted on every [Data Retention Check](./data-retention.md) poll for every successfully processed provider. | Gauge value in proving periods (non-negative integer) | [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts) |