Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 194 additions & 0 deletions apps/backend/src/retrieval/retrieval.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -630,3 +630,197 @@ describe("RetrievalService DB/provider drift", () => {
expect(cleanedUpCall?.params).toEqual({ cleanedUp: false });
});
});

describe("RetrievalService SP piece status pre-flight", () => {
type PublicInterface<T> = { [K in keyof T]: T[K] };
type RetrievalServicePrivate = PublicInterface<RetrievalService> & {
performAllRetrievals: (deal: Deal, signal?: AbortSignal) => Promise<Retrieval[]>;
};

const mockRetrievalAddonsService = {
testAllRetrievalMethods: vi.fn().mockResolvedValue({
dealId: "deal-1",
results: [],
summary: { totalMethods: 0, successfulMethods: 0, failedMethods: 0 },
testedAt: new Date(),
}),
getApplicableStrategies: vi.fn().mockReturnValue([{}]),
};
const mockConfigService = {
get: vi.fn((key: string) => {
if (key === "app") return { runMode: "api" };
if (key === "jobs") return { pgbossSchedulerEnabled: false };
if (key === "blockchain") return { network: "calibration" };
if (key === "dataset") return { randomDatasetSizes: [] };
return undefined;
}),
};
const mockDealRepository = {
update: vi.fn().mockResolvedValue({ affected: 1 }),
};
const mockSpRepository = { findOne: vi.fn() };
const mockRetrievalMetrics = {
observeCheckDuration: vi.fn(),
recordStatus: vi.fn(),
observeFirstByteMs: vi.fn(),
observeLastByteMs: vi.fn(),
observeThroughput: vi.fn(),
recordHttpResponseCode: vi.fn(),
recordResultMetrics: vi.fn(),
};

afterEach(() => {
vi.unstubAllGlobals();
vi.restoreAllMocks();
Comment thread
SgtPooki marked this conversation as resolved.
mockDealRepository.update.mockClear();
mockRetrievalAddonsService.testAllRetrievalMethods.mockClear();
mockRetrievalMetrics.recordStatus.mockClear();
});

const buildDeal = (overrides: Partial<Deal> = {}): Deal =>
({
id: "deal-1",
spAddress: "0xsp",
walletAddress: "0xwallet",
pieceCid: "bafy-piece",
...overrides,
}) as Deal;

const createService = async (): Promise<RetrievalServicePrivate> => {
const module: TestingModule = await Test.createTestingModule({
providers: [
RetrievalService,
{ provide: RetrievalAddonsService, useValue: mockRetrievalAddonsService },
{ provide: getRepositoryToken(Deal), useValue: mockDealRepository },
{ provide: getRepositoryToken(Retrieval), useValue: { create: vi.fn(), save: vi.fn() } },
{ provide: getRepositoryToken(StorageProvider), useValue: mockSpRepository },
{ provide: RetrievalCheckMetrics, useValue: mockRetrievalMetrics },
{ provide: DiscoverabilityCheckMetrics, useValue: { recordStatus: vi.fn(), observeIpniVerifyMs: vi.fn() } },
{ provide: IpniVerificationService, useValue: { verify: vi.fn() } },
{ provide: ConfigService, useValue: mockConfigService },
{ provide: ClickhouseService, useValue: { insert: vi.fn(), probeLocation: "test" } },
],
}).compile();
return module.get<RetrievalService>(RetrievalService) as unknown as RetrievalServicePrivate;
};

it("marks deal cleaned_up and skips retrieval when SP returns 404 for piece status", async () => {
const service = await createService();
mockSpRepository.findOne.mockResolvedValue({
address: "0xsp",
providerId: 5,
isApproved: true,
name: "Test SP",
serviceUrl: "https://sp.example.com",
});
const fetchMock = vi.fn().mockResolvedValue({ status: 404, ok: false } as Response);
vi.stubGlobal("fetch", fetchMock);

const result = await service.performAllRetrievals(buildDeal());

expect(result).toEqual([]);
expect(mockDealRepository.update).toHaveBeenCalledWith(
{ id: "deal-1", cleanedUp: false },
expect.objectContaining({ cleanedUp: true, cleanedUpAt: expect.any(Date) }),
);
expect(mockRetrievalAddonsService.testAllRetrievalMethods).not.toHaveBeenCalled();
expect(mockRetrievalMetrics.recordStatus).toHaveBeenCalledWith(expect.any(Object), "skipped.piece_missing");
expect(fetchMock).toHaveBeenCalledWith(
"https://sp.example.com/pdp/piece/bafy-piece/status",
expect.objectContaining({ method: "GET" }),
);
});

it("proceeds with retrieval when SP confirms piece exists", async () => {
const service = await createService();
mockSpRepository.findOne.mockResolvedValue({
address: "0xsp",
providerId: 5,
isApproved: true,
name: "Test SP",
serviceUrl: "https://sp.example.com",
});
vi.stubGlobal("fetch", vi.fn().mockResolvedValue({ status: 200, ok: true } as Response));

await service.performAllRetrievals(buildDeal()).catch(() => undefined);

expect(mockDealRepository.update).not.toHaveBeenCalled();
expect(mockRetrievalAddonsService.testAllRetrievalMethods).toHaveBeenCalled();
});

it("proceeds with retrieval when SP probe fails with a network error (unknown)", async () => {
const service = await createService();
mockSpRepository.findOne.mockResolvedValue({
address: "0xsp",
providerId: 5,
isApproved: true,
name: "Test SP",
serviceUrl: "https://sp.example.com",
});
vi.stubGlobal("fetch", vi.fn().mockRejectedValue(new Error("network unreachable")));

await service.performAllRetrievals(buildDeal()).catch(() => undefined);

expect(mockDealRepository.update).not.toHaveBeenCalled();
expect(mockRetrievalAddonsService.testAllRetrievalMethods).toHaveBeenCalled();
});

it("skips probe and proceeds when provider has no serviceUrl", async () => {
const service = await createService();
mockSpRepository.findOne.mockResolvedValue({
address: "0xsp",
providerId: 5,
isApproved: true,
name: "Test SP",
serviceUrl: null,
});
const fetchMock = vi.fn();
vi.stubGlobal("fetch", fetchMock);

await service.performAllRetrievals(buildDeal()).catch(() => undefined);

expect(fetchMock).not.toHaveBeenCalled();
expect(mockDealRepository.update).not.toHaveBeenCalled();
});

it("re-throws when the outer signal aborts during the probe", async () => {
const service = await createService();
mockSpRepository.findOne.mockResolvedValue({
address: "0xsp",
providerId: 5,
isApproved: true,
name: "Test SP",
serviceUrl: "https://sp.example.com",
});
const ac = new AbortController();
const fetchMock = vi.fn().mockImplementation(() => {
ac.abort(new Error("cancelled by job"));
return Promise.reject(new DOMException("aborted", "AbortError"));
});
vi.stubGlobal("fetch", fetchMock);

await expect(service.performAllRetrievals(buildDeal(), ac.signal)).rejects.toThrow();
expect(mockDealRepository.update).not.toHaveBeenCalled();
expect(mockRetrievalAddonsService.testAllRetrievalMethods).not.toHaveBeenCalled();
});

it("strips trailing slash from serviceUrl and URL-encodes the pieceCid", async () => {
const service = await createService();
mockSpRepository.findOne.mockResolvedValue({
address: "0xsp",
providerId: 5,
isApproved: true,
name: "Test SP",
serviceUrl: "https://sp.example.com/",
});
const fetchMock = vi.fn().mockResolvedValue({ status: 200, ok: true } as Response);
vi.stubGlobal("fetch", fetchMock);

await service.performAllRetrievals(buildDeal({ pieceCid: "bafy/with/slashes" })).catch(() => undefined);

expect(fetchMock).toHaveBeenCalledWith(
"https://sp.example.com/pdp/piece/bafy%2Fwith%2Fslashes/status",
Comment thread
SgtPooki marked this conversation as resolved.
expect.any(Object),
);
});
});
71 changes: 71 additions & 0 deletions apps/backend/src/retrieval/retrieval.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ import type {
RetrievalTestResult,
} from "../retrieval-addons/types.js";

