Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions apps/backend/src/config/app.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ export interface IBlockchainConfig {
/**
 * Scheduling knobs for the periodic background jobs.
 *
 * Values are populated by `loadConfig()` from environment variables; the
 * integer fields are parsed with `Number.parseInt(..., 10)`, so a non-numeric
 * environment value yields `NaN` rather than the default.
 */
export interface ISchedulingConfig {
/** From `PROVIDERS_REFRESH_INTERVAL_SECONDS`; defaults to 14400 when unset or empty. */
providersRefreshIntervalSeconds: number;
/** From `DATA_RETENTION_POLL_INTERVAL_SECONDS`; defaults to 3600 when unset or empty. */
dataRetentionPollIntervalSeconds: number;
/** From `DATASET_CLEANUP_SWEEP_INTERVAL_SECONDS`; defaults to 86400 when unset or empty. */
datasetCleanupSweepIntervalSeconds: number;
/** From `DATASET_CLEANUP_SWEEP_BATCH_SIZE`; defaults to 50 when unset or empty. */
datasetCleanupSweepBatchSize: number;
/**
 * From `DEALBOT_MAINTENANCE_WINDOWS_UTC` (default `"07:00,22:00"`), split on
 * commas with each entry trimmed.
 * NOTE(review): entries look like `HH:MM` start times in UTC — confirm against the consumer.
 */
maintenanceWindowsUtc: string[];
/** NOTE(review): presumably the length of each maintenance window in minutes; the loader line is not visible here — confirm. */
maintenanceWindowMinutes: number;
}
Expand Down Expand Up @@ -383,6 +385,11 @@ export function loadConfig(): IConfig {
scheduling: {
providersRefreshIntervalSeconds: Number.parseInt(process.env.PROVIDERS_REFRESH_INTERVAL_SECONDS || "14400", 10),
dataRetentionPollIntervalSeconds: Number.parseInt(process.env.DATA_RETENTION_POLL_INTERVAL_SECONDS || "3600", 10),
datasetCleanupSweepIntervalSeconds: Number.parseInt(
process.env.DATASET_CLEANUP_SWEEP_INTERVAL_SECONDS || "86400",
10,
),
datasetCleanupSweepBatchSize: Number.parseInt(process.env.DATASET_CLEANUP_SWEEP_BATCH_SIZE || "50", 10),
maintenanceWindowsUtc: (process.env.DEALBOT_MAINTENANCE_WINDOWS_UTC || "07:00,22:00")
.split(",")
.map((value) => value.trim())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ export type JobType =
| "metrics_cleanup" // legacy: no longer scheduled; see RemoveMetricsJobScheduleRows migration. TODO(#457): remove.
| "providers_refresh"
| "data_retention_poll"
| "piece_cleanup";
| "piece_cleanup"
| "dataset_cleanup_sweep";

@Entity("job_schedule_state")
@Index("job_schedule_state_job_type_sp_unique", ["jobType", "spAddress"], { unique: true })
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import type { MigrationInterface, QueryRunner } from "typeorm";

/**
 * Migration that adds the partial index `idx_deals_unclean_dataset` on
 * `deals (data_set_id)`, limited to rows where `cleaned_up = false` and
 * `data_set_id IS NOT NULL`.
 *
 * The index backs the dataset_cleanup_sweep job as well as the
 * piece-cleanup / retrieval candidate selectors, all of which filter on
 * `cleaned_up = false`. Per the original author's note, the sweep's
 * `SELECT DISTINCT data_set_id WHERE cleaned_up = false` would otherwise
 * scan the whole `deals` table on every tick.
 *
 * Both directions use `IF [NOT] EXISTS`, so re-running either is a no-op.
 */
export class AddDealsUncleanedDatasetIndex1779200000000 implements MigrationInterface {
  name = "AddDealsUncleanedDatasetIndex1779200000000";

  /** Creates the partial index when it is not already present. */
  public async up(queryRunner: QueryRunner): Promise<void> {
    // The statement text is kept verbatim (including its line breaks).
    const createIndexSql = `CREATE INDEX IF NOT EXISTS "idx_deals_unclean_dataset"
ON deals (data_set_id)
WHERE cleaned_up = false AND data_set_id IS NOT NULL`;
    await queryRunner.query(createIndexSql);
  }

  /** Drops the partial index when it exists. */
  public async down(queryRunner: QueryRunner): Promise<void> {
    const dropIndexSql = `DROP INDEX IF EXISTS "idx_deals_unclean_dataset"`;
    await queryRunner.query(dropIndexSql);
  }
}
2 changes: 1 addition & 1 deletion apps/backend/src/deal/deal.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1383,7 +1383,7 @@ describe("DealService", () => {
expect(terminateMock).toHaveBeenCalledWith({ dataSetId: 9n });
expect(waitForReceiptMock).toHaveBeenCalledWith({ hash: "0xhash" });
expect(updateFn).toHaveBeenCalledWith(
{ dataSetId: 9n, cleanedUp: false },
expect.objectContaining({ dataSetId: expect.anything(), cleanedUp: false }),
expect.objectContaining({ cleanedUp: true, cleanedUpAt: expect.any(Date) }),
);
expect(result.dealsAffected).toBe(2);
Expand Down
121 changes: 114 additions & 7 deletions apps/backend/src/deal/deal.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { ConfigService } from "@nestjs/config";
import { InjectRepository } from "@nestjs/typeorm";
import { executeUpload } from "filecoin-pin";
import { CID } from "multiformats/cid";
import type { Repository } from "typeorm";
import { In, type Repository } from "typeorm";
import { ClickhouseService } from "../clickhouse/clickhouse.service.js";
import { awaitWithAbort } from "../common/abort-utils.js";
import { buildUnixfsCar } from "../common/car-utils.js";
Expand Down Expand Up @@ -888,12 +888,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy {
pdpEndEpoch = await this.waitForPdpEndEpoch(dataSetId, pollTimeoutMs, signal);
}

const result = await this.dealRepository.manager.transaction(async (manager) => {
const update = await manager
.getRepository(Deal)
.update({ dataSetId, cleanedUp: false }, { cleanedUp: true, cleanedUpAt: new Date() });
return update.affected ?? 0;
});
const result = await this.markDealsCleanedUpForDataSets([dataSetId]);

this.logger.log({
event: "dataset_terminated_repaired",
Expand All @@ -908,6 +903,118 @@ export class DealService implements OnModuleInit, OnModuleDestroy {
return { dealsAffected: result, pdpEndEpoch };
}

/**
 * Marks as cleaned up every Deal row that belongs to one of `dataSetIds`
 * and still has `cleaned_up=false`, stamping `cleanedUpAt` with the current
 * time. Rows that are already cleaned up are excluded by the filter, so
 * calling this again with the same ids affects nothing.
 *
 * Used by both `repairTerminatedDataSet` (one dataset, after termination)
 * and `sweepDatasetCleanup` (batched). No chain lookup happens here: callers
 * are expected to have established the FWSS termination evidence already.
 *
 * @param dataSetIds Dataset ids whose uncleaned deals should be flipped.
 *   An empty list returns 0 without opening a transaction.
 * @returns Number of rows the update reported as affected (0 when the
 *   driver reports none).
 */
async markDealsCleanedUpForDataSets(dataSetIds: bigint[]): Promise<number> {
  if (dataSetIds.length === 0) {
    return 0;
  }

  return this.dealRepository.manager.transaction(async (manager) => {
    const dealRepo = manager.getRepository(Deal);
    const { affected } = await dealRepo.update(
      { dataSetId: In(dataSetIds), cleanedUp: false },
      { cleanedUp: true, cleanedUpAt: new Date() },
    );
    return affected ?? 0;
  });
}

/**
 * Scan all uncleaned Deal rows with a non-null `data_set_id`, probe each
 * dataset on FWSS, and flip `cleaned_up=true` for any dataset whose
 * `pdpEndEpoch != 0n` (terminated) or for which `getDataSet` returns null
 * (removed). Returns aggregated counts.
 *
 * Closes the gap from #546: operator-initiated terminations bypass the
 * `repairTerminatedDataSet` path because synapse-sdk's `createContext`
 * filters out terminated datasets before the slot lookup classifies them.
 *
 * A probe that rejects is logged and counted in `probeErrors`; that dataset
 * is left untouched and will be picked up again by a later sweep.
 *
 * @param batchSize Maximum number of `getDataSet` probes issued concurrently
 *   per chunk. Fractions are floored; values below 1 and `NaN` fall back to 1
 *   so a misconfigured value degrades to sequential probing instead of
 *   silently probing nothing.
 * @returns Counters for this sweep. `datasetsChecked` is the number of
 *   distinct dataset ids selected (including ones whose probe failed);
 *   `dealsAffected` is the number of Deal rows flipped to `cleaned_up=true`.
 */
async sweepDatasetCleanup(batchSize: number): Promise<{
  datasetsChecked: number;
  datasetsTerminated: number;
  datasetsDne: number;
  datasetsLive: number;
  probeErrors: number;
  dealsAffected: number;
}> {
  const rows = await this.dealRepository
    .createQueryBuilder("deal")
    .select("DISTINCT deal.data_set_id", "data_set_id")
    .where("deal.cleaned_up = false")
    .andWhere("deal.data_set_id IS NOT NULL")
    .getRawMany<{ data_set_id: string }>();
  // Raw rows carry the id as a string; convert to bigint for the FWSS call.
  const ids = rows.map((r) => BigInt(r.data_set_id));
  if (ids.length === 0) {
    return {
      datasetsChecked: 0,
      datasetsTerminated: 0,
      datasetsDne: 0,
      datasetsLive: 0,
      probeErrors: 0,
      dealsAffected: 0,
    };
  }

  const { warmStorageService } = this.walletSdkService.getWalletServices();
  const terminated: bigint[] = [];
  const dne: bigint[] = [];
  let live = 0;
  let errors = 0;

  // `Math.max(1, NaN)` is NaN, and a NaN cap makes `ids.slice(0, NaN)` empty
  // and `i += NaN` end the loop after one pass: the sweep would report
  // `datasetsChecked = ids.length` while probing nothing. Normalise the cap
  // so every id is always visited.
  const cap = Number.isNaN(batchSize) ? 1 : Math.max(1, Math.floor(batchSize));
  for (let i = 0; i < ids.length; i += cap) {
    const chunk = ids.slice(i, i + cap);
    // allSettled: one failing probe must not abort the rest of the chunk.
    const settled = await Promise.allSettled(
      chunk.map(async (dataSetId) => ({
        dataSetId,
        info: await warmStorageService.getDataSet({ dataSetId }),
      })),
    );
    for (let j = 0; j < settled.length; j++) {
      const result = settled[j];
      if (result.status === "rejected") {
        errors++;
        this.logger.warn({
          event: "dataset_cleanup_sweep_probe_failed",
          message: "FWSS getDataSet probe failed; skipping dataset this tick",
          // `settled` preserves input order, so index j maps back to chunk[j].
          dataSetId: chunk[j].toString(),
          error: toStructuredError(result.reason),
        });
        continue;
      }
      const { dataSetId, info } = result.value;
      if (info == null) {
        dne.push(dataSetId);
      } else if (info.pdpEndEpoch !== 0n) {
        terminated.push(dataSetId);
      } else {
        live++;
      }
    }
  }

  let dealsAffected = 0;
  if (terminated.length > 0) {
    dealsAffected += await this.markDealsCleanedUpForDataSets(terminated);
  }
  if (dne.length > 0) {
    dealsAffected += await this.markDealsCleanedUpForDataSets(dne);
  }

  return {
    datasetsChecked: ids.length,
    datasetsTerminated: terminated.length,
    datasetsDne: dne.length,
    datasetsLive: live,
    probeErrors: errors,
    dealsAffected,
  };
}

/**
* Poll FWSS getDataSet({dataSetId}).pdpEndEpoch until non-zero. Exponential
* backoff capped at 8s. Throws on timeout.
Expand Down
57 changes: 57 additions & 0 deletions apps/backend/src/jobs/dataset-cleanup-sweep.handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import type { Logger } from "@nestjs/common";
import { toStructuredError } from "../common/logging.js";
import type { DealService } from "../deal/deal.service.js";

/**
 * Collaborators and settings handed to `runDatasetCleanupSweep`.
 */
export interface DatasetCleanupSweepDeps {
/** Only `sweepDatasetCleanup` is required, so a narrow stub can stand in for the full service. */
dealService: Pick<DealService, "sweepDatasetCleanup">;
/** Receives the sweep's started / completed / failed log entries. */
logger: Logger;
/** Passed through unchanged as the argument to `dealService.sweepDatasetCleanup`. */
batchSize: number;
}

/**
 * Runs one tick of the global dataset cleanup sweep and reports it through
 * the supplied logger.
 *
 * The sweep (implemented by `dealService.sweepDatasetCleanup`) flips
 * `Deal.cleaned_up=true` for uncleaned Deal rows whose `data_set_id` is
 * terminated on FWSS (`pdpEndEpoch != 0n`) or no longer exists there
 * (`getDataSet` returns null).
 *
 * Why it exists (from the original design note): in session-key + multisig
 * payer mode dealbot cannot auto-terminate datasets, so operators submit
 * `terminateService` via Safe. Once that lands, synapse-sdk's `createContext`
 * drops the terminated dataset before `getDataSetProvisioningStatus` can
 * classify it as `terminated`, which means `repairTerminatedDataSet` never
 * runs for those rows; the retrieval candidate selector then keeps picking
 * the stale rows and pollutes failure metrics. This job closes that gap
 * without relying on a chain-side fix.
 * See https://github.com/FilOzone/dealbot/issues/546
 *
 * @param deps Deal service, logger and batch size for this run.
 * @throws Rethrows whatever the sweep (or the completion log call) throws,
 *   after emitting a `dataset_cleanup_sweep_failed` log entry.
 */
export async function runDatasetCleanupSweep(deps: DatasetCleanupSweepDeps): Promise<void> {
  const { dealService, logger, batchSize } = deps;
  const sweepStart = Date.now();
  // Wall-clock time since the sweep began, sampled at the moment of logging.
  const elapsedMs = (): number => Date.now() - sweepStart;

  logger.log({
    event: "dataset_cleanup_sweep_started",
    message: "Sweeping uncleaned Deal rows against FWSS state",
    batchSize,
  });

  try {
    const {
      datasetsChecked,
      datasetsTerminated,
      datasetsDne,
      datasetsLive,
      probeErrors,
      dealsAffected,
    } = await dealService.sweepDatasetCleanup(batchSize);

    logger.log({
      event: "dataset_cleanup_sweep_completed",
      message: "Dataset cleanup sweep completed",
      datasetsChecked,
      datasetsTerminated,
      datasetsDne,
      datasetsLive,
      probeErrors,
      dealsAffected,
      durationMs: elapsedMs(),
    });
  } catch (sweepError) {
    logger.error({
      event: "dataset_cleanup_sweep_failed",
      message: "Dataset cleanup sweep failed",
      error: toStructuredError(sweepError),
      durationMs: elapsedMs(),
    });
    // Propagate so the caller sees the failure.
    throw sweepError;
  }
}
1 change: 1 addition & 0 deletions apps/backend/src/jobs/job-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export const LEGACY_DEAL_QUEUE = "deal.run";
// NOTE(review): the LEGACY_ prefix suggests this queue name is kept only for older jobs — confirm before removing.
export const LEGACY_RETRIEVAL_QUEUE = "retrieval.run";
// Queue name string for the data retention poll job (inferred from the identifier — confirm against the worker registration).
export const DATA_RETENTION_POLL_QUEUE = "data.retention.poll";
// Queue name string for the providers refresh job (inferred from the identifier — confirm against the worker registration).
export const PROVIDERS_REFRESH_QUEUE = "providers.refresh";
// Queue name string for the dataset cleanup sweep job (inferred from the identifier — confirm against the worker registration).
export const DATASET_CLEANUP_SWEEP_QUEUE = "dataset.cleanup.sweep";
Loading
Loading