From e4a30651ec884d68f40be68df934b64ff1b283d2 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Tue, 19 May 2026 10:29:33 -0400 Subject: [PATCH] feat(jobs): add dataset_cleanup_sweep global job Scheduled global job that flips Deal.cleaned_up=true for any uncleaned Deal row whose data_set_id shows pdpEndEpoch != 0n on FWSS (terminated) or whose getDataSet returns null (removed). Closes the gap from the operator-must-terminate workflow tracked in https://github.com/FilOzone/dealbot/issues/546: after an operator runs a Safe terminateService batch (like #545), synapse-sdk's createContext filters out the now-terminated dataset, so getDataSetProvisioningStatus returns "missing" instead of "terminated" and repairTerminatedDataSet is never invoked for those rows. Empirically, 87% of recent failed retrievals trace back to terminated datasets that should have been auto-cleaned. The sweeper eliminates this noise. Cadence defaults to 24h (DATASET_CLEANUP_SWEEP_INTERVAL_SECONDS=86400). Each run scans every uncleaned dataset and issues the FWSS probes in concurrent chunks of 50 (DATASET_CLEANUP_SWEEP_BATCH_SIZE=50); the batch size bounds probe concurrency, not the number of datasets checked per run. A probe that fails is counted, logged, and skipped for that run. Reuses recordJobExecution for jobs_* metrics. The UPDATE is idempotent because it only matches rows that still have cleaned_up=false. Adds a partial index on deals(data_set_id) WHERE cleaned_up=false AND data_set_id IS NOT NULL to keep the SELECT DISTINCT cheap as the deals table grows. Extracts the existing UPDATE block from repairTerminatedDataSet into DealService.markDealsCleanedUpForDataSets and shares it with the sweeper to avoid divergence. 
Tracking: https://github.com/FilOzone/dealbot/issues/546 --- apps/backend/src/config/app.config.ts | 7 + .../entities/job-schedule-state.entity.ts | 3 +- ...200000000-AddDealsUncleanedDatasetIndex.ts | 25 ++++ apps/backend/src/deal/deal.service.spec.ts | 2 +- apps/backend/src/deal/deal.service.ts | 121 +++++++++++++++++- .../src/jobs/dataset-cleanup-sweep.handler.ts | 57 +++++++++ apps/backend/src/jobs/job-queues.ts | 1 + apps/backend/src/jobs/jobs.service.ts | 59 ++++++++- 8 files changed, 262 insertions(+), 13 deletions(-) create mode 100644 apps/backend/src/database/migrations/1779200000000-AddDealsUncleanedDatasetIndex.ts create mode 100644 apps/backend/src/jobs/dataset-cleanup-sweep.handler.ts diff --git a/apps/backend/src/config/app.config.ts b/apps/backend/src/config/app.config.ts index b3b32a37..51fed825 100644 --- a/apps/backend/src/config/app.config.ts +++ b/apps/backend/src/config/app.config.ts @@ -179,6 +179,8 @@ export interface IBlockchainConfig { export interface ISchedulingConfig { providersRefreshIntervalSeconds: number; dataRetentionPollIntervalSeconds: number; + datasetCleanupSweepIntervalSeconds: number; + datasetCleanupSweepBatchSize: number; maintenanceWindowsUtc: string[]; maintenanceWindowMinutes: number; } @@ -383,6 +385,11 @@ export function loadConfig(): IConfig { scheduling: { providersRefreshIntervalSeconds: Number.parseInt(process.env.PROVIDERS_REFRESH_INTERVAL_SECONDS || "14400", 10), dataRetentionPollIntervalSeconds: Number.parseInt(process.env.DATA_RETENTION_POLL_INTERVAL_SECONDS || "3600", 10), + datasetCleanupSweepIntervalSeconds: Number.parseInt( + process.env.DATASET_CLEANUP_SWEEP_INTERVAL_SECONDS || "86400", + 10, + ), + datasetCleanupSweepBatchSize: Number.parseInt(process.env.DATASET_CLEANUP_SWEEP_BATCH_SIZE || "50", 10), maintenanceWindowsUtc: (process.env.DEALBOT_MAINTENANCE_WINDOWS_UTC || "07:00,22:00") .split(",") .map((value) => value.trim()) diff --git a/apps/backend/src/database/entities/job-schedule-state.entity.ts 
b/apps/backend/src/database/entities/job-schedule-state.entity.ts index d1758ae9..7821a0f3 100644 --- a/apps/backend/src/database/entities/job-schedule-state.entity.ts +++ b/apps/backend/src/database/entities/job-schedule-state.entity.ts @@ -11,7 +11,8 @@ export type JobType = | "metrics_cleanup" // legacy: no longer scheduled; see RemoveMetricsJobScheduleRows migration. TODO(#457): remove. | "providers_refresh" | "data_retention_poll" - | "piece_cleanup"; + | "piece_cleanup" + | "dataset_cleanup_sweep"; @Entity("job_schedule_state") @Index("job_schedule_state_job_type_sp_unique", ["jobType", "spAddress"], { unique: true }) diff --git a/apps/backend/src/database/migrations/1779200000000-AddDealsUncleanedDatasetIndex.ts b/apps/backend/src/database/migrations/1779200000000-AddDealsUncleanedDatasetIndex.ts new file mode 100644 index 00000000..c2d53e10 --- /dev/null +++ b/apps/backend/src/database/migrations/1779200000000-AddDealsUncleanedDatasetIndex.ts @@ -0,0 +1,25 @@ +import type { MigrationInterface, QueryRunner } from "typeorm"; + +/** + * Partial index supporting the dataset_cleanup_sweep job and the + * piece-cleanup / retrieval candidate selectors that filter on + * `cleaned_up = false`. + * + * Without this index, `SELECT DISTINCT data_set_id WHERE cleaned_up = false` + * triggers a full table scan of `deals` on every sweep tick. 
+ */ +export class AddDealsUncleanedDatasetIndex1779200000000 implements MigrationInterface { + name = "AddDealsUncleanedDatasetIndex1779200000000"; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE INDEX IF NOT EXISTS "idx_deals_unclean_dataset" + ON deals (data_set_id) + WHERE cleaned_up = false AND data_set_id IS NOT NULL`, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX IF EXISTS "idx_deals_unclean_dataset"`); + } +} diff --git a/apps/backend/src/deal/deal.service.spec.ts b/apps/backend/src/deal/deal.service.spec.ts index 960eee09..b18a1ed7 100644 --- a/apps/backend/src/deal/deal.service.spec.ts +++ b/apps/backend/src/deal/deal.service.spec.ts @@ -1383,7 +1383,7 @@ describe("DealService", () => { expect(terminateMock).toHaveBeenCalledWith({ dataSetId: 9n }); expect(waitForReceiptMock).toHaveBeenCalledWith({ hash: "0xhash" }); expect(updateFn).toHaveBeenCalledWith( - { dataSetId: 9n, cleanedUp: false }, + expect.objectContaining({ dataSetId: expect.anything(), cleanedUp: false }), expect.objectContaining({ cleanedUp: true, cleanedUpAt: expect.any(Date) }), ); expect(result.dealsAffected).toBe(2); diff --git a/apps/backend/src/deal/deal.service.ts b/apps/backend/src/deal/deal.service.ts index 2edf4026..e375bb34 100644 --- a/apps/backend/src/deal/deal.service.ts +++ b/apps/backend/src/deal/deal.service.ts @@ -6,7 +6,7 @@ import { ConfigService } from "@nestjs/config"; import { InjectRepository } from "@nestjs/typeorm"; import { executeUpload } from "filecoin-pin"; import { CID } from "multiformats/cid"; -import type { Repository } from "typeorm"; +import { In, type Repository } from "typeorm"; import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import { awaitWithAbort } from "../common/abort-utils.js"; import { buildUnixfsCar } from "../common/car-utils.js"; @@ -888,12 +888,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy 
{ pdpEndEpoch = await this.waitForPdpEndEpoch(dataSetId, pollTimeoutMs, signal); } - const result = await this.dealRepository.manager.transaction(async (manager) => { - const update = await manager - .getRepository(Deal) - .update({ dataSetId, cleanedUp: false }, { cleanedUp: true, cleanedUpAt: new Date() }); - return update.affected ?? 0; - }); + const result = await this.markDealsCleanedUpForDataSets([dataSetId]); this.logger.log({ event: "dataset_terminated_repaired", @@ -908,6 +903,118 @@ export class DealService implements OnModuleInit, OnModuleDestroy { return { dealsAffected: result, pdpEndEpoch }; } + /** + * Flip `cleaned_up=true` for every Deal row whose `data_set_id` is in + * `dataSetIds` and is still `cleaned_up=false`. Idempotent. Returns the + * total affected row count. + * + * Shared between `repairTerminatedDataSet` (single dataset, post-terminate) + * and `sweepDatasetCleanup` (batched, sweep handler). Internal callers + * should already be holding the FWSS termination evidence; this method + * does no chain check. + */ + async markDealsCleanedUpForDataSets(dataSetIds: bigint[]): Promise { + if (dataSetIds.length === 0) return 0; + return this.dealRepository.manager.transaction(async (manager) => { + const update = await manager + .getRepository(Deal) + .update({ dataSetId: In(dataSetIds), cleanedUp: false }, { cleanedUp: true, cleanedUpAt: new Date() }); + return update.affected ?? 0; + }); + } + + /** + * Scan all uncleaned Deal rows with a non-null `data_set_id`, probe each + * dataset on FWSS, and flip `cleaned_up=true` for any dataset whose + * `pdpEndEpoch != 0n` (terminated) or for which `getDataSet` returns null + * (removed). Returns aggregated counts. + * + * Closes the gap from #546: operator-initiated terminations bypass the + * `repairTerminatedDataSet` path because synapse-sdk's `createContext` + * filters out terminated datasets before the slot lookup classifies them. 
+ */ + async sweepDatasetCleanup(batchSize: number): Promise<{ + datasetsChecked: number; + datasetsTerminated: number; + datasetsDne: number; + datasetsLive: number; + probeErrors: number; + dealsAffected: number; + }> { + const rows = await this.dealRepository + .createQueryBuilder("deal") + .select("DISTINCT deal.data_set_id", "data_set_id") + .where("deal.cleaned_up = false") + .andWhere("deal.data_set_id IS NOT NULL") + .getRawMany<{ data_set_id: string }>(); + const ids = rows.map((r) => BigInt(r.data_set_id)); + if (ids.length === 0) { + return { + datasetsChecked: 0, + datasetsTerminated: 0, + datasetsDne: 0, + datasetsLive: 0, + probeErrors: 0, + dealsAffected: 0, + }; + } + + const { warmStorageService } = this.walletSdkService.getWalletServices(); + const terminated: bigint[] = []; + const dne: bigint[] = []; + let live = 0; + let errors = 0; + + const cap = Math.max(1, batchSize); + for (let i = 0; i < ids.length; i += cap) { + const chunk = ids.slice(i, i + cap); + const settled = await Promise.allSettled( + chunk.map(async (dataSetId) => ({ + dataSetId, + info: await warmStorageService.getDataSet({ dataSetId }), + })), + ); + for (let j = 0; j < settled.length; j++) { + const result = settled[j]; + if (result.status === "rejected") { + errors++; + this.logger.warn({ + event: "dataset_cleanup_sweep_probe_failed", + message: "FWSS getDataSet probe failed; skipping dataset this tick", + dataSetId: chunk[j].toString(), + error: toStructuredError(result.reason), + }); + continue; + } + const { dataSetId, info } = result.value; + if (info == null) { + dne.push(dataSetId); + } else if (info.pdpEndEpoch !== 0n) { + terminated.push(dataSetId); + } else { + live++; + } + } + } + + let dealsAffected = 0; + if (terminated.length > 0) { + dealsAffected += await this.markDealsCleanedUpForDataSets(terminated); + } + if (dne.length > 0) { + dealsAffected += await this.markDealsCleanedUpForDataSets(dne); + } + + return { + datasetsChecked: ids.length, + 
datasetsTerminated: terminated.length, + datasetsDne: dne.length, + datasetsLive: live, + probeErrors: errors, + dealsAffected, + }; + } + /** * Poll FWSS getDataSet({dataSetId}).pdpEndEpoch until non-zero. Exponential * backoff capped at 8s. Throws on timeout. diff --git a/apps/backend/src/jobs/dataset-cleanup-sweep.handler.ts b/apps/backend/src/jobs/dataset-cleanup-sweep.handler.ts new file mode 100644 index 00000000..02a10975 --- /dev/null +++ b/apps/backend/src/jobs/dataset-cleanup-sweep.handler.ts @@ -0,0 +1,57 @@ +import type { Logger } from "@nestjs/common"; +import { toStructuredError } from "../common/logging.js"; +import type { DealService } from "../deal/deal.service.js"; + +export interface DatasetCleanupSweepDeps { + dealService: Pick; + logger: Logger; + batchSize: number; +} + +/** + * Periodic global job that flips `Deal.cleaned_up=true` for any uncleaned + * Deal row whose `data_set_id` shows `pdpEndEpoch != 0n` on FWSS (terminated) + * or whose `getDataSet` returns null (removed). + * + * Background: in session-key + multisig payer mode, dealbot cannot + * auto-terminate datasets. Operators submit `terminateService` via Safe. + * After the Safe batch lands, synapse-sdk's `createContext` filters out the + * terminated dataset before `getDataSetProvisioningStatus` can classify it + * as `terminated`, so `repairTerminatedDataSet` is never invoked for those + * rows. The retrieval candidate selector keeps picking the stale Deal rows + * and pollutes failure metrics. + * + * This sweeper closes that gap without depending on any chain-side fix. 
+ * See https://github.com/FilOzone/dealbot/issues/546 + */ +export async function runDatasetCleanupSweep(deps: DatasetCleanupSweepDeps): Promise { + const { dealService, logger, batchSize } = deps; + const startedAt = Date.now(); + logger.log({ + event: "dataset_cleanup_sweep_started", + message: "Sweeping uncleaned Deal rows against FWSS state", + batchSize, + }); + try { + const result = await dealService.sweepDatasetCleanup(batchSize); + logger.log({ + event: "dataset_cleanup_sweep_completed", + message: "Dataset cleanup sweep completed", + datasetsChecked: result.datasetsChecked, + datasetsTerminated: result.datasetsTerminated, + datasetsDne: result.datasetsDne, + datasetsLive: result.datasetsLive, + probeErrors: result.probeErrors, + dealsAffected: result.dealsAffected, + durationMs: Date.now() - startedAt, + }); + } catch (error) { + logger.error({ + event: "dataset_cleanup_sweep_failed", + message: "Dataset cleanup sweep failed", + error: toStructuredError(error), + durationMs: Date.now() - startedAt, + }); + throw error; + } +} diff --git a/apps/backend/src/jobs/job-queues.ts b/apps/backend/src/jobs/job-queues.ts index 9488ce7b..e2941006 100644 --- a/apps/backend/src/jobs/job-queues.ts +++ b/apps/backend/src/jobs/job-queues.ts @@ -7,3 +7,4 @@ export const LEGACY_DEAL_QUEUE = "deal.run"; export const LEGACY_RETRIEVAL_QUEUE = "retrieval.run"; export const DATA_RETENTION_POLL_QUEUE = "data.retention.poll"; export const PROVIDERS_REFRESH_QUEUE = "providers.refresh"; +export const DATASET_CLEANUP_SWEEP_QUEUE = "dataset.cleanup.sweep"; diff --git a/apps/backend/src/jobs/jobs.service.ts b/apps/backend/src/jobs/jobs.service.ts index f8fe1d80..d373dc95 100644 --- a/apps/backend/src/jobs/jobs.service.ts +++ b/apps/backend/src/jobs/jobs.service.ts @@ -9,7 +9,7 @@ import { DealJobTerminatedDataSetError } from "../common/errors.js"; import { type JobLogContext, type ProviderJobContext, toStructuredError } from "../common/logging.js"; import { getMaintenanceWindowStatus 
} from "../common/maintenance-window.js"; import { isSpBlocked } from "../common/sp-blocklist.js"; -import type { IConfig, ISpBlocklistConfig } from "../config/app.config.js"; +import type { IConfig, ISchedulingConfig, ISpBlocklistConfig } from "../config/app.config.js"; import { DataRetentionService } from "../data-retention/data-retention.service.js"; import type { JobType } from "../database/entities/job-schedule-state.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; @@ -18,7 +18,13 @@ import { PieceCleanupService } from "../piece-cleanup/piece-cleanup.service.js"; import { RetrievalService } from "../retrieval/retrieval.service.js"; import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; import { provisionNextMissingDataSet } from "./data-set-creation.handler.js"; -import { DATA_RETENTION_POLL_QUEUE, PROVIDERS_REFRESH_QUEUE, SP_WORK_QUEUE } from "./job-queues.js"; +import { runDatasetCleanupSweep } from "./dataset-cleanup-sweep.handler.js"; +import { + DATA_RETENTION_POLL_QUEUE, + DATASET_CLEANUP_SWEEP_QUEUE, + PROVIDERS_REFRESH_QUEUE, + SP_WORK_QUEUE, +} from "./job-queues.js"; import { JobScheduleRepository } from "./repositories/job-schedule.repository.js"; type SpJobType = "deal" | "retrieval" | "data_set_creation" | "piece_cleanup"; @@ -31,6 +37,7 @@ type SpJobData = { jobType: SpJobType; spAddress: string; intervalSeconds: numbe type ProvidersRefreshJobData = { intervalSeconds: number }; type SpJob = Job; type DataRetentionJobData = { intervalSeconds: number }; +type DatasetCleanupSweepJobData = { intervalSeconds: number }; type ScheduleRow = { id: number; @@ -258,6 +265,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { await boss.createQueue(SP_WORK_QUEUE, { policy: "singleton" }); await boss.createQueue(PROVIDERS_REFRESH_QUEUE); await boss.createQueue(DATA_RETENTION_POLL_QUEUE); + await boss.createQueue(DATASET_CLEANUP_SWEEP_QUEUE); } private registerWorkers(): 
void { @@ -323,6 +331,20 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { error: toStructuredError(error), }), ); + void this.boss + .work( + DATASET_CLEANUP_SWEEP_QUEUE, + { batchSize: 1, pollingIntervalSeconds: workerPollSeconds }, + async ([job]) => this.handleDatasetCleanupSweepJob(job.data), + ) + .catch((error) => + this.logger.error({ + event: "worker_register_failed", + message: "Failed to register worker", + queue: DATASET_CLEANUP_SWEEP_QUEUE, + error: toStructuredError(error), + }), + ); void this.boss .work(PROVIDERS_REFRESH_QUEUE, { batchSize: 1, pollingIntervalSeconds: workerPollSeconds }, async ([job]) => this.handleProvidersRefreshJob(job.data as ProvidersRefreshJobData), @@ -595,6 +617,19 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { }); } + private async handleDatasetCleanupSweepJob(data: DatasetCleanupSweepJobData): Promise { + void data; + await this.recordJobExecution("dataset_cleanup_sweep", async () => { + const batchSize = this.configService.get("scheduling").datasetCleanupSweepBatchSize; + await runDatasetCleanupSweep({ + dealService: this.dealService, + logger: this.logger, + batchSize, + }); + return "success"; + }); + } + private async handleProvidersRefreshJob(data: ProvidersRefreshJobData): Promise { void data; await this.recordJobExecution("providers_refresh", async () => { @@ -869,6 +904,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { dataRetentionPollIntervalSeconds: number; providersRefreshIntervalSeconds: number; pieceCleanupIntervalSeconds: number; + datasetCleanupSweepIntervalSeconds: number; } { const jobsConfig = this.configService.get("jobs", { infer: true }); const scheduling = this.configService.get("scheduling", { infer: true }); @@ -884,6 +920,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { const pieceCleanupIntervalSeconds = Math.max(1, Math.round(3600 / pieceCleanupPerHour)); const 
dataRetentionPollIntervalSeconds = scheduling.dataRetentionPollIntervalSeconds; const providersRefreshIntervalSeconds = scheduling.providersRefreshIntervalSeconds; + const datasetCleanupSweepIntervalSeconds = scheduling.datasetCleanupSweepIntervalSeconds; return { dealIntervalSeconds, @@ -892,6 +929,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { dataRetentionPollIntervalSeconds, providersRefreshIntervalSeconds, pieceCleanupIntervalSeconds, + datasetCleanupSweepIntervalSeconds, }; } @@ -911,6 +949,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { dataRetentionPollIntervalSeconds, providersRefreshIntervalSeconds, pieceCleanupIntervalSeconds, + datasetCleanupSweepIntervalSeconds, } = this.getIntervalSecondsForRates(); const useOnlyApprovedProviders = this.configService.get("blockchain").useOnlyApprovedProviders; @@ -981,6 +1020,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { } // Global job schedules (sp_address = '') + const datasetCleanupSweepStartAt = new Date(now.getTime() + phaseMs); await this.jobScheduleRepository.upsertSchedule( "data_retention_poll", "", @@ -993,6 +1033,12 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { providersRefreshIntervalSeconds, providersRefreshStartAt, ); + await this.jobScheduleRepository.upsertSchedule( + "dataset_cleanup_sweep", + "", + datasetCleanupSweepIntervalSeconds, + datasetCleanupSweepStartAt, + ); } /** @@ -1108,6 +1154,8 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { return DATA_RETENTION_POLL_QUEUE; case "providers_refresh": return PROVIDERS_REFRESH_QUEUE; + case "dataset_cleanup_sweep": + return DATASET_CLEANUP_SWEEP_QUEUE; case "metrics": case "metrics_cleanup": // These legacy job types should be filtered out before reaching this method @@ -1119,7 +1167,9 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { } } - private mapJobPayload(row: 
ScheduleRow): SpJobData | ProvidersRefreshJobData | DataRetentionJobData { + private mapJobPayload( + row: ScheduleRow, + ): SpJobData | ProvidersRefreshJobData | DataRetentionJobData | DatasetCleanupSweepJobData { if ( row.job_type === "deal" || row.job_type === "retrieval" || @@ -1137,7 +1187,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { private async safeSend( jobType: JobType, name: string, - data: SpJobData | ProvidersRefreshJobData | DataRetentionJobData, + data: SpJobData | ProvidersRefreshJobData | DataRetentionJobData | DatasetCleanupSweepJobData, options?: SendOptions, ) { if (!this.boss) return false; @@ -1199,6 +1249,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { "piece_cleanup", "data_retention_poll", "providers_refresh", + "dataset_cleanup_sweep", ]; for (const jobType of jobTypes) { this.jobsQueuedGauge.set({ job_type: jobType }, 0);