/** Timeout for the pre-flight SP piece-status probe. Short enough that an unresponsive
* SP still beats falling through to the 30s IPNI verify path; on timeout we treat
* the result as "unknown" and proceed with the normal retrieval. */
const SP_PIECE_STATUS_PROBE_TIMEOUT_MS = 5_000;

Comment thread
SgtPooki marked this conversation as resolved.
@Injectable()
export class RetrievalService {
private readonly logger = new Logger(RetrievalService.name);
Expand Down Expand Up @@ -120,6 +125,31 @@ export class RetrievalService {
return [];
}

// Pre-flight: if the SP itself reports it no longer has this piece, mark the deal
// cleaned_up so it stops polluting future retrieval candidates and skip the check.
// Saves the 30s IPNI verify timeout + downstream block-fetch 404 noise per stale deal.
if (provider.serviceUrl && deal.pieceCid) {
const probe = await this.probeSpPieceStatus(provider.serviceUrl, deal.pieceCid, signal);
signal?.throwIfAborted();
if (probe.result === "missing") {
const updateResult = await this.dealRepository.update(
{ id: deal.id, cleanedUp: false },
{ cleanedUp: true, cleanedUpAt: new Date() },
);
this.retrievalMetrics.recordStatus(providerLabels, "skipped.piece_missing");
this.logger.warn({
...retrievalLogContext,
event: "retrieval_skipped_piece_missing",
message: "SP reports piece missing; marked deal cleaned_up and skipped retrieval",
statusUrl: probe.url,
statusCode: probe.statusCode,
probeDurationMs: probe.durationMs,
affected: updateResult.affected ?? 0,
});
return [];
}
}

type SubStatus = "success" | "failure.timedout" | "failure.other";
let terminalStatus: SubStatus | null = null;
let retrievals: Retrieval[] = [];
Expand Down Expand Up @@ -366,6 +396,47 @@ export class RetrievalService {
return this.spRepository.findOne({ where: { address } });
}

/**
* Probe `${serviceUrl}/pdp/piece/:pieceCid/status` to determine whether the SP
* currently has the piece. Returns:
* - "missing": SP responded 404 (authoritative — SP does not have the piece)
* - "exists": SP responded 2xx (piece is held)
* - "unknown": network error, probe timeout, or other status (don't act on it)
* An outer-signal abort during the probe is re-thrown so the caller can stop.
*/
private async probeSpPieceStatus(
serviceUrl: string,
pieceCid: string,
outerSignal?: AbortSignal,
): Promise<{ result: "missing" | "exists" | "unknown"; url: string; statusCode: number | null; durationMs: number }> {
const url = `${serviceUrl.replace(/\/$/, "")}/pdp/piece/${encodeURIComponent(pieceCid)}/status`;
const timeoutSignal = AbortSignal.timeout(SP_PIECE_STATUS_PROBE_TIMEOUT_MS);
const signal = outerSignal ? AbortSignal.any([outerSignal, timeoutSignal]) : timeoutSignal;
const start = Date.now();
try {
const res = await fetch(url, {
method: "GET",
signal,
headers: { "User-Agent": "dealbot/probe" },
});
// Drain/cancel the body so undici returns the socket to the pool instead of
// keeping it pinned to an unread response. Body content is irrelevant here.
await res.body?.cancel().catch(() => undefined);
const durationMs = Date.now() - start;
if (res.status === 404) return { result: "missing", url, statusCode: 404, durationMs };
if (res.ok) return { result: "exists", url, statusCode: res.status, durationMs };
return { result: "unknown", url, statusCode: res.status, durationMs };
} catch (error) {
// Re-throw caller-initiated aborts so retrieval stops promptly. Probe-timeout
// and network errors fall through as "unknown" — we don't want to mark deals
// cleaned_up on flaky infra.
if (outerSignal?.aborted) {
throw error;
}
return { result: "unknown", url, statusCode: null, durationMs: Date.now() - start };
}
}

/**
* We select a random successful deal (DEAL_CREATED only) for a given provider.
* Uses Postgres ORDER BY RANDOM() since Dealbot is Postgres-only.
Expand Down
2 changes: 1 addition & 1 deletion docs/checks/events-and-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ sequenceDiagram
| <a id="dataStorageStatus"></a>`dataStorageStatus` | Data Storage | When the Data Storage check completes (all four sub-statuses done) | `success`, `failure.timedout`, `failure.other` from [Deal Status Progression](./data-storage.md#deal-status-progression). | [`deal.service.ts`](../../apps/backend/src/deal/deal.service.ts) |
| <a id="discoverabilityStatus"></a>`discoverabilityStatus` | Data Storage, Retrieval | [`ipniVerificationComplete`](#ipniVerificationComplete) | `success`, `failure.timedout`, `failure.other` from [Data Storage Sub-status meanings](./data-storage.md#sub-status-meanings). | |
| <a id="ipfsRetrievalHttpResponseCode"></a>`ipfsRetrievalHttpResponseCode` | Data Storage, Retrieval | [`ipfsRetrievalLastByteReceived`](#ipfsRetrievalLastByteReceived) | `200`, `500`, `2xxSuccess`, `4xxClientError`, `5xxServerError`, `otherHttpStatusCodes`, `failure` | [`retrieval.service.ts`](../../apps/backend/src/retrieval/retrieval.service.ts) |
| <a id="retrievalStatus"></a>`retrievalStatus` | Data Storage, Retrieval | [`ipfsRetrievalIntegrityChecked`](#ipfsRetrievalIntegrityChecked) | `success`, `failure.timedout`, `failure.other` from [Data Storage Sub-status meanings](./data-storage.md#sub-status-meanings). | |
| <a id="retrievalStatus"></a>`retrievalStatus` | Data Storage, Retrieval | [`ipfsRetrievalIntegrityChecked`](#ipfsRetrievalIntegrityChecked) | `success`, `failure.timedout`, `failure.other` from [Data Storage Sub-status meanings](./data-storage.md#sub-status-meanings). `skipped.piece_missing` is emitted on the Retrieval path only when the SP's `/pdp/piece/:pieceCid/status` returns 404 during the pre-flight probe; the deal is marked `cleaned_up=true` and removed from the retrieval candidate pool. | |
| <a id="dataSetCreationStatus"></a>`dataSetCreationStatus` | Data-Set Creation | Not tied to an [event above](#event-list) but rather to data-set creation start (`pending`) and completion (`success`/`failure.*`) | `pending`, `success`, `failure.timedout`, `failure.other` | [`deal.service.ts`](../../apps/backend/src/deal/deal.service.ts) |
| <a id="dataSetChallengeStatus"></a>`dataSetChallengeStatus` | Data Retention | Emitted on each [Data Retention Check](./data-retention.md) poll when a provider's confirmed proving-period totals advance (strictly positive deltas). Unit: **challenges** (period delta × `CHALLENGES_PER_PROVING_PERIOD = 5`). | `success` (challenges in successfully-proven periods), `failure` (challenges in faulted periods) | [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts) |
| <a id="pdp_provider_estimated_overdue_periods"></a>`pdp_provider_estimated_overdue_periods` | Data Retention | Emitted on every [Data Retention Check](./data-retention.md) poll for every successfully processed provider. | Gauge value in proving periods (non-negative integer) | [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts) |
Expand Down
4 changes: 4 additions & 0 deletions docs/checks/retrievals.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ Metric definitions (including Prometheus metrics) live in [Dealbot Events & Metr

`retrievalStatus` counts the `/ipfs` transport stage only (assertions 2 and 3). The IPNI assertion (1) is counted on `discoverabilityStatus`. There is no composite "retrieval check" counter; overall success comes from `dataStorageStatus`, which is `success` only when all sub-statuses succeed. See [Deal Status Progression](./data-storage.md#deal-status-progression).

### `skipped.piece_missing`

Emitted when a retrieval pre-flight probe to `${serviceUrl}/pdp/piece/:pieceCid/status` returns `404`. The deal is marked `cleaned_up=true` and removed from future retrieval candidate selection. This is not a failure of the SP's transport surface, but a signal that the piece no longer exists on the SP while the dataset is still live (for example, the SP scheduled a piece removal via PDP, or the piece dropped without an on-chain notification). The probe runs before IPNI verification and transport, so a 30s IPNI timeout is avoided on stale candidates. Search logs for `retrieval_skipped_piece_missing` to correlate.

## Configuration

Key environment variables that control retrieval testing:
Expand Down