diff --git a/apps/backend/.env.example b/apps/backend/.env.example index e614e6f0..a3c69b10 100644 --- a/apps/backend/.env.example +++ b/apps/backend/.env.example @@ -55,6 +55,11 @@ DEALBOT_MAINTENANCE_WINDOW_MINUTES=20 DEALS_PER_SP_PER_HOUR=2 DATASET_CREATIONS_PER_SP_PER_HOUR=1 RETRIEVALS_PER_SP_PER_HOUR=1 +# data_set_lifecycle_check canary: creates a throwaway data set and terminates it each tick +# (defaults: enabled on calibration, disabled on mainnet). +# DATASET_LIFECYCLE_CHECK_ENABLED=true +DATASET_LIFECYCLE_CHECKS_PER_SP_PER_HOUR=1 +DATA_SET_LIFECYCLE_CHECK_JOB_TIMEOUT_SECONDS=600 # 10m: create + upload + terminate + pdpEndEpoch poll PG_BOSS_LOCAL_CONCURRENCY=20 JOB_SCHEDULER_POLL_SECONDS=300 JOB_WORKER_POLL_SECONDS=60 diff --git a/apps/backend/src/common/constants.ts b/apps/backend/src/common/constants.ts index 57416ae0..ebcc3ef6 100644 --- a/apps/backend/src/common/constants.ts +++ b/apps/backend/src/common/constants.ts @@ -7,3 +7,10 @@ export const ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"; export const MAX_BLOCK_SIZE = 5 * 1024 * 1024; export const DEV_TAG = stringToHex("dev"); + +/** + * Fixed metadata marker key tagging every throwaway data set created by the + * `data_set_lifecycle_check` job. The value is a per-run nonce; the key is the stable + * handle operators use to list/sweep leaked sets (create-OK / terminate-failed runs). + */ +export const LIFECYCLE_CHECK_METADATA_KEY = "dealbotLifecycleCheck"; diff --git a/apps/backend/src/config/app.config.ts b/apps/backend/src/config/app.config.ts index e2d69088..45305ff2 100644 --- a/apps/backend/src/config/app.config.ts +++ b/apps/backend/src/config/app.config.ts @@ -80,7 +80,12 @@ export const configValidationSchema = Joi.object({ // Per-hour limits are guardrails to avoid excessive background load. DEALS_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).default(4), DATASET_CREATIONS_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).default(1), + DATASET_LIFECYCLE_CHECKS_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).default(1), RETRIEVALS_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).default(2), + // Enables the data_set_lifecycle_check canary job. The network-dependent default (true on + // calibration, false on mainnet) is resolved in loadConfig; here we only validate the + // type when explicitly set. See docs/checks/data-set-lifecycle-check.md. + DATASET_LIFECYCLE_CHECK_ENABLED: Joi.boolean().optional(), // Polling interval for pg-boss scheduler (lower = more responsive, higher = less DB chatter). JOB_SCHEDULER_POLL_SECONDS: Joi.number().min(60).default(300), JOB_WORKER_POLL_SECONDS: Joi.number().min(5).default(60), @@ -93,6 +98,7 @@ export const configValidationSchema = Joi.object({ DEAL_JOB_TIMEOUT_SECONDS: Joi.number().min(120).default(360), // 6 minutes max runtime for data storage jobs (TODO: reduce default to 3 minutes) RETRIEVAL_JOB_TIMEOUT_SECONDS: Joi.number().min(60).default(60), // 1 minute max runtime for retrieval jobs (TODO: reduce default to 30 seconds) DATA_SET_CREATION_JOB_TIMEOUT_SECONDS: Joi.number().min(60).default(300), // 5 minutes max runtime for dataset creation jobs + DATA_SET_LIFECYCLE_CHECK_JOB_TIMEOUT_SECONDS: Joi.number().min(60).default(600), // 10 minutes: covers create + seed-piece upload + terminate + pdpEndEpoch poll // Seconds to hold the process alive after pg-boss drain completes, so Prometheus // captures at least one scrape of the terminal counter increments emitted during // shutdown. Default 35 covers the 30s ServiceMonitor interval plus a 5s buffer. @@ -226,6 +232,17 @@ export interface IJobsConfig { * Target number of dataset creation runs per storage provider per hour. */ dataSetCreationsPerSpPerHour: number; + /** + * Enables the `data_set_lifecycle_check` canary job, which creates a + * throwaway data set and immediately terminates it in a single tick. + * + * Defaults to true on calibration and false on mainnet. + */ + dataSetLifecycleCheckEnabled: boolean; + /** + * Target number of dataset lifecycle check runs per storage provider per hour. + */ + dataSetLifecycleChecksPerSpPerHour: number; /** * How often the scheduler polls Postgres for due jobs (seconds). * @@ -284,6 +301,13 @@ export interface IJobsConfig { * Uses AbortController to actively cancel job execution. */ dataSetCreationJobTimeoutSeconds: number; + /** + * Maximum runtime (seconds) for data-set lifecycle check jobs before forced abort. + * + * Bounds the create-with-seed-piece upload, the terminateService call, and the + * `pdpEndEpoch != 0` confirmation poll. Uses AbortController to actively cancel execution. + */ + dataSetLifecycleCheckJobTimeoutSeconds: number; /** * Maximum runtime (seconds) for retrieval jobs before forced abort. * @@ -473,6 +497,17 @@ export function loadConfig(): IConfig { dealsPerSpPerHour: Number.parseFloat(process.env.DEALS_PER_SP_PER_HOUR || "4"), retrievalsPerSpPerHour: Number.parseFloat(process.env.RETRIEVALS_PER_SP_PER_HOUR || "2"), dataSetCreationsPerSpPerHour: Number.parseFloat(process.env.DATASET_CREATIONS_PER_SP_PER_HOUR || "1"), + dataSetLifecycleCheckEnabled: (() => { + const raw = process.env.DATASET_LIFECYCLE_CHECK_ENABLED; + if (raw == null || raw.trim().length === 0) { + // Default: enabled on calibration, disabled on mainnet. + return (process.env.NETWORK || "calibration") === "calibration"; + } + return raw === "true"; + })(), + dataSetLifecycleChecksPerSpPerHour: Number.parseFloat( + process.env.DATASET_LIFECYCLE_CHECKS_PER_SP_PER_HOUR || "1", + ), schedulerPollSeconds: Number.parseInt(process.env.JOB_SCHEDULER_POLL_SECONDS || "300", 10), workerPollSeconds: Number.parseInt(process.env.JOB_WORKER_POLL_SECONDS || "60", 10), pgbossLocalConcurrency: Number.parseInt(process.env.PG_BOSS_LOCAL_CONCURRENCY || "20", 10), @@ -484,6 +519,10 @@ export function loadConfig(): IConfig { dealJobTimeoutSeconds: Number.parseInt(process.env.DEAL_JOB_TIMEOUT_SECONDS || "360", 10), retrievalJobTimeoutSeconds: Number.parseInt(process.env.RETRIEVAL_JOB_TIMEOUT_SECONDS || "60", 10), dataSetCreationJobTimeoutSeconds: Number.parseInt(process.env.DATA_SET_CREATION_JOB_TIMEOUT_SECONDS || "300", 10), + dataSetLifecycleCheckJobTimeoutSeconds: Number.parseInt( + process.env.DATA_SET_LIFECYCLE_CHECK_JOB_TIMEOUT_SECONDS || "600", + 10, + ), shutdownFinalScrapeDelaySeconds: Number.parseInt(process.env.SHUTDOWN_FINAL_SCRAPE_DELAY_SECONDS || "35", 10), pieceCleanupPerSpPerHour: Number.parseFloat(process.env.JOB_PIECE_CLEANUP_PER_SP_PER_HOUR || String(1 / 24)), maxPieceCleanupRuntimeSeconds: Number.parseInt(process.env.MAX_PIECE_CLEANUP_RUNTIME_SECONDS || "300", 10), diff --git a/apps/backend/src/data-set-lifecycle/data-set-lifecycle.module.ts b/apps/backend/src/data-set-lifecycle/data-set-lifecycle.module.ts new file mode 100644 index 00000000..8d917f17 --- /dev/null +++ b/apps/backend/src/data-set-lifecycle/data-set-lifecycle.module.ts @@ -0,0 +1,11 @@ +import { Module } from "@nestjs/common"; +import { MetricsPrometheusModule } from "../metrics-prometheus/metrics-prometheus.module.js"; +import { WalletSdkModule } from "../wallet-sdk/wallet-sdk.module.js"; +import { DataSetLifecycleService } from "./data-set-lifecycle.service.js"; + +@Module({ + imports: [WalletSdkModule, MetricsPrometheusModule], + providers: [DataSetLifecycleService], + exports: [DataSetLifecycleService], +}) +export class DataSetLifecycleModule {} diff --git a/apps/backend/src/data-set-lifecycle/data-set-lifecycle.service.spec.ts b/apps/backend/src/data-set-lifecycle/data-set-lifecycle.service.spec.ts new file mode 100644 index 00000000..be6b57b2 --- /dev/null +++ b/apps/backend/src/data-set-lifecycle/data-set-lifecycle.service.spec.ts @@ -0,0 +1,131 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { DataSetLifecycleCheckMetrics } from "../metrics-prometheus/check-metrics.service.js"; +import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; +import { DataSetLifecycleService } from "./data-set-lifecycle.service.js"; + +vi.mock("@filoz/synapse-core/sp", () => ({ + createDataSet: vi.fn(), + waitForCreateDataSet: vi.fn(), +})); + +vi.mock("@filoz/synapse-core/warm-storage", () => ({ + terminateServiceSync: vi.fn(), +})); + +const { createDataSet, waitForCreateDataSet } = await import("@filoz/synapse-core/sp"); +const { terminateServiceSync } = await import("@filoz/synapse-core/warm-storage"); + +const mockClient = { account: { address: "0xwallet" } }; + +const mockProviderInfo = { + id: 1n, + name: "test-sp", + isApproved: true, + serviceProvider: "0xsp" as `0x${string}`, + payee: "0xpayee" as `0x${string}`, + pdp: { serviceURL: "https://sp.example.com" }, +}; + +const mockWalletSdkService = { + getProviderInfo: vi.fn(() => mockProviderInfo), + getSynapseClient: vi.fn(() => mockClient), +} as unknown as WalletSdkService; + +const mockMetrics = { + observeCheckDuration: vi.fn(), + recordStatus: vi.fn(), +} as unknown as DataSetLifecycleCheckMetrics; + +describe("DataSetLifecycleService", () => { + let service: DataSetLifecycleService; + + beforeEach(() => { + vi.clearAllMocks(); + service = new DataSetLifecycleService(mockWalletSdkService, mockMetrics); + }); + + it("creates an empty data set, waits for confirmation, terminates it, and records success", async () => { + vi.mocked(createDataSet).mockResolvedValue({ txHash: "0xhash1", statusUrl: "https://sp.example.com/status/1" }); + vi.mocked(waitForCreateDataSet).mockResolvedValue({ + dataSetId: 42n, + dataSetCreated: true, + txStatus: "confirmed", + ok: true, + createMessageHash: "0xmsg", + service: "https://sp.example.com", + }); + vi.mocked(terminateServiceSync).mockResolvedValue({ receipt: {} as any, event: {} as any }); + + await service.runLifecycleCheck("0xsp", { dealbotLifecycleCheck: "nonce-123" }); + + expect(createDataSet).toHaveBeenCalledWith( + mockClient, + expect.objectContaining({ + cdn: false, + payee: "0xpayee", + serviceURL: "https://sp.example.com", + metadata: { dealbotLifecycleCheck: "nonce-123" }, + }), + ); + expect(waitForCreateDataSet).toHaveBeenCalledWith( + expect.objectContaining({ statusUrl: "https://sp.example.com/status/1" }), + ); + expect(terminateServiceSync).toHaveBeenCalledWith(mockClient, expect.objectContaining({ dataSetId: 42n })); + expect(mockMetrics.observeCheckDuration).toHaveBeenCalledOnce(); + expect(mockMetrics.recordStatus).toHaveBeenCalledWith(expect.any(Object), "success"); + }); + + it("records failure.timedout when signal is aborted before creation", async () => { + const controller = new AbortController(); + controller.abort(new Error("job timeout")); + + await expect( + service.runLifecycleCheck("0xsp", { dealbotLifecycleCheck: "nonce-456" }, controller.signal), + ).rejects.toThrow(); + + expect(createDataSet).not.toHaveBeenCalled(); + expect(mockMetrics.recordStatus).toHaveBeenCalledWith(expect.any(Object), "failure.timedout"); + }); + + it("records failure.other when creation rejects with a non-abort error", async () => { + vi.mocked(createDataSet).mockRejectedValue(new Error("SP unreachable")); + + await expect(service.runLifecycleCheck("0xsp", { dealbotLifecycleCheck: "nonce-789" })).rejects.toThrow( + "SP unreachable", + ); + + expect(terminateServiceSync).not.toHaveBeenCalled(); + expect(mockMetrics.recordStatus).toHaveBeenCalledWith(expect.any(Object), "failure.other"); + }); + + it("records failure.other when termination fails after creation, logging the dataSetId as leaked", async () => { + vi.mocked(createDataSet).mockResolvedValue({ txHash: "0xhash2", statusUrl: "https://sp.example.com/status/2" }); + vi.mocked(waitForCreateDataSet).mockResolvedValue({ + dataSetId: 99n, + dataSetCreated: true, + txStatus: "confirmed", + ok: true, + createMessageHash: "0xmsg2", + service: "https://sp.example.com", + }); + vi.mocked(terminateServiceSync).mockRejectedValue(new Error("terminate failed")); + + await expect(service.runLifecycleCheck("0xsp", { dealbotLifecycleCheck: "nonce-999" })).rejects.toThrow( + "terminate failed", + ); + + expect(mockMetrics.recordStatus).toHaveBeenCalledWith(expect.any(Object), "failure.other"); + }); + + it("throws when provider is not found in registry", async () => { + vi.mocked(mockWalletSdkService.getProviderInfo).mockReturnValueOnce(undefined); + + await expect(service.runLifecycleCheck("0xunknown", {})).rejects.toThrow("not found in registry"); + }); + + it("throws when synapse client is not initialized", async () => { + vi.mocked(mockWalletSdkService.getSynapseClient).mockReturnValueOnce(null); + + await expect(service.runLifecycleCheck("0xsp", {})).rejects.toThrow("not initialized"); + }); +}); diff --git a/apps/backend/src/data-set-lifecycle/data-set-lifecycle.service.ts b/apps/backend/src/data-set-lifecycle/data-set-lifecycle.service.ts new file mode 100644 index 00000000..421358f9 --- /dev/null +++ b/apps/backend/src/data-set-lifecycle/data-set-lifecycle.service.ts @@ -0,0 +1,149 @@ +import { createDataSet, waitForCreateDataSet } from "@filoz/synapse-core/sp"; +import { terminateServiceSync } from "@filoz/synapse-core/warm-storage"; +import { Injectable, Logger } from "@nestjs/common"; +import { awaitWithAbort } from "../common/abort-utils.js"; +import { toStructuredError } from "../common/logging.js"; +import { buildCheckMetricLabels, classifyFailureStatus } from "../metrics-prometheus/check-metric-labels.js"; +import { DataSetLifecycleCheckMetrics } from "../metrics-prometheus/check-metrics.service.js"; +import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; + +@Injectable() +export class DataSetLifecycleService { + private readonly logger = new Logger(DataSetLifecycleService.name); + + constructor( + private readonly walletSdkService: WalletSdkService, + private readonly lifecycleCheckMetrics: DataSetLifecycleCheckMetrics, + ) {} + + /** + * Run one data-set lifecycle check: create an empty throwaway data set on the SP, + * wait for on-chain confirmation, then immediately terminate it. Used by the + * `data_set_lifecycle_check` canary job to validate that an SP honours the full + * create → terminate lifecycle. + * + * Never touches managed check data sets and creates no Deal rows. The throwaway set + * is identified by the fixed `dealbotLifecycleCheck` marker key in `metadata`; a + * per-run nonce value prevents accidentally reusing a prior leaked set. If creation + * succeeds but termination fails the set leaks (accepted trade-off); operators can + * sweep leaks by that key. + * + * Emits only `dataSetLifecycleCheckStatus` / `dataSetLifecycleCheckMs` metrics. + */ + async runLifecycleCheck(spAddress: string, metadata: Record, signal?: AbortSignal): Promise { + const providerInfo = this.walletSdkService.getProviderInfo(spAddress); + if (!providerInfo) { + throw new Error(`Provider ${spAddress} not found in registry`); + } + + const client = this.walletSdkService.getSynapseClient(); + if (!client) { + throw new Error("Synapse client not initialized"); + } + + const labels = buildCheckMetricLabels({ + checkType: "dataSetLifecycleCheck", + providerId: providerInfo.id, + providerName: providerInfo.name, + providerIsApproved: providerInfo.isApproved, + }); + + const logContext = { + providerAddress: spAddress, + providerName: providerInfo.name, + providerId: providerInfo.id, + }; + + const startedAt = Date.now(); + this.logger.log({ + event: "dataset_lifecycle_check_started", + message: "Starting data-set lifecycle check", + ...logContext, + }); + + let dataSetId: bigint | undefined; + try { + signal?.throwIfAborted(); + + // 1. Request creation of an empty data set on the SP. + const createResult = await awaitWithAbort( + createDataSet(client, { + cdn: false, + payee: providerInfo.payee, + serviceURL: providerInfo.pdp.serviceURL, + metadata, + }), + signal, + ); + signal?.throwIfAborted(); + + this.logger.log({ + event: "dataset_lifecycle_check_creating", + message: "Empty data set creation submitted; waiting for SP confirmation", + ...logContext, + txHash: createResult.txHash, + }); + + // 2. Wait for the SP to confirm the data set is created and extract the dataSetId. + const confirmed = await awaitWithAbort(waitForCreateDataSet({ statusUrl: createResult.statusUrl }), signal); + dataSetId = confirmed.dataSetId; + signal?.throwIfAborted(); + + this.logger.log({ + event: "dataset_lifecycle_check_created", + message: "Empty data set created and confirmed on-chain", + ...logContext, + dataSetId: dataSetId.toString(), + }); + + // 3. Immediately terminate the throwaway data set. + await awaitWithAbort( + terminateServiceSync(client, { + dataSetId, + onHash: (hash) => { + this.logger.log({ + event: "dataset_lifecycle_check_terminating", + message: "Terminate transaction submitted", + ...logContext, + dataSetId: (dataSetId as bigint).toString(), + txHash: hash, + }); + }, + }), + signal, + ); + + const durationMs = Date.now() - startedAt; + this.lifecycleCheckMetrics.observeCheckDuration(labels, durationMs); + this.lifecycleCheckMetrics.recordStatus(labels, "success"); + + this.logger.log({ + event: "dataset_lifecycle_check_succeeded", + message: "Data-set lifecycle check completed: created and terminated throwaway data set", + ...logContext, + dataSetId: dataSetId.toString(), + durationMs, + }); + } catch (error) { + const durationMs = Date.now() - startedAt; + const status = signal?.aborted ? "failure.timedout" : classifyFailureStatus(error); + if (status === "failure.timedout") { + this.lifecycleCheckMetrics.observeCheckDuration(labels, durationMs); + } + this.lifecycleCheckMetrics.recordStatus(labels, status); + this.logger.error({ + event: "dataset_lifecycle_check_failed", + message: + dataSetId === undefined + ? "Data-set lifecycle check failed during creation" + : "Data-set lifecycle check failed during termination; throwaway data set may have leaked", + ...logContext, + dataSetId: dataSetId?.toString(), + durationMs, + status, + error: toStructuredError(error), + }); + throw error; + } + } +} diff --git a/apps/backend/src/database/entities/job-schedule-state.entity.ts b/apps/backend/src/database/entities/job-schedule-state.entity.ts index 4d801d2a..06aa72c5 100644 --- a/apps/backend/src/database/entities/job-schedule-state.entity.ts +++ b/apps/backend/src/database/entities/job-schedule-state.entity.ts @@ -4,6 +4,7 @@ export type JobType = | "deal" | "retrieval" | "data_set_creation" + | "data_set_lifecycle_check" | "pull_check" | "providers_refresh" | "data_retention_poll" diff --git a/apps/backend/src/jobs/jobs.module.ts b/apps/backend/src/jobs/jobs.module.ts index 12328093..4fc3dfbf 100644 --- a/apps/backend/src/jobs/jobs.module.ts +++ b/apps/backend/src/jobs/jobs.module.ts @@ -1,6 +1,7 @@ import { Module } from "@nestjs/common"; import { TypeOrmModule } from "@nestjs/typeorm"; import { DataRetentionModule } from "../data-retention/data-retention.module.js"; +import { DataSetLifecycleModule } from "../data-set-lifecycle/data-set-lifecycle.module.js"; import { DatabaseModule } from "../database/database.module.js"; import { JobScheduleState } from "../database/entities/job-schedule-state.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; @@ -16,6 +17,7 @@ import { JobScheduleRepository } from "./repositories/job-schedule.repository.js imports: [ DatabaseModule, TypeOrmModule.forFeature([StorageProvider, JobScheduleState]), + DataSetLifecycleModule, DealModule, RetrievalModule, WalletSdkModule, diff --git a/apps/backend/src/jobs/jobs.service.spec.ts b/apps/backend/src/jobs/jobs.service.spec.ts index b25cd552..350ff463 100644 --- a/apps/backend/src/jobs/jobs.service.spec.ts +++ b/apps/backend/src/jobs/jobs.service.spec.ts @@ -25,6 +25,7 @@ describe("JobsService schedule rows", () => { let jobScheduleRepositoryMock: { upsertSchedule: ReturnType; deleteSchedulesForInactiveProviders: ReturnType; + deleteSchedulesByJobType: ReturnType; countPausedSchedules: ReturnType; findDueSchedulesWithManager: ReturnType; runTransaction: ReturnType; @@ -73,6 +74,7 @@ describe("JobsService schedule rows", () => { jobDuration: JobsServiceDeps[18]; storageProvidersActive: JobsServiceDeps[19]; storageProvidersTested: JobsServiceDeps[20]; + dataSetLifecycleService: JobsServiceDeps[21]; }>, ) => JobsService; @@ -86,6 +88,7 @@ describe("JobsService schedule rows", () => { jobScheduleRepositoryMock = { upsertSchedule: vi.fn(), deleteSchedulesForInactiveProviders: vi.fn(async () => []), + deleteSchedulesByJobType: vi.fn(async () => 0), countPausedSchedules: vi.fn(async () => []), findDueSchedulesWithManager: vi.fn(), runTransaction: vi.fn(async (callback: (manager: unknown) => Promise) => { @@ -139,6 +142,9 @@ describe("JobsService schedule rows", () => { dealJobTimeoutSeconds: 360, retrievalJobTimeoutSeconds: 60, dataSetCreationJobTimeoutSeconds: 300, + dataSetLifecycleCheckEnabled: false, + dataSetLifecycleChecksPerSpPerHour: 1, + dataSetLifecycleCheckJobTimeoutSeconds: 600, shutdownFinalScrapeDelaySeconds: 35, pieceCleanupPerSpPerHour: 1, maxPieceCleanupRuntimeSeconds: 300, @@ -192,6 +198,7 @@ describe("JobsService schedule rows", () => { overrides.jobDuration ?? metricsMocks.jobDuration, overrides.storageProvidersActive ?? metricsMocks.storageProvidersActive, overrides.storageProvidersTested ?? metricsMocks.storageProvidersTested, + overrides.dataSetLifecycleService ?? ({} as JobsServiceDeps[21]), ); service = buildService(); @@ -1250,6 +1257,103 @@ describe("JobsService schedule rows", () => { expect(dealService.createDataSetWithPiece).not.toHaveBeenCalled(); }); + it("data_set_lifecycle_check job skips when disabled", async () => { + baseConfigValues = { + ...baseConfigValues, + jobs: { ...baseConfigValues.jobs, dataSetLifecycleCheckEnabled: false } as IConfig["jobs"], + }; + configService = { + get: vi.fn((key: keyof IConfig) => baseConfigValues[key]), + } as unknown as JobsServiceDeps[0]; + + const dataSetLifecycleService = { runLifecycleCheck: vi.fn() }; + const walletSdkService = { getProviderInfo: vi.fn(() => ({ id: 1, name: "test-provider" })) }; + + service = buildService({ + configService, + dataSetLifecycleService: dataSetLifecycleService as unknown as JobsServiceDeps[21], + walletSdkService: walletSdkService as unknown as ConstructorParameters[5], + }); + + await callPrivate(service, "handleDataSetLifecycleCheckJob", { + id: "job-lc-1", + data: { jobType: "data_set_lifecycle_check", spAddress: "0xaaa", intervalSeconds: 3600 }, + }); + + expect(dataSetLifecycleService.runLifecycleCheck).not.toHaveBeenCalled(); + }); + + it("data_set_lifecycle_check job creates and terminates a throwaway data set when enabled", async () => { + baseConfigValues = { + ...baseConfigValues, + jobs: { ...baseConfigValues.jobs, dataSetLifecycleCheckEnabled: true } as IConfig["jobs"], + }; + configService = { + get: vi.fn((key: keyof IConfig) => baseConfigValues[key]), + } as unknown as JobsServiceDeps[0]; + + const dataSetLifecycleService = { runLifecycleCheck: vi.fn(async () => undefined) }; + const walletSdkService = { getProviderInfo: vi.fn(() => ({ id: 1, name: "test-provider" })) }; + + service = buildService({ + configService, + dataSetLifecycleService: dataSetLifecycleService as unknown as JobsServiceDeps[21], + walletSdkService: walletSdkService as unknown as ConstructorParameters[5], + }); + + await callPrivate(service, "handleDataSetLifecycleCheckJob", { + id: "job-lc-2", + data: { jobType: "data_set_lifecycle_check", spAddress: "0xaaa", intervalSeconds: 3600 }, + }); + + expect(dataSetLifecycleService.runLifecycleCheck).toHaveBeenCalledWith( + "0xaaa", + expect.objectContaining({ dealbotLifecycleCheck: expect.any(String) }), + expect.any(AbortSignal), + ); + // The fixed marker key is the only metadata; no base/slot metadata is attached. + const metadataArg = (dataSetLifecycleService.runLifecycleCheck.mock.calls[0] as unknown[])[1] as Record< + string, + string + >; + expect(Object.keys(metadataArg)).toEqual(["dealbotLifecycleCheck"]); + }); + + it("creates data_set_lifecycle_check schedules when enabled", async () => { + baseConfigValues = { + ...baseConfigValues, + jobs: { ...baseConfigValues.jobs, dataSetLifecycleCheckEnabled: true } as IConfig["jobs"], + }; + configService = { + get: vi.fn((key: keyof IConfig) => baseConfigValues[key]), + } as unknown as JobsServiceDeps[0]; + service = buildService({ configService }); + + storageProviderRepositoryMock.find.mockResolvedValueOnce([{ address: "0xaaa" }]); + + await callPrivate(service, "ensureScheduleRows"); + + const lifecycleUpserts = jobScheduleRepositoryMock.upsertSchedule.mock.calls.filter( + (call) => call[0] === "data_set_lifecycle_check", + ); + expect(lifecycleUpserts).toHaveLength(1); + expect(lifecycleUpserts[0][1]).toBe("0xaaa"); + expect(jobScheduleRepositoryMock.deleteSchedulesByJobType).not.toHaveBeenCalled(); + }); + + it("removes data_set_lifecycle_check schedules when disabled", async () => { + // base config has dataSetLifecycleCheckEnabled=false + storageProviderRepositoryMock.find.mockResolvedValueOnce([{ address: "0xaaa" }]); + + await callPrivate(service, "ensureScheduleRows"); + + const lifecycleUpserts = jobScheduleRepositoryMock.upsertSchedule.mock.calls.filter( + (call) => call[0] === "data_set_lifecycle_check", + ); + expect(lifecycleUpserts).toHaveLength(0); + expect(jobScheduleRepositoryMock.deleteSchedulesByJobType).toHaveBeenCalledWith("data_set_lifecycle_check"); + }); + it("sets active, inactive, and tested provider gauge values after refresh", async () => { storageProviderRepositoryMock.count .mockResolvedValueOnce(10) // totalProviders @@ -1509,9 +1613,10 @@ describe("JobsService schedule rows", () => { await vi.advanceTimersByTimeAsync(35_001); await shutdownPromise; - // Defaults: deal=360, retrieval=60, dataSetCreation=300, pullCheck=300 → max=360 → +60s buffer + // Defaults: deal=360, retrieval=60, dataSetCreation=300, dataSetLifecycleCheck=600, + // pullCheck=300 → max=600 → +60s buffer expect(bossMock.stop).toHaveBeenCalledTimes(1); - expect(bossMock.stop).toHaveBeenCalledWith({ graceful: true, timeout: 420_000 }); + expect(bossMock.stop).toHaveBeenCalledWith({ graceful: true, timeout: 660_000 }); }); it("picks the longest timeout across all job types, including pullCheck under pullPiece", async () => { diff --git a/apps/backend/src/jobs/jobs.service.ts b/apps/backend/src/jobs/jobs.service.ts index 957ce65a..9233a19f 100644 --- a/apps/backend/src/jobs/jobs.service.ts +++ b/apps/backend/src/jobs/jobs.service.ts @@ -5,12 +5,14 @@ import { InjectMetric } from "@willsoto/nestjs-prometheus"; import { type Job, PgBoss, type SendOptions } from "pg-boss"; import type { Counter, Gauge, Histogram } from "prom-client"; import type { Repository } from "typeorm"; +import { LIFECYCLE_CHECK_METADATA_KEY } from "../common/constants.js"; import { DealJobTerminatedDataSetError } from "../common/errors.js"; import { type JobLogContext, type ProviderJobContext, toStructuredError } from "../common/logging.js"; import { getMaintenanceWindowStatus } from "../common/maintenance-window.js"; import { isSpBlocked } from "../common/sp-blocklist.js"; import type { IConfig, ISpBlocklistConfig } from "../config/app.config.js"; import { DataRetentionService } from "../data-retention/data-retention.service.js"; +import { DataSetLifecycleService } from "../data-set-lifecycle/data-set-lifecycle.service.js"; import type { JobType } from "../database/entities/job-schedule-state.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { DealService } from "../deal/deal.service.js"; @@ -27,11 +29,18 @@ import { } from "./job-queues.js"; import { JobScheduleRepository } from "./repositories/job-schedule.repository.js"; -type SpJobType = "deal" | "retrieval" | "data_set_creation" | "piece_cleanup" | "pull_check"; +type SpJobType = + | "deal" + | "retrieval" + | "data_set_creation" + | "data_set_lifecycle_check" + | "piece_cleanup" + | "pull_check"; const SP_JOB_TYPES: ReadonlySet = new Set([ "deal", "retrieval", "data_set_creation", + "data_set_lifecycle_check", "piece_cleanup", "pull_check", ]); @@ -99,6 +108,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { private readonly storageProvidersActive: Gauge, @InjectMetric("storage_providers_tested") private readonly storageProvidersTested: Gauge, + private readonly dataSetLifecycleService: DataSetLifecycleService, ) {} /** @@ -204,6 +214,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { jobs.dealJobTimeoutSeconds, jobs.retrievalJobTimeoutSeconds, jobs.dataSetCreationJobTimeoutSeconds, + jobs.dataSetLifecycleCheckJobTimeoutSeconds, pullPiece.pullCheckJobTimeoutSeconds, ); const stopTimeoutMs = (longestJobTimeoutSec + 60) * 1000; @@ -333,6 +344,10 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { await this.handleDataSetCreationJob(job); return; } + if (job.data.jobType === "data_set_lifecycle_check") { + await this.handleDataSetLifecycleCheckJob(job); + return; + } if (job.data.jobType === "piece_cleanup") { await this.handlePieceCleanupJob(job); return; @@ -919,6 +934,104 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { }); } + /** + * Handles one `data_set_lifecycle_check` invocation for a provider. + * + * Creates a throwaway data set with a seed piece, then immediately calls + * `terminateService` on it — exercising the full create -> terminate lifecycle in a + * single tick. + */ + private async handleDataSetLifecycleCheckJob(job: SpJob): Promise { + const data = job.data; + const spAddress = data.spAddress; + const now = new Date(); + const maintenance = this.getMaintenanceWindowStatus(now); + if (maintenance.active) { + this.logMaintenanceSkip(`data_set_lifecycle_check job for ${spAddress}`, maintenance.window?.label, { + jobId: job.id, + providerAddress: spAddress, + providerId: this.walletSdkService.getProviderInfo(spAddress)?.id, + providerName: this.walletSdkService.getProviderInfo(spAddress)?.name, + }); + await this.deferJobForMaintenance("data_set_lifecycle_check", data, maintenance, now); + return; + } + + const jobsConfig = this.configService.get("jobs", { infer: true }); + // Defensive gate: schedules are only created when enabled, but a stale enqueued job + // (e.g. after disabling) must still no-op safely. + if (!jobsConfig.dataSetLifecycleCheckEnabled) { + this.logger.log({ + jobId: job.id, + providerAddress: spAddress, + providerId: this.walletSdkService.getProviderInfo(spAddress)?.id, + providerName: this.walletSdkService.getProviderInfo(spAddress)?.name, + event: "data_set_lifecycle_check_job_disabled", + message: "Data set lifecycle check job skipped: disabled", + enabled: jobsConfig.dataSetLifecycleCheckEnabled, + }); + return; + } + + // Fixed marker key + per-run nonce value. The key is the manual-cleanup handle; the + // nonce forces createContext to provision a fresh set each tick instead of resolving + // a prior (possibly leaked) set. Intentionally excludes base data-set metadata. + const metadata: Record = { + [LIFECYCLE_CHECK_METADATA_KEY]: Date.now().toString(), + }; + + // Create AbortController for job timeout enforcement + const abortController = new AbortController(); + const timeoutSeconds = jobsConfig.dataSetLifecycleCheckJobTimeoutSeconds; + const timeoutMs = Math.max(60000, timeoutSeconds * 1000); + const effectiveTimeoutSeconds = Math.round(timeoutMs / 1000); + const abortReason = new Error( + `Data set lifecycle check job timeout (${effectiveTimeoutSeconds}s) for ${spAddress}`, + ); + const timeoutId = setTimeout(() => { + abortController.abort(abortReason); + }, timeoutMs); + + await this.recordJobExecution("data_set_lifecycle_check", async () => { + const dataSetLogContext = await this.resolveRunnableProviderJobContext( + "data_set_lifecycle_check", + spAddress, + job.id, + "Data set lifecycle check job skipped: provider is blocked for scheduled data-storage checks", + ); + if (dataSetLogContext == null) { + clearTimeout(timeoutId); + return "success"; + } + try { + await this.dataSetLifecycleService.runLifecycleCheck(spAddress, metadata, abortController.signal); + return "success"; + } catch (error) { + if (abortController.signal.aborted) { + const reason = abortController.signal.reason; + const reasonMessage = reason instanceof Error ? reason.message : String(reason ?? ""); + this.logger.error({ + ...dataSetLogContext, + event: "data_set_lifecycle_check_job_aborted", + message: reasonMessage || "Data set lifecycle check job aborted after timeout", + timeoutSeconds: effectiveTimeoutSeconds, + error: toStructuredError(reason ?? error), + }); + return "aborted"; + } + this.logger.error({ + ...dataSetLogContext, + event: "data_set_lifecycle_check_job_failed", + message: "Data set lifecycle check job failed", + error: toStructuredError(error), + }); + throw error; + } finally { + clearTimeout(timeoutId); + } + }); + } + private maintenanceResumeAt(now: Date, maintenance: ReturnType): Date | null { if (!maintenance.active || !maintenance.window) { return null; @@ -1009,6 +1122,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { dealIntervalSeconds: number; retrievalIntervalSeconds: number; dataSetCreationIntervalSeconds: number; + dataSetLifecycleCheckIntervalSeconds: number; dataRetentionPollIntervalSeconds: number; providersRefreshIntervalSeconds: number; pieceCleanupIntervalSeconds: number; @@ -1022,12 +1136,14 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { const dealsPerHour = jobsConfig.dealsPerSpPerHour; const retrievalsPerHour = jobsConfig.retrievalsPerSpPerHour; const dataSetCreationsPerHour = jobsConfig.dataSetCreationsPerSpPerHour; + const dataSetLifecycleChecksPerHour = jobsConfig.dataSetLifecycleChecksPerSpPerHour; const pieceCleanupPerHour = jobsConfig.pieceCleanupPerSpPerHour; const pullChecksPerHour = pullPieceConfig.pullChecksPerSpPerHour; const dealIntervalSeconds = Math.max(1, Math.round(3600 / dealsPerHour)); const retrievalIntervalSeconds = Math.max(1, Math.round(3600 / retrievalsPerHour)); const dataSetCreationIntervalSeconds = Math.max(1, Math.round(3600 / dataSetCreationsPerHour)); + const dataSetLifecycleCheckIntervalSeconds = Math.max(1, Math.round(3600 / dataSetLifecycleChecksPerHour)); const pieceCleanupIntervalSeconds = Math.max(1, Math.round(3600 / pieceCleanupPerHour)); const pullCheckIntervalSeconds = Math.max(1, Math.round(3600 / pullChecksPerHour)); const dataRetentionPollIntervalSeconds = scheduling.dataRetentionPollIntervalSeconds; @@ -1038,6 +1154,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { dealIntervalSeconds, retrievalIntervalSeconds, dataSetCreationIntervalSeconds, + dataSetLifecycleCheckIntervalSeconds, dataRetentionPollIntervalSeconds, providersRefreshIntervalSeconds, pieceCleanupIntervalSeconds, @@ -1059,6 +1176,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { dealIntervalSeconds, retrievalIntervalSeconds, dataSetCreationIntervalSeconds, + dataSetLifecycleCheckIntervalSeconds, dataRetentionPollIntervalSeconds, providersRefreshIntervalSeconds, pieceCleanupIntervalSeconds, @@ -1078,10 +1196,13 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { const dealStartAt = new Date(now.getTime() + phaseMs); const retrievalStartAt = new Date(now.getTime() + phaseMs); const dataSetCreationStartAt = new Date(now.getTime() + phaseMs); + const dataSetLifecycleCheckStartAt = new Date(now.getTime() + phaseMs); const dataRetentionPollStartAt = new Date(now.getTime() + phaseMs); const providersRefreshStartAt = new Date(now.getTime() + phaseMs); - const minDataSets = this.configService.get("blockchain").minNumDataSetsForChecks; + const minDataSets = this.configService.get("blockchain", { infer: true }).minNumDataSetsForChecks; + // Lifecycle check schedules are only created when enabled explicitly + const lifecycleCheckScheduleEnabled = this.configService.get("jobs", { infer: true }).dataSetLifecycleCheckEnabled; const cleanupStartAt = new Date(now.getTime() + phaseMs); const pullCheckStartAt = new Date(now.getTime() + phaseMs); @@ -1109,6 +1230,14 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { dataSetCreationStartAt, ); } + if (lifecycleCheckScheduleEnabled) { + await this.jobScheduleRepository.upsertSchedule( + "data_set_lifecycle_check", + address, + dataSetLifecycleCheckIntervalSeconds, + dataSetLifecycleCheckStartAt, + ); + } await this.jobScheduleRepository.upsertSchedule( "piece_cleanup", address, @@ -1140,6 +1269,19 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { }); } + // When the lifecycle check is disabled, remove any stale data_set_lifecycle_check + // schedules so they stop enqueuing no-op jobs. + if (!lifecycleCheckScheduleEnabled) { + const removed = await this.jobScheduleRepository.deleteSchedulesByJobType("data_set_lifecycle_check"); + if (removed > 0) { + this.logger.warn({ + event: "data_set_lifecycle_check_schedules_removed", + message: "Removed data_set_lifecycle_check schedules because the job is disabled", + removed, + }); + } + } + // Global job schedules (sp_address = '') await this.jobScheduleRepository.upsertSchedule( "data_retention_poll", @@ -1251,6 +1393,8 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { return SP_WORK_QUEUE; case "data_set_creation": return SP_WORK_QUEUE; + case "data_set_lifecycle_check": + return SP_WORK_QUEUE; case "piece_cleanup": return SP_WORK_QUEUE; case "pull_check": @@ -1273,6 +1417,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { row.job_type === "deal" || row.job_type === "retrieval" || row.job_type === "data_set_creation" || + row.job_type === "data_set_lifecycle_check" || row.job_type === "piece_cleanup" || row.job_type === "pull_check" ) { @@ -1346,6 +1491,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { "deal", "retrieval", "data_set_creation", + "data_set_lifecycle_check", "piece_cleanup", "pull_check", "data_retention_poll", diff --git a/apps/backend/src/jobs/repositories/job-schedule.repository.ts b/apps/backend/src/jobs/repositories/job-schedule.repository.ts index 6411a3b1..a2dda2f1 100644 --- a/apps/backend/src/jobs/repositories/job-schedule.repository.ts +++ b/apps/backend/src/jobs/repositories/job-schedule.repository.ts @@ -71,7 +71,7 @@ export class JobScheduleRepository { const [rows] = (await this.dataSource.query( ` DELETE FROM job_schedule_state - WHERE job_type IN ('deal', 'retrieval', 'data_set_creation', 'piece_cleanup', 'pull_check') + WHERE job_type IN ('deal', 'retrieval', 'data_set_creation', 'data_set_lifecycle_check', 'piece_cleanup', 'pull_check') AND sp_address <> '' RETURNING sp_address `, @@ -82,7 +82,7 @@ export class JobScheduleRepository { const [rows] = (await this.dataSource.query( ` DELETE FROM job_schedule_state - WHERE job_type IN ('deal', 'retrieval', 'data_set_creation', 'piece_cleanup', 'pull_check') + WHERE job_type IN ('deal', 'retrieval', 'data_set_creation', 'data_set_lifecycle_check', 'piece_cleanup', 'pull_check') AND sp_address <> '' AND sp_address <> ALL($1::text[]) RETURNING sp_address @@ -100,6 +100,30 @@ export class JobScheduleRepository { } } + /** + * Deletes all per-provider schedule rows for a given job type. + * + * Used to stop a job entirely when it is disabled by config (for example the + * `data_set_lifecycle_check` canary when `DATASET_LIFECYCLE_CHECK_ENABLED=false`), + * so stale schedules do not keep enqueuing no-op jobs. + * + * @param jobType - The job type whose per-provider schedules should be removed. + * @returns Number of schedule rows deleted. + */ + async deleteSchedulesByJobType(jobType: JobType): Promise { + const result = await this.dataSource.query( + ` + DELETE FROM job_schedule_state + WHERE job_type = $1 + AND sp_address <> '' + `, + [jobType], + ); + // node-postgres returns [rows, rowCount] for DELETE without RETURNING. + const rowCount = Array.isArray(result) ? result[1] : undefined; + return typeof rowCount === "number" ? rowCount : 0; + } + /** * Counts manually paused jobs by type. */ diff --git a/apps/backend/src/metrics-prometheus/check-metric-labels.ts b/apps/backend/src/metrics-prometheus/check-metric-labels.ts index 07415d45..5c02eff6 100644 --- a/apps/backend/src/metrics-prometheus/check-metric-labels.ts +++ b/apps/backend/src/metrics-prometheus/check-metric-labels.ts @@ -1,4 +1,10 @@ -export type CheckType = "dataStorage" | "retrieval" | "dataRetention" | "dataSetCreation" | "pullCheck"; +export type CheckType = + | "dataStorage" + | "retrieval" + | "dataRetention" + | "dataSetCreation" + | "dataSetLifecycleCheck" + | "pullCheck"; export type ProviderStatus = "approved" | "unapproved"; export type CheckMetricLabels = { diff --git a/apps/backend/src/metrics-prometheus/check-metrics.service.ts b/apps/backend/src/metrics-prometheus/check-metrics.service.ts index 7afd9935..39f697f2 100644 --- a/apps/backend/src/metrics-prometheus/check-metrics.service.ts +++ b/apps/backend/src/metrics-prometheus/check-metrics.service.ts @@ -285,6 +285,34 @@ export class DataSetCreationCheckMetrics { } } +@Injectable() +export class DataSetLifecycleCheckMetrics { + constructor( + @InjectMetric("dataSetLifecycleCheckMs") + private readonly dataSetLifecycleCheckMs: Histogram, + @InjectMetric("dataSetLifecycleCheckStatus") + private readonly dataSetLifecycleCheckStatusCounter: Counter, + ) {} + + /** + * Observe the end-to-end duration of one lifecycle check (create throwaway data set + * with a seed piece, then `terminateService` and confirm `pdpEndEpoch != 0`). + * Emitted on `success` and `failure.timedout` only (analogous to `dataSetCreationMs`). + */ + observeCheckDuration(labels: CheckMetricLabels, value: number | null | undefined): void { + observePositive(this.dataSetLifecycleCheckMs, labels, value); + } + + /** + * Record data-set lifecycle check status. + * Values: `success`, `failure.timedout`, `failure.other`. + * See docs/checks/data-set-lifecycle-check.md. + */ + recordStatus(labels: CheckMetricLabels, value: string): void { + this.dataSetLifecycleCheckStatusCounter.inc({ ...labels, value }); + } +} + @Injectable() export class PullCheckCheckMetrics { constructor( diff --git a/apps/backend/src/metrics-prometheus/metrics-prometheus.module.ts b/apps/backend/src/metrics-prometheus/metrics-prometheus.module.ts index a27e945a..5fe80fc7 100644 --- a/apps/backend/src/metrics-prometheus/metrics-prometheus.module.ts +++ b/apps/backend/src/metrics-prometheus/metrics-prometheus.module.ts @@ -9,6 +9,7 @@ import { import { WalletSdkModule } from "../wallet-sdk/wallet-sdk.module.js"; import { DataSetCreationCheckMetrics, + DataSetLifecycleCheckMetrics, DataStorageCheckMetrics, DiscoverabilityCheckMetrics, PullCheckCheckMetrics, @@ -154,6 +155,13 @@ const metricProviders = [ labelNames: ["checkType", "providerId", "providerName", "providerStatus"] as const, buckets: [100, 500, 1000, 2000, 5000, 10000, 30000, 60000, 120000, 300000, 600000], }), + makeHistogramProvider({ + // docs/checks/events-and-metrics.md#dataSetLifecycleCheckMs + name: "dataSetLifecycleCheckMs", + help: "End-to-end data-set lifecycle check duration: create with seed piece then terminate and confirm pdpEndEpoch != 0 (ms)", + labelNames: ["checkType", "providerId", "providerName", "providerStatus"] as const, + buckets: [100, 500, 1000, 2000, 5000, 10000, 30000, 60000, 120000, 300000, 600000], + }), // Sub-status metrics (docs/checks/data-storage.md) makeCounterProvider({ // docs/checks/data-storage.md#sub-status-meanings (Upload Status) @@ -203,6 +211,12 @@ const metricProviders = [ help: "Data-set creation status counts", labelNames: ["checkType", "providerId", "providerName", "providerStatus", "value"] as const, }), + makeCounterProvider({ + // docs/checks/events-and-metrics.md#dataSetLifecycleCheckStatus + name: "dataSetLifecycleCheckStatus", + help: "Data-set lifecycle check status counts (success | failure.timedout | failure.other)", + labelNames: ["checkType", "providerId", "providerName", "providerStatus", "value"] as const, + }), // Pull check metrics (docs/checks/pull-check.md) makeHistogramProvider({ name: "pullRequestAcknowledgementLatencyMs", @@ -375,6 +389,7 @@ const metricProviders = [ RetrievalCheckMetrics, DiscoverabilityCheckMetrics, DataSetCreationCheckMetrics, + DataSetLifecycleCheckMetrics, PullCheckCheckMetrics, WalletBalanceCollector, // HTTP metrics interceptor @@ -390,6 +405,7 @@ const metricProviders = [ RetrievalCheckMetrics, DiscoverabilityCheckMetrics, DataSetCreationCheckMetrics, + DataSetLifecycleCheckMetrics, PullCheckCheckMetrics, WalletBalanceCollector, ], diff --git a/docs/checks/README.md b/docs/checks/README.md index 903e543c..b083afc7 100644 --- a/docs/checks/README.md +++ b/docs/checks/README.md @@ -6,6 +6,7 @@ The files are: - [retrievals.md](./retrievals.md): Defines the "retrieval check" and how it is calculated. - [data-retention.md](./data-retention.md): Defines the "data retention check" and how it is calculated. - [pull-check.md](./pull-check.md): Defines the "pull check" and how it is calculated. +- [data-set-lifecycle-check.md](./data-set-lifecycle-check.md): Defines the `data_set_lifecycle_check` canary that creates and terminates a throwaway data set each tick. - [events-and-metrics.md](./events-and-metrics.md): Defines the events and metrics that are used to assess SP performance. diff --git a/docs/checks/data-set-lifecycle-check.md b/docs/checks/data-set-lifecycle-check.md new file mode 100644 index 00000000..9e559f5a --- /dev/null +++ b/docs/checks/data-set-lifecycle-check.md @@ -0,0 +1,128 @@ +# Data Set Lifecycle Check + +This document is the **source of truth** for how dealbot's Data Set Lifecycle check works. + +Source code links throughout this document point to the current implementation. + +For event and metric definitions used by the dashboard, see [Dealbot Events & Metrics](./events-and-metrics.md). + +> **Note**: This check calls `terminateService` to start the on-chain termination sequence. It does **not** call `PDPVerifier.deleteDataSet`, which is SP-initiated. See the [FAQ](#what-happens-on-chain-after-terminateservice-is-called) for details on what happens after termination. + +## Overview + +A "data set lifecycle check" tests the full `createDataSet → terminateService` lifecycle for a storage provider. Dealbot creates an empty throwaway data set and immediately terminates it in the same run. A successful check confirms both the `createDataSet` and `terminateService` paths work correctly on the SP. + +Every data set lifecycle check, dealbot: + +1. Creates a new empty data set, tagged with a `dealbotLifecycleCheck` metadata key so any leaked sets are discoverable later +2. Waits for the SP to confirm the data set is created on-chain and returns a `dataSetId` +3. Calls `terminateService` on the created data set and waits for the transaction receipt + +A successful check requires all [assertions in the table below](#what-gets-asserted) to pass. Failure occurs if any step fails or the check exceeds its max allowed time. + +## What Gets Asserted + +Each data set lifecycle check asserts the following for every SP: + +| # | Assertion | How It's Checked | Relevant Metric | +|---|-----------|-----------------|-----------------| +| 1 | SP accepts an empty data set creation | `createDataSet` call completes and the SP returns a `statusUrl` | [`dataSetLifecycleCheckStatus`](./events-and-metrics.md#dataSetLifecycleCheckStatus) | +| 2 | Data set is confirmed on-chain | `waitForCreateDataSet` resolves with a `dataSetId` | [`dataSetLifecycleCheckStatus`](./events-and-metrics.md#dataSetLifecycleCheckStatus) | +| 3 | `terminateService` succeeds on the created data set | `terminateServiceSync` call completes and the transaction receipt is received | [`dataSetLifecycleCheckMs`](./events-and-metrics.md#dataSetLifecycleCheckMs) | +| 4 | All steps complete within the timeout | Check is not marked successful until all steps pass within `DATA_SET_LIFECYCLE_CHECK_JOB_TIMEOUT_SECONDS` | [`dataSetLifecycleCheckMs`](./events-and-metrics.md#dataSetLifecycleCheckMs) | + +## Data Set Lifecycle Check Lifecycle + +The dealbot scheduler triggers data set lifecycle check jobs at a configurable rate. + +```mermaid +flowchart TD + CreateDataSet["createDataSet (empty data set)"] --> Wait["waitForCreateDataSet"] + Wait -->|dataSetId confirmed| Terminate["terminateServiceSync"] + Terminate -->|tx receipt received| Success["Mark check successful"] + Terminate -->|error| Fail["Mark check failed"] + Wait -->|error| Fail + CreateDataSet -->|error| Fail + CreateDataSet -->|abort signal| Fail +``` + +### 1. Apply job guards + +Dealbot applies the same maintenance-window and SP-blocklist rules used by all other SP jobs. If `DATASET_LIFECYCLE_CHECK_ENABLED` is `false`, the job logs a disabled skip and exits. + +### 2. Create the empty data set + +Dealbot calls `createDataSet` (from `@filoz/synapse-core/sp`) to create a new empty data set on the SP. The data set is tagged with metadata `{ dealbotLifecycleCheck: "" }`. The fixed `dealbotLifecycleCheck` key is the handle for finding leaked sets later; the per-run value ensures a fresh data set is created on every invocation rather than resolving a prior one. + +This step does **not** emit `dataSetCreation` metrics — those belong to the `data_set_creation` job. + +Source: [`data-set-lifecycle.service.ts` (`runLifecycleCheck`)](../../apps/backend/src/data-set-lifecycle/data-set-lifecycle.service.ts) + +### 3. Wait for creation confirmation + +Dealbot calls `waitForCreateDataSet` with the `statusUrl` returned by the SP. When the SP confirms the data set is created on-chain, it resolves with a `dataSetId`. + +### 4. Terminate the service + +Dealbot calls `terminateServiceSync` (from `@filoz/synapse-core/warm-storage`) on the newly created `dataSetId`. This submits the terminate transaction and waits for the receipt, confirming the termination was recorded on-chain. This is Step 1 of the [full on-chain termination sequence](#what-happens-on-chain-after-terminateservice-is-called). The job does not wait for the full ~30-day rail finalization. + +The entire check (creation + confirmation + termination) is bounded by `DATA_SET_LIFECYCLE_CHECK_JOB_TIMEOUT_SECONDS`. A timeout is classified as `failure.timedout`. + +## Check Status Progression + +A data set lifecycle check has a single terminal status, recorded once per check via [`dataSetLifecycleCheckStatus`](./events-and-metrics.md#dataSetLifecycleCheckStatus): + +| Overall Status | Meaning | +|--------|---------| +| `success` | All steps passed: data set created, service terminated, and termination confirmed on-chain. | +| `failure.timedout` | The job was aborted because it exceeded `DATA_SET_LIFECYCLE_CHECK_JOB_TIMEOUT_SECONDS`. | +| `failure.other` | Any other failure: `createDataSet` failed, `terminateService` failed, or on-chain confirmation polling failed. | + +## Metrics Recorded + +Metric definitions live in [Dealbot Events & Metrics](./events-and-metrics.md). The metrics emitted by a data set lifecycle check are: + +- [`dataSetLifecycleCheckStatus`](./events-and-metrics.md#dataSetLifecycleCheckStatus) — `success`, `failure.timedout`, or `failure.other` per provider per run +- [`dataSetLifecycleCheckMs`](./events-and-metrics.md#dataSetLifecycleCheckMs) — end-to-end duration (create + confirm + terminate); emitted on `success` and `failure.timedout` + +## Configuration + +Key environment variables that control data set lifecycle check behavior: + +| Variable | Description | +|----------|-------------| +| `DATASET_LIFECYCLE_CHECK_ENABLED` | Enables or disables the check. Defaults to `true` on calibration, `false` on mainnet. When disabled, stale schedules are removed so they stop enqueuing no-op jobs. | +| `DATASET_LIFECYCLE_CHECKS_PER_SP_PER_HOUR` | Per-SP check rate. Independent of `DATASET_CREATIONS_PER_SP_PER_HOUR`. | +| `DATA_SET_LIFECYCLE_CHECK_JOB_TIMEOUT_SECONDS` | Max end-to-end job runtime before forced abort. Default `600`. | + +Source: [`apps/backend/src/config/app.config.ts`](../../apps/backend/src/config/app.config.ts) + +See also: [`docs/environment-variables.md`](../environment-variables.md) for the source-of-truth configuration reference. + +## FAQ + +### What happens on-chain after `terminateService` is called? + +`terminateService` does not delete a data set instantly. It starts a multi-step on-chain sequence that plays out over roughly 30 days. The lifecycle check only waits for the first step before it exits. + +**Step 1 — terminateService confirms.** `terminateService` calls `FilecoinPay.terminateRail(pdpRailId)`, which sets `endEpoch = block.number + lockupPeriod` on the PDP rail. The FWSS `railTerminated` callback fires in the same transaction, stores `info.pdpEndEpoch`, and emits `PDPPaymentsTerminated` and `ServiceTerminated`. This is the point dealbot polls for: `pdpEndEpoch != 0`. + +**Step 2 — rail finalization (~30 days later).** When the PDP rail's `settledUpTo` reaches `endEpoch`, `finalizeTerminatedRail` fires atomically inside the settle transaction. + +**Step 3 — data set deletion at PDPVerifier (SP-initiated).** After the rail finalizes, the SP may call `PDPVerifier.deleteDataSet`. The lifecycle check does not wait for steps 2 or 3 — waiting ~30 days per invocation would defeat the purpose of a canary. + +### Why does data set creation use an empty data set? + +Empty data set creation calls `createDataSet` from `@filoz/synapse-core/sp` directly, bypassing the upload flow used by the data storage check. This keeps the lifecycle check lightweight: it validates the SP's `createDataSet → terminateService` path without storing any actual data or consuming upload capacity. + +### What if creation succeeds but termination fails? + +If creation succeeds but termination fails (process crash, job timeout, or an on-chain error that is not an already-terminated no-op), the created data set stays live on the SP. This is called a leak and is an accepted trade-off for keeping the job self-contained. + +Leaked sets are discoverable by filtering data sets with the `dealbotLifecycleCheck` metadata key. Each leak is also recorded in the `dataset_lifecycle_check_failed` log line (message: "throwaway data set may have leaked") with the `dataSetId` included for easy identification. + +### Why does the job create and terminate in the same run? + +An earlier design terminated an existing managed slot and relied on `data_set_creation` to recreate it on a later tick. That approach was coupled to `MIN_NUM_DATASETS_FOR_CHECKS`, a minimum-index window, and the creation job's schedule — making the canary sensitive to overall provider state. + +The current design is self-contained: it always creates a fresh data set and terminates it in the same run. The check works regardless of provider state and needs no coordination with other jobs. diff --git a/docs/checks/events-and-metrics.md b/docs/checks/events-and-metrics.md index 2d4a2b29..c0694eee 100644 --- a/docs/checks/events-and-metrics.md +++ b/docs/checks/events-and-metrics.md @@ -100,7 +100,7 @@ sequenceDiagram * They are exported via Prometheus. * All Prometheus/OpenTelemetry metrics have label/attributes for: - `network=calibration|mainnet` - - `checkType=dataStorage|retrieval|dataRetention|dataSetCreation|pullCheck` — attribute metrics to a particular check/job + - `checkType=dataStorage|retrieval|dataRetention|dataSetCreation|dataSetLifecycleCheck|pullCheck` — attribute metrics to a particular check/job - `providerId` — attribute metrics to a particular SP - `providerName` — human-readable name of the SP (defaults to `"unknown"` when not available) - `providerStatus=approved|unapproved` — attribute metrics to only approved SPs for example @@ -126,6 +126,7 @@ sequenceDiagram | `dataStorageCheckMs` | Data Storage | [`uploadToSpStart`](#uploadToSpStart) | [`ipfsRetrievalIntegrityChecked`](#ipfsRetrievalIntegrityChecked) | Duration of a Data Storage check | | | `retrievalCheckMs` | Retrieval | Retrieval check start | [`ipfsRetrievalIntegrityChecked`](#ipfsRetrievalIntegrityChecked) | Duration of a Retrieval check | | | `dataSetCreationMs` | Data-Set Creation | Data-set creation uploadToSpStart | Data-set creation pieceConfirmed | Duration of one data-set creation with confirmed piece (all using `createDataSetWithPiece`) | [`deal.service.ts`](../../apps/backend/src/deal/deal.service.ts) | +| `dataSetLifecycleCheckMs` | Data-Set Lifecycle Check | Empty data set creation start | `terminateServiceSync` tx receipt received | End-to-end duration of one lifecycle check: create an empty throwaway data set then terminate it (`runLifecycleCheck`). Emitted on `success` and `failure.timedout` only. See [data-set-lifecycle-check.md](./data-set-lifecycle-check.md). | [`data-set-lifecycle.service.ts`](../../apps/backend/src/data-set-lifecycle/data-set-lifecycle.service.ts) | | `pullRequestAcknowledgementLatencyMs` | Pull | [`pullRequestSubmittedToSp`](#pullRequestSubmittedToSp) | [`pullRequestAcknowledgedBySp`](#pullRequestAcknowledgedBySp) | Time from `pullPieces` submission to SP request acknowledgement. | [`pull-check.service.ts`](../../apps/backend/src/pull-check/pull-check.service.ts) | | `pullRequestStartedMs` | Pull | [`pullRequestSubmittedToSp`](#pullRequestSubmittedToSp) | [`pullRequestStartedBySp`](#pullRequestStartedBySp) | Time from `pullPieces` submission to the SP reading the first byte of `/api/piece/{pieceCid}`. Skipped (no observation) when the SP never fetches from dealbot. | [`pull-check.service.ts`](../../apps/backend/src/pull-check/pull-check.service.ts), [`pull-piece.controller.ts`](../../apps/backend/src/pull-check/pull-piece.controller.ts) | | `pullRequestCompletionLatencyMs` | Pull | [`pullRequestSubmittedToSp`](#pullRequestSubmittedToSp) | [`pullRequestIsTerminal`](#pullRequestIsTerminal) | Time from `pullPieces` submission to terminal SP pull status. Emitted once for the check, either on success or failure. | [`pull-check.service.ts`](../../apps/backend/src/pull-check/pull-check.service.ts) | @@ -150,6 +151,7 @@ sequenceDiagram | `ipfsRetrievalHttpResponseCode` | Data Storage, Retrieval | [`ipfsRetrievalLastByteReceived`](#ipfsRetrievalLastByteReceived) | `200`, `500`, `2xxSuccess`, `4xxClientError`, `5xxServerError`, `otherHttpStatusCodes`, `failure` | | 1 | [`retrieval.service.ts`](../../apps/backend/src/retrieval/retrieval.service.ts) | | `retrievalStatus` | Data Storage, Retrieval | [`ipfsRetrievalIntegrityChecked`](#ipfsRetrievalIntegrityChecked) | `success`, `failure.timedout`, `failure.other` from [Data Storage Sub-status meanings](./data-storage.md#sub-status-meanings). | On the Retrieval path, the pre-flight branches on the on-chain `PDPVerifier.pieceLive(dataSetId, pieceId)` result. When `pieceLive=false` (dataset terminated, piece never created, or piece hard-removed), `skipped.piece_missing` is emitted and the deal is marked `cleaned_up=true`; no SP probe runs. When `pieceLive=true` and the SP returns 404 on `/pdp/piece/:pieceCid/status`, `failure.other` is emitted and a failed retrieval row is recorded (deal stays in the candidate pool for re-probing). | 1 | | | `dataSetCreationStatus` | Data-Set Creation | Not tied to an [event above](#event-list) but rather to data-set creation start (`pending`) and completion (`success`/`failure.*`) | `pending`, `success`, `failure.timedout`, `failure.other` | | 1 | [`deal.service.ts`](../../apps/backend/src/deal/deal.service.ts) | +| `dataSetLifecycleCheckStatus` | Data-Set Lifecycle Check | When a `data_set_lifecycle_check` invocation finishes (create + terminate) | `success`, `failure.timedout`, `failure.other` | `success` confirms the full create→terminate lifecycle completed (`terminateServiceSync` tx receipt received). Persistent `failure.*` indicates a `createDataSet` or `terminateService` regression. See [data-set-lifecycle-check.md](./data-set-lifecycle-check.md). | 1 | [`data-set-lifecycle.service.ts`](../../apps/backend/src/data-set-lifecycle/data-set-lifecycle.service.ts) | | `dataSetChallengeStatus` | Data Retention | Emitted on each [Data Retention Check](./data-retention.md) poll when a provider's confirmed proving-period totals advance (strictly positive deltas since the last poll). | `success` (challenges in newly confirmed successful proving periods), `failure` (challenges in newly confirmed faulted periods) | | Counter increment = **period delta × 5** (`CHALLENGES_PER_PROVING_PERIOD`). Period delta is the increase in subgraph-confirmed proving periods since the previous poll for that provider (not "challenges per poll" in the abstract). See [data-retention.md §3](./data-retention.md#3-calculate-deltas). | [`data-retention.service.ts`](../../apps/backend/src/data-retention/data-retention.service.ts) | | `pullRequestProviderStatus` | Pull | When the SP reports a terminal pull status via `waitForPullPieces`. Recorded exactly once per check (intermediate poll statuses are not counted). | Raw SP-reported pull status, for example `complete`, `failed`, `not_found`. Use this to separate SP-side pull failures from dealbot-side validation failures. | | 1 | [`pull-check.service.ts`](../../apps/backend/src/pull-check/pull-check.service.ts) | | `pullCheckStatus` | Pull | When the [Pull Check](./pull-check.md) terminates (success after direct piece validation, or any failure). Recorded exactly once per check. | `success`, `failure.timedout`, `failure.other` from [Pull Check Status](./pull-check.md#pull-check-status). | | 1 | [`pull-check.service.ts`](../../apps/backend/src/pull-check/pull-check.service.ts) | diff --git a/docs/environment-variables.md b/docs/environment-variables.md index a5432879..ecf5e6a9 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -11,7 +11,7 @@ This document provides a comprehensive guide to all environment variables used b | [Blockchain](#blockchain-configuration) | `NETWORK`, `RPC_URL`, `WALLET_ADDRESS`, `WALLET_PRIVATE_KEY`, `SESSION_KEY_PRIVATE_KEY`, `CHECK_DATASET_CREATION_FEES`, `USE_ONLY_APPROVED_PROVIDERS`, `PDP_SUBGRAPH_ENDPOINT` | | [Dataset Versioning](#dataset-versioning) | `DEALBOT_DATASET_VERSION` | | [Scheduling](#scheduling-configuration) | `PROVIDERS_REFRESH_INTERVAL_SECONDS`, `DATA_RETENTION_POLL_INTERVAL_SECONDS`, `DEALBOT_MAINTENANCE_WINDOWS_UTC`, `DEALBOT_MAINTENANCE_WINDOW_MINUTES` | -| [Jobs (pg-boss)](#jobs-pg-boss) | `DEALBOT_PGBOSS_SCHEDULER_ENABLED`, `DEALBOT_PGBOSS_POOL_MAX`, `DEALS_PER_SP_PER_HOUR`, `MIN_NUM_DATASETS_FOR_CHECKS`, `DATASET_CREATIONS_PER_SP_PER_HOUR`, `RETRIEVALS_PER_SP_PER_HOUR`, `JOB_SCHEDULER_POLL_SECONDS`, `JOB_WORKER_POLL_SECONDS`, `PG_BOSS_LOCAL_CONCURRENCY`, `JOB_CATCHUP_MAX_ENQUEUE`, `JOB_SCHEDULE_PHASE_SECONDS`, `JOB_ENQUEUE_JITTER_SECONDS`, `DATA_SET_CREATION_JOB_TIMEOUT_SECONDS`, `DEAL_JOB_TIMEOUT_SECONDS`, `RETRIEVAL_JOB_TIMEOUT_SECONDS`, `SHUTDOWN_FINAL_SCRAPE_DELAY_SECONDS`, `IPFS_BLOCK_FETCH_CONCURRENCY` | +| [Jobs (pg-boss)](#jobs-pg-boss) | `DEALBOT_PGBOSS_SCHEDULER_ENABLED`, `DEALBOT_PGBOSS_POOL_MAX`, `DEALS_PER_SP_PER_HOUR`, `MIN_NUM_DATASETS_FOR_CHECKS`, `DATASET_CREATIONS_PER_SP_PER_HOUR`, `DATASET_LIFECYCLE_CHECK_ENABLED`, `DATASET_LIFECYCLE_CHECKS_PER_SP_PER_HOUR`, `RETRIEVALS_PER_SP_PER_HOUR`, `JOB_SCHEDULER_POLL_SECONDS`, `JOB_WORKER_POLL_SECONDS`, `PG_BOSS_LOCAL_CONCURRENCY`, `JOB_CATCHUP_MAX_ENQUEUE`, `JOB_SCHEDULE_PHASE_SECONDS`, `JOB_ENQUEUE_JITTER_SECONDS`, `DATA_SET_CREATION_JOB_TIMEOUT_SECONDS`, `DATA_SET_LIFECYCLE_CHECK_JOB_TIMEOUT_SECONDS`, `DEAL_JOB_TIMEOUT_SECONDS`, `RETRIEVAL_JOB_TIMEOUT_SECONDS`, `SHUTDOWN_FINAL_SCRAPE_DELAY_SECONDS`, `IPFS_BLOCK_FETCH_CONCURRENCY` | | [Dataset](#dataset-configuration) | `DEALBOT_LOCAL_DATASETS_PATH`, `RANDOM_PIECE_SIZES` | | [ClickHouse](#clickhouse-configuration) | `CLICKHOUSE_URL`, `CLICKHOUSE_BATCH_SIZE`, `CLICKHOUSE_FLUSH_INTERVAL_MS`, `DEALBOT_PROBE_LOCATION` | | [Timeouts](#timeout-configuration) | `CONNECT_TIMEOUT_MS`, `HTTP_REQUEST_TIMEOUT_MS`, `HTTP2_REQUEST_TIMEOUT_MS`, `IPNI_VERIFICATION_TIMEOUT_MS`, `IPNI_VERIFICATION_POLLING_MS` | @@ -676,6 +676,34 @@ rate-based (per hour) and persisted in Postgres so restarts do not reset timing. --- +### `DATASET_LIFECYCLE_CHECK_ENABLED` + +- **Type**: `boolean` +- **Required**: No +- **Default**: `true` on calibration, `false` on mainnet + +**Role**: Enables the `data_set_lifecycle_check` canary job, which in a single tick creates an empty throwaway data set and immediately terminates it (`terminateServiceSync`), continuously exercising the on-chain `createDataSet → terminateService` lifecycle. + +**Notes**: Self-contained — it does not touch the managed check data sets and does not depend on `data_set_creation`. When disabled, stale schedules are removed so they stop enqueuing no-op jobs. + +**See also**: [`docs/checks/data-set-lifecycle-check.md`](./checks/data-set-lifecycle-check.md) + +--- + +### `DATASET_LIFECYCLE_CHECKS_PER_SP_PER_HOUR` + +- **Type**: `number` +- **Required**: No +- **Default**: `1` + +**Role**: Target lifecycle check rate per storage provider for the `data_set_lifecycle_check` canary. Each run creates and terminates one throwaway data set. + +**Limits**: Config schema caps this at 20. + +**Notes**: Independent of `DATASET_CREATIONS_PER_SP_PER_HOUR`. Fractional values are supported. + +--- + ### `JOB_SCHEDULER_POLL_SECONDS` - **Type**: `number` @@ -810,6 +838,27 @@ Use this to stagger multiple dealbot deployments that are not sharing a database --- +### `DATA_SET_LIFECYCLE_CHECK_JOB_TIMEOUT_SECONDS` + +- **Type**: `number` +- **Required**: No +- **Default**: `600` (10 minutes) +- **Minimum**: `60` (1 minute) +- **Enforced**: Yes (config validation, effective floor applied at runtime) + +**Role**: Maximum runtime for `data_set_lifecycle_check` jobs before forced abort via `AbortController`. Bounds the empty data set creation (`createDataSet` + `waitForCreateDataSet`) and the `terminateServiceSync` call. + +**When to update**: + +- Increase if create-plus-terminate consistently times out on slow networks. +- Decrease for faster fail-fast behavior during testing. + +**Note**: If the configured value is below 60 seconds, the runtime silently raises it to 60 seconds as an effective floor. An abort due to this timeout (or an internal poll timeout) is recorded as `dataSetLifecycleCheckStatus{value="failure.timedout"}` and retried on the next scheduled tick. + +**See also**: [`docs/checks/data-set-lifecycle-check.md`](./checks/data-set-lifecycle-check.md) + +--- + ### `DEAL_JOB_TIMEOUT_SECONDS` - **Type**: `number` diff --git a/docs/jobs.md b/docs/jobs.md index e1e778fc..b29fc6b4 100644 --- a/docs/jobs.md +++ b/docs/jobs.md @@ -15,7 +15,7 @@ This doc explains what a "job" is in dealbot, how jobs are defined, how they're | --- | --- | --- | | `job_schedule_state` | One per `` plus global rows | Schedule state owned by dealbot. | | Storage provider (SP) | One per SP in registry | Filtered by `USE_ONLY_APPROVED_PROVIDERS` when enabled. | -| Job type | `deal`, `retrieval`, `data_set_creation`, `piece_cleanup`, `pull_check`, `providers_refresh`, `data_retention_poll` | `deal` corresponds to "data storage check" externally; we keep `deal` in code/DB for compatibility. | +| Job type | `deal`, `retrieval`, `data_set_creation`, `data_set_lifecycle_check`, `piece_cleanup`, `pull_check`, `providers_refresh`, `data_retention_poll` | `deal` corresponds to "data storage check" externally; we keep `deal` in code/DB for compatibility. | | pg-boss queue | `sp.work`, `providers.refresh`, `data.retention.poll` | `sp.work` is a singleton queue. | | Dealbot scheduler | One per process (when enabled) | Runs the scheduling loop. | | Dealbot worker process | One Node.js process with `DEALBOT_RUN_MODE=worker` or `both` | Hosts pg-boss workers. | @@ -37,6 +37,7 @@ This doc explains what a "job" is in dealbot, how jobs are defined, how they're | `piece_cleanup` | `sp.work` | [`JobsService.handlePieceCleanupJob`](../apps/backend/src/jobs/jobs.service.ts) | `{ jobType: 'piece_cleanup', spAddress, intervalSeconds }` | — | | `pull_check` | `sp.work` | [`JobsService.handlePullCheckJob`](../apps/backend/src/jobs/jobs.service.ts) | `{ jobType: 'pull_check', spAddress, intervalSeconds }` | [pull check](./checks/pull-check.md) | | `data_set_creation` | `sp.work` | [`JobsService.handleDataSetCreationJob`](../apps/backend/src/jobs/jobs.service.ts) | `{ jobType: 'data_set_creation', spAddress, intervalSeconds }` | [data-set-creation](./data-set-creation.md) | +| `data_set_lifecycle_check` | `sp.work` | [`JobsService.handleDataSetLifecycleCheckJob`](../apps/backend/src/jobs/jobs.service.ts) | `{ jobType: 'data_set_lifecycle_check', spAddress, intervalSeconds }` | [data-set-lifecycle-check](./checks/data-set-lifecycle-check.md) | `sp.work` is created with `policy=singleton`, and jobs set `singletonKey=spAddress` so only one active job per SP can run at a time.