diff --git a/apps/backend/src/config/app.config.ts b/apps/backend/src/config/app.config.ts index b3b32a37..51fed825 100644 --- a/apps/backend/src/config/app.config.ts +++ b/apps/backend/src/config/app.config.ts @@ -179,6 +179,8 @@ export interface IBlockchainConfig { export interface ISchedulingConfig { providersRefreshIntervalSeconds: number; dataRetentionPollIntervalSeconds: number; + datasetCleanupSweepIntervalSeconds: number; + datasetCleanupSweepBatchSize: number; maintenanceWindowsUtc: string[]; maintenanceWindowMinutes: number; } @@ -383,6 +385,11 @@ export function loadConfig(): IConfig { scheduling: { providersRefreshIntervalSeconds: Number.parseInt(process.env.PROVIDERS_REFRESH_INTERVAL_SECONDS || "14400", 10), dataRetentionPollIntervalSeconds: Number.parseInt(process.env.DATA_RETENTION_POLL_INTERVAL_SECONDS || "3600", 10), + datasetCleanupSweepIntervalSeconds: Number.parseInt( + process.env.DATASET_CLEANUP_SWEEP_INTERVAL_SECONDS || "86400", + 10, + ), + datasetCleanupSweepBatchSize: Number.parseInt(process.env.DATASET_CLEANUP_SWEEP_BATCH_SIZE || "50", 10), maintenanceWindowsUtc: (process.env.DEALBOT_MAINTENANCE_WINDOWS_UTC || "07:00,22:00") .split(",") .map((value) => value.trim()) diff --git a/apps/backend/src/database/entities/job-schedule-state.entity.ts b/apps/backend/src/database/entities/job-schedule-state.entity.ts index d1758ae9..7821a0f3 100644 --- a/apps/backend/src/database/entities/job-schedule-state.entity.ts +++ b/apps/backend/src/database/entities/job-schedule-state.entity.ts @@ -11,7 +11,8 @@ export type JobType = | "metrics_cleanup" // legacy: no longer scheduled; see RemoveMetricsJobScheduleRows migration. TODO(#457): remove. | "providers_refresh" | "data_retention_poll" - | "piece_cleanup"; + | "piece_cleanup" + | "dataset_cleanup_sweep"; @Entity("job_schedule_state") @Index("job_schedule_state_job_type_sp_unique", ["jobType", "spAddress"], { unique: true }) diff --git a/apps/backend/src/database/migrations/1779200000000-AddDealsUncleanedDatasetIndex.ts b/apps/backend/src/database/migrations/1779200000000-AddDealsUncleanedDatasetIndex.ts new file mode 100644 index 00000000..c2d53e10 --- /dev/null +++ b/apps/backend/src/database/migrations/1779200000000-AddDealsUncleanedDatasetIndex.ts @@ -0,0 +1,25 @@ +import type { MigrationInterface, QueryRunner } from "typeorm"; + +/** + * Partial index supporting the dataset_cleanup_sweep job and the + * piece-cleanup / retrieval candidate selectors that filter on + * `cleaned_up = false`. + * + * Without this index, `SELECT DISTINCT data_set_id WHERE cleaned_up = false` + * triggers a full table scan of `deals` on every sweep tick. + */ +export class AddDealsUncleanedDatasetIndex1779200000000 implements MigrationInterface { + name = "AddDealsUncleanedDatasetIndex1779200000000"; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE INDEX IF NOT EXISTS "idx_deals_unclean_dataset" + ON deals (data_set_id) + WHERE cleaned_up = false AND data_set_id IS NOT NULL`, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX IF EXISTS "idx_deals_unclean_dataset"`); + } +} diff --git a/apps/backend/src/deal/deal.service.spec.ts b/apps/backend/src/deal/deal.service.spec.ts index 960eee09..b18a1ed7 100644 --- a/apps/backend/src/deal/deal.service.spec.ts +++ b/apps/backend/src/deal/deal.service.spec.ts @@ -1383,7 +1383,7 @@ describe("DealService", () => { expect(terminateMock).toHaveBeenCalledWith({ dataSetId: 9n }); expect(waitForReceiptMock).toHaveBeenCalledWith({ hash: "0xhash" }); expect(updateFn).toHaveBeenCalledWith( - { dataSetId: 9n, cleanedUp: false }, + expect.objectContaining({ dataSetId: expect.anything(), cleanedUp: false }), expect.objectContaining({ cleanedUp: true, cleanedUpAt: expect.any(Date) }), ); expect(result.dealsAffected).toBe(2); diff --git a/apps/backend/src/deal/deal.service.ts b/apps/backend/src/deal/deal.service.ts index 2edf4026..e375bb34 100644 --- a/apps/backend/src/deal/deal.service.ts +++ b/apps/backend/src/deal/deal.service.ts @@ -6,7 +6,7 @@ import { ConfigService } from "@nestjs/config"; import { InjectRepository } from "@nestjs/typeorm"; import { executeUpload } from "filecoin-pin"; import { CID } from "multiformats/cid"; -import type { Repository } from "typeorm"; +import { In, type Repository } from "typeorm"; import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import { awaitWithAbort } from "../common/abort-utils.js"; import { buildUnixfsCar } from "../common/car-utils.js"; @@ -888,12 +888,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy { pdpEndEpoch = await this.waitForPdpEndEpoch(dataSetId, pollTimeoutMs, signal); } - const result = await this.dealRepository.manager.transaction(async (manager) => { - const update = await manager - .getRepository(Deal) - .update({ dataSetId, cleanedUp: false }, { cleanedUp: true, cleanedUpAt: new Date() }); - return update.affected ?? 0; - }); + const result = await this.markDealsCleanedUpForDataSets([dataSetId]); this.logger.log({ event: "dataset_terminated_repaired", @@ -908,6 +903,118 @@ export class DealService implements OnModuleInit, OnModuleDestroy { return { dealsAffected: result, pdpEndEpoch }; } + /** + * Flip `cleaned_up=true` for every Deal row whose `data_set_id` is in + * `dataSetIds` and is still `cleaned_up=false`. Idempotent. Returns the + * total affected row count. + * + * Shared between `repairTerminatedDataSet` (single dataset, post-terminate) + * and `sweepDatasetCleanup` (batched, sweep handler). Internal callers + * should already be holding the FWSS termination evidence; this method + * does no chain check. + */ + async markDealsCleanedUpForDataSets(dataSetIds: bigint[]): Promise { + if (dataSetIds.length === 0) return 0; + return this.dealRepository.manager.transaction(async (manager) => { + const update = await manager + .getRepository(Deal) + .update({ dataSetId: In(dataSetIds), cleanedUp: false }, { cleanedUp: true, cleanedUpAt: new Date() }); + return update.affected ?? 0; + }); + } + + /** + * Scan all uncleaned Deal rows with a non-null `data_set_id`, probe each + * dataset on FWSS, and flip `cleaned_up=true` for any dataset whose + * `pdpEndEpoch != 0n` (terminated) or for which `getDataSet` returns null + * (removed). Returns aggregated counts. + * + * Closes the gap from #546: operator-initiated terminations bypass the + * `repairTerminatedDataSet` path because synapse-sdk's `createContext` + * filters out terminated datasets before the slot lookup classifies them. + */ + async sweepDatasetCleanup(batchSize: number): Promise<{ + datasetsChecked: number; + datasetsTerminated: number; + datasetsDne: number; + datasetsLive: number; + probeErrors: number; + dealsAffected: number; + }> { + const rows = await this.dealRepository + .createQueryBuilder("deal") + .select("DISTINCT deal.data_set_id", "data_set_id") + .where("deal.cleaned_up = false") + .andWhere("deal.data_set_id IS NOT NULL") + .getRawMany<{ data_set_id: string }>(); + const ids = rows.map((r) => BigInt(r.data_set_id)); + if (ids.length === 0) { + return { + datasetsChecked: 0, + datasetsTerminated: 0, + datasetsDne: 0, + datasetsLive: 0, + probeErrors: 0, + dealsAffected: 0, + }; + } + + const { warmStorageService } = this.walletSdkService.getWalletServices(); + const terminated: bigint[] = []; + const dne: bigint[] = []; + let live = 0; + let errors = 0; + + const cap = Math.max(1, batchSize); + for (let i = 0; i < ids.length; i += cap) { + const chunk = ids.slice(i, i + cap); + const settled = await Promise.allSettled( + chunk.map(async (dataSetId) => ({ + dataSetId, + info: await warmStorageService.getDataSet({ dataSetId }), + })), + ); + for (let j = 0; j < settled.length; j++) { + const result = settled[j]; + if (result.status === "rejected") { + errors++; + this.logger.warn({ + event: "dataset_cleanup_sweep_probe_failed", + message: "FWSS getDataSet probe failed; skipping dataset this tick", + dataSetId: chunk[j].toString(), + error: toStructuredError(result.reason), + }); + continue; + } + const { dataSetId, info } = result.value; + if (info == null) { + dne.push(dataSetId); + } else if (info.pdpEndEpoch !== 0n) { + terminated.push(dataSetId); + } else { + live++; + } + } + } + + let dealsAffected = 0; + if (terminated.length > 0) { + dealsAffected += await this.markDealsCleanedUpForDataSets(terminated); + } + if (dne.length > 0) { + dealsAffected += await this.markDealsCleanedUpForDataSets(dne); + } + + return { + datasetsChecked: ids.length, + datasetsTerminated: terminated.length, + datasetsDne: dne.length, + datasetsLive: live, + probeErrors: errors, + dealsAffected, + }; + } + /** * Poll FWSS getDataSet({dataSetId}).pdpEndEpoch until non-zero. Exponential * backoff capped at 8s. Throws on timeout. diff --git a/apps/backend/src/jobs/dataset-cleanup-sweep.handler.ts b/apps/backend/src/jobs/dataset-cleanup-sweep.handler.ts new file mode 100644 index 00000000..02a10975 --- /dev/null +++ b/apps/backend/src/jobs/dataset-cleanup-sweep.handler.ts @@ -0,0 +1,57 @@ +import type { Logger } from "@nestjs/common"; +import { toStructuredError } from "../common/logging.js"; +import type { DealService } from "../deal/deal.service.js"; + +export interface DatasetCleanupSweepDeps { + dealService: Pick; + logger: Logger; + batchSize: number; +} + +/** + * Periodic global job that flips `Deal.cleaned_up=true` for any uncleaned + * Deal row whose `data_set_id` shows `pdpEndEpoch != 0n` on FWSS (terminated) + * or whose `getDataSet` returns null (removed). + * + * Background: in session-key + multisig payer mode, dealbot cannot + * auto-terminate datasets. Operators submit `terminateService` via Safe. + * After the Safe batch lands, synapse-sdk's `createContext` filters out the + * terminated dataset before `getDataSetProvisioningStatus` can classify it + * as `terminated`, so `repairTerminatedDataSet` is never invoked for those + * rows. The retrieval candidate selector keeps picking the stale Deal rows + * and pollutes failure metrics. + * + * This sweeper closes that gap without depending on any chain-side fix. + * See https://github.com/FilOzone/dealbot/issues/546 + */ +export async function runDatasetCleanupSweep(deps: DatasetCleanupSweepDeps): Promise { + const { dealService, logger, batchSize } = deps; + const startedAt = Date.now(); + logger.log({ + event: "dataset_cleanup_sweep_started", + message: "Sweeping uncleaned Deal rows against FWSS state", + batchSize, + }); + try { + const result = await dealService.sweepDatasetCleanup(batchSize); + logger.log({ + event: "dataset_cleanup_sweep_completed", + message: "Dataset cleanup sweep completed", + datasetsChecked: result.datasetsChecked, + datasetsTerminated: result.datasetsTerminated, + datasetsDne: result.datasetsDne, + datasetsLive: result.datasetsLive, + probeErrors: result.probeErrors, + dealsAffected: result.dealsAffected, + durationMs: Date.now() - startedAt, + }); + } catch (error) { + logger.error({ + event: "dataset_cleanup_sweep_failed", + message: "Dataset cleanup sweep failed", + error: toStructuredError(error), + durationMs: Date.now() - startedAt, + }); + throw error; + } +} diff --git a/apps/backend/src/jobs/job-queues.ts b/apps/backend/src/jobs/job-queues.ts index 9488ce7b..e2941006 100644 --- a/apps/backend/src/jobs/job-queues.ts +++ b/apps/backend/src/jobs/job-queues.ts @@ -7,3 +7,4 @@ export const LEGACY_DEAL_QUEUE = "deal.run"; export const LEGACY_RETRIEVAL_QUEUE = "retrieval.run"; export const DATA_RETENTION_POLL_QUEUE = "data.retention.poll"; export const PROVIDERS_REFRESH_QUEUE = "providers.refresh"; +export const DATASET_CLEANUP_SWEEP_QUEUE = "dataset.cleanup.sweep"; diff --git a/apps/backend/src/jobs/jobs.service.ts b/apps/backend/src/jobs/jobs.service.ts index f8fe1d80..d373dc95 100644 --- a/apps/backend/src/jobs/jobs.service.ts +++ b/apps/backend/src/jobs/jobs.service.ts @@ -9,7 +9,7 @@ import { DealJobTerminatedDataSetError } from "../common/errors.js"; import { type JobLogContext, type ProviderJobContext, toStructuredError } from "../common/logging.js"; import { getMaintenanceWindowStatus } from "../common/maintenance-window.js"; import { isSpBlocked } from "../common/sp-blocklist.js"; -import type { IConfig, ISpBlocklistConfig } from "../config/app.config.js"; +import type { IConfig, ISchedulingConfig, ISpBlocklistConfig } from "../config/app.config.js"; import { DataRetentionService } from "../data-retention/data-retention.service.js"; import type { JobType } from "../database/entities/job-schedule-state.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; @@ -18,7 +18,13 @@ import { PieceCleanupService } from "../piece-cleanup/piece-cleanup.service.js"; import { RetrievalService } from "../retrieval/retrieval.service.js"; import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js"; import { provisionNextMissingDataSet } from "./data-set-creation.handler.js"; -import { DATA_RETENTION_POLL_QUEUE, PROVIDERS_REFRESH_QUEUE, SP_WORK_QUEUE } from "./job-queues.js"; +import { runDatasetCleanupSweep } from "./dataset-cleanup-sweep.handler.js"; +import { + DATA_RETENTION_POLL_QUEUE, + DATASET_CLEANUP_SWEEP_QUEUE, + PROVIDERS_REFRESH_QUEUE, + SP_WORK_QUEUE, +} from "./job-queues.js"; import { JobScheduleRepository } from "./repositories/job-schedule.repository.js"; type SpJobType = "deal" | "retrieval" | "data_set_creation" | "piece_cleanup"; @@ -31,6 +37,7 @@ type SpJobData = { jobType: SpJobType; spAddress: string; intervalSeconds: numbe type ProvidersRefreshJobData = { intervalSeconds: number }; type SpJob = Job; type DataRetentionJobData = { intervalSeconds: number }; +type DatasetCleanupSweepJobData = { intervalSeconds: number }; type ScheduleRow = { id: number; @@ -258,6 +265,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { await boss.createQueue(SP_WORK_QUEUE, { policy: "singleton" }); await boss.createQueue(PROVIDERS_REFRESH_QUEUE); await boss.createQueue(DATA_RETENTION_POLL_QUEUE); + await boss.createQueue(DATASET_CLEANUP_SWEEP_QUEUE); } private registerWorkers(): void { @@ -323,6 +331,20 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { error: toStructuredError(error), }), ); + void this.boss + .work( + DATASET_CLEANUP_SWEEP_QUEUE, + { batchSize: 1, pollingIntervalSeconds: workerPollSeconds }, + async ([job]) => this.handleDatasetCleanupSweepJob(job.data), + ) + .catch((error) => + this.logger.error({ + event: "worker_register_failed", + message: "Failed to register worker", + queue: DATASET_CLEANUP_SWEEP_QUEUE, + error: toStructuredError(error), + }), + ); void this.boss .work(PROVIDERS_REFRESH_QUEUE, { batchSize: 1, pollingIntervalSeconds: workerPollSeconds }, async ([job]) => this.handleProvidersRefreshJob(job.data as ProvidersRefreshJobData), @@ -595,6 +617,19 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { }); } + private async handleDatasetCleanupSweepJob(data: DatasetCleanupSweepJobData): Promise { + void data; + await this.recordJobExecution("dataset_cleanup_sweep", async () => { + const batchSize = this.configService.get("scheduling").datasetCleanupSweepBatchSize; + await runDatasetCleanupSweep({ + dealService: this.dealService, + logger: this.logger, + batchSize, + }); + return "success"; + }); + } + private async handleProvidersRefreshJob(data: ProvidersRefreshJobData): Promise { void data; await this.recordJobExecution("providers_refresh", async () => { @@ -869,6 +904,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { dataRetentionPollIntervalSeconds: number; providersRefreshIntervalSeconds: number; pieceCleanupIntervalSeconds: number; + datasetCleanupSweepIntervalSeconds: number; } { const jobsConfig = this.configService.get("jobs", { infer: true }); const scheduling = this.configService.get("scheduling", { infer: true }); @@ -884,6 +920,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { const pieceCleanupIntervalSeconds = Math.max(1, Math.round(3600 / pieceCleanupPerHour)); const dataRetentionPollIntervalSeconds = scheduling.dataRetentionPollIntervalSeconds; const providersRefreshIntervalSeconds = scheduling.providersRefreshIntervalSeconds; + const datasetCleanupSweepIntervalSeconds = scheduling.datasetCleanupSweepIntervalSeconds; return { dealIntervalSeconds, @@ -892,6 +929,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { dataRetentionPollIntervalSeconds, providersRefreshIntervalSeconds, pieceCleanupIntervalSeconds, + datasetCleanupSweepIntervalSeconds, }; } @@ -911,6 +949,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { dataRetentionPollIntervalSeconds, providersRefreshIntervalSeconds, pieceCleanupIntervalSeconds, + datasetCleanupSweepIntervalSeconds, } = this.getIntervalSecondsForRates(); const useOnlyApprovedProviders = this.configService.get("blockchain").useOnlyApprovedProviders; @@ -981,6 +1020,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { } // Global job schedules (sp_address = '') + const datasetCleanupSweepStartAt = new Date(now.getTime() + phaseMs); await this.jobScheduleRepository.upsertSchedule( "data_retention_poll", "", @@ -993,6 +1033,12 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { providersRefreshIntervalSeconds, providersRefreshStartAt, ); + await this.jobScheduleRepository.upsertSchedule( + "dataset_cleanup_sweep", + "", + datasetCleanupSweepIntervalSeconds, + datasetCleanupSweepStartAt, + ); } /** @@ -1108,6 +1154,8 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { return DATA_RETENTION_POLL_QUEUE; case "providers_refresh": return PROVIDERS_REFRESH_QUEUE; + case "dataset_cleanup_sweep": + return DATASET_CLEANUP_SWEEP_QUEUE; case "metrics": case "metrics_cleanup": // These legacy job types should be filtered out before reaching this method @@ -1119,7 +1167,9 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { } } - private mapJobPayload(row: ScheduleRow): SpJobData | ProvidersRefreshJobData | DataRetentionJobData { + private mapJobPayload( + row: ScheduleRow, + ): SpJobData | ProvidersRefreshJobData | DataRetentionJobData | DatasetCleanupSweepJobData { if ( row.job_type === "deal" || row.job_type === "retrieval" || @@ -1137,7 +1187,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { private async safeSend( jobType: JobType, name: string, - data: SpJobData | ProvidersRefreshJobData | DataRetentionJobData, + data: SpJobData | ProvidersRefreshJobData | DataRetentionJobData | DatasetCleanupSweepJobData, options?: SendOptions, ) { if (!this.boss) return false; @@ -1199,6 +1249,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { "piece_cleanup", "data_retention_poll", "providers_refresh", + "dataset_cleanup_sweep", ]; for (const jobType of jobTypes) { this.jobsQueuedGauge.set({ job_type: jobType }, 0);