Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions apps/backend/src/dataset-liveness/dataset-liveness.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Module } from "@nestjs/common";
import { WalletSdkModule } from "../wallet-sdk/wallet-sdk.module.js";
import { DatasetLivenessService } from "./dataset-liveness.service.js";

@Module({
imports: [WalletSdkModule],
providers: [DatasetLivenessService],
exports: [DatasetLivenessService],
})
export class DatasetLivenessModule {}
157 changes: 157 additions & 0 deletions apps/backend/src/dataset-liveness/dataset-liveness.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import { Test, TestingModule } from "@nestjs/testing";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js";
import { DatasetLivenessService } from "./dataset-liveness.service.js";

vi.mock("@filoz/synapse-core/pdp-verifier", () => ({}));

vi.mock("@filoz/synapse-core/chains", () => ({
asChain: () => ({
contracts: {
pdp: {
address: "0xpdp",
abi: [],
},
},
}),
}));

const readContractMock = vi.fn();
vi.mock("viem/actions", () => ({
readContract: (...args: unknown[]) => readContractMock(...args),
}));

describe("DatasetLivenessService", () => {
let service: DatasetLivenessService;
let fetchMock: ReturnType<typeof vi.fn>;

const mockWarmStorageService = {
validateDataSet: vi.fn().mockResolvedValue(undefined),
};
const mockWalletSdkService = {
getProviderInfo: vi.fn().mockReturnValue({
id: 101n,
pdp: { serviceURL: "https://sp.example" },
}),
getWalletServices: vi.fn().mockReturnValue({
warmStorageService: mockWarmStorageService,
}),
getSynapseClient: vi.fn().mockReturnValue({ chain: { id: 314 } }),
};

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [DatasetLivenessService, { provide: WalletSdkService, useValue: mockWalletSdkService }],
}).compile();
service = module.get<DatasetLivenessService>(DatasetLivenessService);
fetchMock = vi.fn().mockResolvedValue(new Response("ok", { status: 200 }));
vi.stubGlobal("fetch", fetchMock);
});

afterEach(() => {
vi.unstubAllGlobals();
vi.restoreAllMocks();
mockWarmStorageService.validateDataSet.mockReset().mockResolvedValue(undefined);
readContractMock.mockReset();
});

describe("isDataSetLive", () => {
it("returns true when both probes report live", async () => {
await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true);
});

it("returns false when FWSS validateDataSet reports not live", async () => {
mockWarmStorageService.validateDataSet.mockRejectedValueOnce(
new Error("Data set 1 does not exist or is not live"),
);
await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false);
});

it("returns false when SP HTTP probe returns 409 with the terminated body", async () => {
fetchMock.mockResolvedValueOnce(
new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }),
);
await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false);
});

it("treats SP HTTP 409 with a different body as live", async () => {
fetchMock.mockResolvedValueOnce(new Response("piece already exists", { status: 409 }));
await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true);
});

it("treats SP HTTP non-409 responses as live", async () => {
fetchMock.mockResolvedValueOnce(new Response("At least one piece must be provided", { status: 400 }));
await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true);
});

it("treats SP HTTP network errors as live", async () => {
fetchMock.mockRejectedValueOnce(new Error("ECONNREFUSED"));
await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(true);
});

it("rethrows FWSS validateDataSet errors that do not match the terminal message", async () => {
mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545"));
await expect(service.isDataSetLive("0xprovider", 1n)).rejects.toThrow("ECONNREFUSED");
});

it("returns false when SP reports terminated even if FWSS RPC throws transiently", async () => {
mockWarmStorageService.validateDataSet.mockRejectedValueOnce(new Error("ECONNREFUSED 127.0.0.1:8545"));
fetchMock.mockResolvedValueOnce(
new Response("Data set has been terminated due to unrecoverable proving failure", { status: 409 }),
);
await expect(service.isDataSetLive("0xprovider", 1n)).resolves.toBe(false);
});

it("posts an empty JSON body to the SP addPieces endpoint", async () => {
await service.isDataSetLive("0xprovider", 42n);
expect(fetchMock).toHaveBeenCalledTimes(1);
const [calledUrl, init] = fetchMock.mock.calls[0] as unknown as [URL, RequestInit];
expect(String(calledUrl)).toBe("https://sp.example/pdp/data-sets/42/pieces");
expect(init.method).toBe("POST");
expect(init.body).toBe("{}");
});

it("aborts when outer signal is already aborted", async () => {
const ac = new AbortController();
ac.abort();
await expect(service.isDataSetLive("0xprovider", 1n, ac.signal)).rejects.toThrow();
});
});

describe("isPieceLive", () => {
it("returns true when PDPVerifier.pieceLive returns true", async () => {
readContractMock.mockResolvedValueOnce(true);
await expect(service.isPieceLive(1n, 42n)).resolves.toBe(true);
expect(readContractMock).toHaveBeenCalledWith(
expect.objectContaining({ chain: { id: 314 } }),
expect.objectContaining({
address: "0xpdp",
functionName: "pieceLive",
args: [1n, 42n],
}),
);
});

it("returns false when PDPVerifier.pieceLive returns false", async () => {
readContractMock.mockResolvedValueOnce(false);
await expect(service.isPieceLive(1n, 42n)).resolves.toBe(false);
});

it("throws when synapse client is not available", async () => {
mockWalletSdkService.getSynapseClient.mockReturnValueOnce(null);
await expect(service.isPieceLive(1n, 42n)).rejects.toThrow("Synapse client not available for pieceLive read");
});

it("propagates RPC errors", async () => {
readContractMock.mockRejectedValueOnce(new Error("RPC down"));
await expect(service.isPieceLive(1n, 42n)).rejects.toThrow("RPC down");
});

it("aborts when outer signal is already aborted", async () => {
const ac = new AbortController();
ac.abort();
readContractMock.mockResolvedValueOnce(true);
await expect(service.isPieceLive(1n, 42n, ac.signal)).rejects.toThrow();
});
});
});
139 changes: 139 additions & 0 deletions apps/backend/src/dataset-liveness/dataset-liveness.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import { asChain } from "@filoz/synapse-core/chains";
import { Injectable, Logger } from "@nestjs/common";
import { readContract } from "viem/actions";
import { awaitWithAbort } from "../common/abort-utils.js";
import { toStructuredError } from "../common/logging.js";
import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js";

const PDP_LIVENESS_PROBE_TIMEOUT_MS = 10_000;

/**
* Composite PDP-liveness probe. Two independent probes:
*
* - FWSS `validateDataSet` (chain): wraps `PDPVerifier.dataSetLive` via
* multicall and additionally verifies the listener is this WarmStorage
* contract, so it covers chain-side liveness fully.
* - SP HTTP `POST /pdp/data-sets/{id}/pieces` (off-chain): catches Curio's
* `unrecoverable_proving_failure_epoch` state, which precedes chain
* propagation and is the only signal observable when the SP refuses
* addPieces but chain still reports the set as live.
*
* If any settled result is `false`, returns `false` even when the other
* probe threw a transient error. Otherwise rethrows the first rejection so
* a probe outage is not silently misclassified as live.
*/
@Injectable()
export class DatasetLivenessService {
private readonly logger = new Logger(DatasetLivenessService.name);

constructor(private readonly walletSdkService: WalletSdkService) {}

async isDataSetLive(providerAddress: string, dataSetId: bigint, signal?: AbortSignal): Promise<boolean> {
signal?.throwIfAborted();
const settled = await Promise.allSettled([
this.probeFwssDataSetLive(dataSetId, signal),
this.probeSpHttpDataSetLive(providerAddress, dataSetId, signal),
]);
if (settled.some((r) => r.status === "fulfilled" && r.value === false)) {
return false;
}
const rejection = settled.find((r): r is PromiseRejectedResult => r.status === "rejected");
if (rejection) throw rejection.reason;
return true;
}

/**
* On-chain check that a specific piece is still expected to be retrievable.
*
* Wraps `PDPVerifier.pieceLive(setId, pieceId)`, which returns:
* `dataSetLive(setId) && pieceId < nextPieceId[setId] && pieceLeafCounts[setId][pieceId] > 0`
*
* So `false` covers three legitimate reasons the SP can answer 404:
* 1. Dataset terminated.
* 2. Piece ID never created.
* 3. Piece hard-removed (`removePieces` finalized; leaf count zeroed).
*
* Pieces that are scheduled for removal but not yet finalized still return
* `true` — the SP remains on the hook for challenges until the next
* proving period clears the scheduledRemovals queue, so it must still
* serve GET requests.
*
* Source: `FilOzone/pdp` PDPVerifier.sol `pieceLive`.
*/
async isPieceLive(dataSetId: bigint, pieceId: bigint, signal?: AbortSignal): Promise<boolean> {
signal?.throwIfAborted();
const client = this.walletSdkService.getSynapseClient();
if (!client) {
throw new Error("Synapse client not available for pieceLive read");
}
const chain = asChain(client.chain);
const result = await awaitWithAbort(
readContract(client, {
abi: chain.contracts.pdp.abi,
address: chain.contracts.pdp.address,
functionName: "pieceLive",
args: [dataSetId, pieceId],
}),
signal,
);
return Boolean(result);
}

protected async probeFwssDataSetLive(dataSetId: bigint, signal?: AbortSignal): Promise<boolean> {
signal?.throwIfAborted();
const { warmStorageService } = this.walletSdkService.getWalletServices();
try {
await awaitWithAbort(warmStorageService.validateDataSet({ dataSetId }), signal);
return true;
} catch (error) {
if (signal?.aborted) throw error;
const message = error instanceof Error ? error.message : String(error);
if (/does not exist or is not live/i.test(message)) {
return false;
}
throw error;
}
}

protected async probeSpHttpDataSetLive(
providerAddress: string,
dataSetId: bigint,
signal?: AbortSignal,
): Promise<boolean> {
signal?.throwIfAborted();
const providerInfo = this.walletSdkService.getProviderInfo(providerAddress);
if (!providerInfo) {
throw new Error(`Provider ${providerAddress} not found in registry`);
}
const serviceURL = providerInfo.pdp?.serviceURL;
if (!serviceURL) {
throw new Error(`Provider ${providerAddress} has no PDP serviceURL`);
}
const url = new URL(`pdp/data-sets/${dataSetId.toString()}/pieces`, serviceURL);
const timeoutSignal = AbortSignal.timeout(PDP_LIVENESS_PROBE_TIMEOUT_MS);
const probeSignal = signal ? AbortSignal.any([signal, timeoutSignal]) : timeoutSignal;
try {
const res = await fetch(url, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: "{}",
signal: probeSignal,
});
if (res.status !== 409) return true;
const body = await res.text();
return !/unrecoverable proving failure/i.test(body);
} catch (error) {
if (signal?.aborted) throw error;
this.logger.warn({
event: "dataset_sp_liveness_probe_failed",
message: "SP HTTP liveness probe failed; treating dataset as live",
providerAddress,
providerId: providerInfo.id,
dataSetId: dataSetId.toString(),
serviceURL,
error: toStructuredError(error),
});
return true;
}
}
}
2 changes: 2 additions & 0 deletions apps/backend/src/deal/deal.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Deal } from "../database/entities/deal.entity.js";
import { Retrieval } from "../database/entities/retrieval.entity.js";
import { StorageProvider } from "../database/entities/storage-provider.entity.js";
import { DataSourceModule } from "../dataSource/dataSource.module.js";
import { DatasetLivenessModule } from "../dataset-liveness/dataset-liveness.module.js";
import { DealAddonsModule } from "../deal-addons/deal-addons.module.js";
import { RetrievalAddonsModule } from "../retrieval-addons/retrieval-addons.module.js";
import { WalletSdkModule } from "../wallet-sdk/wallet-sdk.module.js";
Expand All @@ -18,6 +19,7 @@ import { DealService } from "./deal.service.js";
WalletSdkModule,
DealAddonsModule,
RetrievalAddonsModule,
DatasetLivenessModule,
],
providers: [DealService],
exports: [DealService],
Expand Down
Loading