Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apps/backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ DEALBOT_MAINTENANCE_WINDOW_MINUTES=20
DEALS_PER_SP_PER_HOUR=2
DATASET_CREATIONS_PER_SP_PER_HOUR=1
RETRIEVALS_PER_SP_PER_HOUR=1
# data_set_lifecycle_check canary: creates a throwaway data set and terminates it each tick
# (defaults: enabled on calibration, disabled on mainnet).
# DATASET_LIFECYCLE_CHECK_ENABLED=true
DATASET_LIFECYCLE_CHECKS_PER_SP_PER_HOUR=1
DATA_SET_LIFECYCLE_CHECK_JOB_TIMEOUT_SECONDS=600 # 10m: create + upload + terminate + pdpEndEpoch poll
Comment thread
silent-cipher marked this conversation as resolved.
PG_BOSS_LOCAL_CONCURRENCY=20
JOB_SCHEDULER_POLL_SECONDS=300
JOB_WORKER_POLL_SECONDS=60
Expand Down
7 changes: 7 additions & 0 deletions apps/backend/src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,10 @@ export const ZERO_ADDRESS = "0x0000000000000000000000000000000000000000";
export const MAX_BLOCK_SIZE = 5 * 1024 * 1024;

export const DEV_TAG = stringToHex("dev");

/**
* Fixed metadata marker key tagging every throwaway data set created by the
* `data_set_lifecycle_check` job. The value is a per-run nonce; the key is the stable
* handle operators use to list/sweep leaked sets (create-OK / terminate-failed runs).
*/
export const LIFECYCLE_CHECK_METADATA_KEY = "dealbotLifecycleCheck";
39 changes: 39 additions & 0 deletions apps/backend/src/config/app.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ export const configValidationSchema = Joi.object({
// Per-hour limits are guardrails to avoid excessive background load.
DEALS_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).default(4),
DATASET_CREATIONS_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).default(1),
DATASET_LIFECYCLE_CHECKS_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).default(1),
RETRIEVALS_PER_SP_PER_HOUR: Joi.number().min(0.001).max(20).default(2),
// Enables the data_set_lifecycle_check canary job. The network-dependent default (true on
// calibration, false on mainnet) is resolved in loadConfig; here we only validate the
// type when explicitly set. See docs/checks/data-set-lifecycle-check.md.
DATASET_LIFECYCLE_CHECK_ENABLED: Joi.boolean().optional(),
// Polling interval for pg-boss scheduler (lower = more responsive, higher = less DB chatter).
JOB_SCHEDULER_POLL_SECONDS: Joi.number().min(60).default(300),
JOB_WORKER_POLL_SECONDS: Joi.number().min(5).default(60),
Expand All @@ -93,6 +98,7 @@ export const configValidationSchema = Joi.object({
DEAL_JOB_TIMEOUT_SECONDS: Joi.number().min(120).default(360), // 6 minutes max runtime for data storage jobs (TODO: reduce default to 3 minutes)
RETRIEVAL_JOB_TIMEOUT_SECONDS: Joi.number().min(60).default(60), // 1 minute max runtime for retrieval jobs (TODO: reduce default to 30 seconds)
DATA_SET_CREATION_JOB_TIMEOUT_SECONDS: Joi.number().min(60).default(300), // 5 minutes max runtime for dataset creation jobs
DATA_SET_LIFECYCLE_CHECK_JOB_TIMEOUT_SECONDS: Joi.number().min(60).default(600), // 10 minutes: covers create + seed-piece upload + terminate + pdpEndEpoch poll
// Seconds to hold the process alive after pg-boss drain completes, so Prometheus
// captures at least one scrape of the terminal counter increments emitted during
// shutdown. Default 35 covers the 30s ServiceMonitor interval plus a 5s buffer.
Expand Down Expand Up @@ -226,6 +232,17 @@ export interface IJobsConfig {
* Target number of dataset creation runs per storage provider per hour.
*/
dataSetCreationsPerSpPerHour: number;
/**
* Enables the `data_set_lifecycle_check` canary job, which creates a
* throwaway data set and immediately terminates it in a single tick.
*
* Defaults to true on calibration and false on mainnet.
*/
dataSetLifecycleCheckEnabled: boolean;
/**
* Target number of dataset lifecycle check runs per storage provider per hour.
*/
dataSetLifecycleChecksPerSpPerHour: number;
/**
* How often the scheduler polls Postgres for due jobs (seconds).
*
Expand Down Expand Up @@ -284,6 +301,13 @@ export interface IJobsConfig {
* Uses AbortController to actively cancel job execution.
*/
dataSetCreationJobTimeoutSeconds: number;
/**
* Maximum runtime (seconds) for data-set lifecycle check jobs before forced abort.
*
* Bounds the create-with-seed-piece upload, the terminateService call, and the
* `pdpEndEpoch != 0` confirmation poll. Uses AbortController to actively cancel execution.
*/
dataSetLifecycleCheckJobTimeoutSeconds: number;
/**
* Maximum runtime (seconds) for retrieval jobs before forced abort.
*
Expand Down Expand Up @@ -473,6 +497,17 @@ export function loadConfig(): IConfig {
dealsPerSpPerHour: Number.parseFloat(process.env.DEALS_PER_SP_PER_HOUR || "4"),
retrievalsPerSpPerHour: Number.parseFloat(process.env.RETRIEVALS_PER_SP_PER_HOUR || "2"),
dataSetCreationsPerSpPerHour: Number.parseFloat(process.env.DATASET_CREATIONS_PER_SP_PER_HOUR || "1"),
dataSetLifecycleCheckEnabled: (() => {
const raw = process.env.DATASET_LIFECYCLE_CHECK_ENABLED;
if (raw == null || raw.trim().length === 0) {
// Default: enabled on calibration, disabled on mainnet.
return (process.env.NETWORK || "calibration") === "calibration";
}
return raw === "true";
})(),
dataSetLifecycleChecksPerSpPerHour: Number.parseFloat(
process.env.DATASET_LIFECYCLE_CHECKS_PER_SP_PER_HOUR || "1",
),
schedulerPollSeconds: Number.parseInt(process.env.JOB_SCHEDULER_POLL_SECONDS || "300", 10),
workerPollSeconds: Number.parseInt(process.env.JOB_WORKER_POLL_SECONDS || "60", 10),
pgbossLocalConcurrency: Number.parseInt(process.env.PG_BOSS_LOCAL_CONCURRENCY || "20", 10),
Expand All @@ -484,6 +519,10 @@ export function loadConfig(): IConfig {
dealJobTimeoutSeconds: Number.parseInt(process.env.DEAL_JOB_TIMEOUT_SECONDS || "360", 10),
retrievalJobTimeoutSeconds: Number.parseInt(process.env.RETRIEVAL_JOB_TIMEOUT_SECONDS || "60", 10),
dataSetCreationJobTimeoutSeconds: Number.parseInt(process.env.DATA_SET_CREATION_JOB_TIMEOUT_SECONDS || "300", 10),
dataSetLifecycleCheckJobTimeoutSeconds: Number.parseInt(
process.env.DATA_SET_LIFECYCLE_CHECK_JOB_TIMEOUT_SECONDS || "600",
10,
),
shutdownFinalScrapeDelaySeconds: Number.parseInt(process.env.SHUTDOWN_FINAL_SCRAPE_DELAY_SECONDS || "35", 10),
pieceCleanupPerSpPerHour: Number.parseFloat(process.env.JOB_PIECE_CLEANUP_PER_SP_PER_HOUR || String(1 / 24)),
maxPieceCleanupRuntimeSeconds: Number.parseInt(process.env.MAX_PIECE_CLEANUP_RUNTIME_SECONDS || "300", 10),
Expand Down
11 changes: 11 additions & 0 deletions apps/backend/src/data-set-lifecycle/data-set-lifecycle.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { Module } from "@nestjs/common";
import { MetricsPrometheusModule } from "../metrics-prometheus/metrics-prometheus.module.js";
import { WalletSdkModule } from "../wallet-sdk/wallet-sdk.module.js";
import { DataSetLifecycleService } from "./data-set-lifecycle.service.js";

@Module({
imports: [WalletSdkModule, MetricsPrometheusModule],
providers: [DataSetLifecycleService],
exports: [DataSetLifecycleService],
})
export class DataSetLifecycleModule {}
131 changes: 131 additions & 0 deletions apps/backend/src/data-set-lifecycle/data-set-lifecycle.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { DataSetLifecycleCheckMetrics } from "../metrics-prometheus/check-metrics.service.js";
import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js";
import { DataSetLifecycleService } from "./data-set-lifecycle.service.js";

vi.mock("@filoz/synapse-core/sp", () => ({
createDataSet: vi.fn(),
waitForCreateDataSet: vi.fn(),
}));

vi.mock("@filoz/synapse-core/warm-storage", () => ({
terminateServiceSync: vi.fn(),
}));

const { createDataSet, waitForCreateDataSet } = await import("@filoz/synapse-core/sp");
const { terminateServiceSync } = await import("@filoz/synapse-core/warm-storage");

const mockClient = { account: { address: "0xwallet" } };

const mockProviderInfo = {
id: 1n,
name: "test-sp",
isApproved: true,
serviceProvider: "0xsp" as `0x${string}`,
payee: "0xpayee" as `0x${string}`,
pdp: { serviceURL: "https://sp.example.com" },
};

const mockWalletSdkService = {
getProviderInfo: vi.fn(() => mockProviderInfo),
getSynapseClient: vi.fn(() => mockClient),
} as unknown as WalletSdkService;

const mockMetrics = {
observeCheckDuration: vi.fn(),
recordStatus: vi.fn(),
} as unknown as DataSetLifecycleCheckMetrics;

describe("DataSetLifecycleService", () => {
let service: DataSetLifecycleService;

beforeEach(() => {
vi.clearAllMocks();
service = new DataSetLifecycleService(mockWalletSdkService, mockMetrics);
});

it("creates an empty data set, waits for confirmation, terminates it, and records success", async () => {
vi.mocked(createDataSet).mockResolvedValue({ txHash: "0xhash1", statusUrl: "https://sp.example.com/status/1" });
vi.mocked(waitForCreateDataSet).mockResolvedValue({
dataSetId: 42n,
dataSetCreated: true,
txStatus: "confirmed",
ok: true,
createMessageHash: "0xmsg",
service: "https://sp.example.com",
});
vi.mocked(terminateServiceSync).mockResolvedValue({ receipt: {} as any, event: {} as any });

await service.runLifecycleCheck("0xsp", { dealbotLifecycleCheck: "nonce-123" });

expect(createDataSet).toHaveBeenCalledWith(
mockClient,
expect.objectContaining({
cdn: false,
payee: "0xpayee",
serviceURL: "https://sp.example.com",
metadata: { dealbotLifecycleCheck: "nonce-123" },
}),
);
expect(waitForCreateDataSet).toHaveBeenCalledWith(
expect.objectContaining({ statusUrl: "https://sp.example.com/status/1" }),
);
expect(terminateServiceSync).toHaveBeenCalledWith(mockClient, expect.objectContaining({ dataSetId: 42n }));
expect(mockMetrics.observeCheckDuration).toHaveBeenCalledOnce();
expect(mockMetrics.recordStatus).toHaveBeenCalledWith(expect.any(Object), "success");
});

it("records failure.timedout when signal is aborted before creation", async () => {
const controller = new AbortController();
controller.abort(new Error("job timeout"));

await expect(
service.runLifecycleCheck("0xsp", { dealbotLifecycleCheck: "nonce-456" }, controller.signal),
).rejects.toThrow();

expect(createDataSet).not.toHaveBeenCalled();
expect(mockMetrics.recordStatus).toHaveBeenCalledWith(expect.any(Object), "failure.timedout");
});

it("records failure.other when creation rejects with a non-abort error", async () => {
vi.mocked(createDataSet).mockRejectedValue(new Error("SP unreachable"));

await expect(service.runLifecycleCheck("0xsp", { dealbotLifecycleCheck: "nonce-789" })).rejects.toThrow(
"SP unreachable",
);

expect(terminateServiceSync).not.toHaveBeenCalled();
expect(mockMetrics.recordStatus).toHaveBeenCalledWith(expect.any(Object), "failure.other");
});

it("records failure.other when termination fails after creation, logging the dataSetId as leaked", async () => {
vi.mocked(createDataSet).mockResolvedValue({ txHash: "0xhash2", statusUrl: "https://sp.example.com/status/2" });
vi.mocked(waitForCreateDataSet).mockResolvedValue({
dataSetId: 99n,
dataSetCreated: true,
txStatus: "confirmed",
ok: true,
createMessageHash: "0xmsg2",
service: "https://sp.example.com",
});
vi.mocked(terminateServiceSync).mockRejectedValue(new Error("terminate failed"));

await expect(service.runLifecycleCheck("0xsp", { dealbotLifecycleCheck: "nonce-999" })).rejects.toThrow(
"terminate failed",
);

expect(mockMetrics.recordStatus).toHaveBeenCalledWith(expect.any(Object), "failure.other");
});

it("throws when provider is not found in registry", async () => {
vi.mocked(mockWalletSdkService.getProviderInfo).mockReturnValueOnce(undefined);

await expect(service.runLifecycleCheck("0xunknown", {})).rejects.toThrow("not found in registry");
});

it("throws when synapse client is not initialized", async () => {
vi.mocked(mockWalletSdkService.getSynapseClient).mockReturnValueOnce(null);

await expect(service.runLifecycleCheck("0xsp", {})).rejects.toThrow("not initialized");
});
});
149 changes: 149 additions & 0 deletions apps/backend/src/data-set-lifecycle/data-set-lifecycle.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import { createDataSet, waitForCreateDataSet } from "@filoz/synapse-core/sp";
import { terminateServiceSync } from "@filoz/synapse-core/warm-storage";
import { Injectable, Logger } from "@nestjs/common";
import { awaitWithAbort } from "../common/abort-utils.js";
import { toStructuredError } from "../common/logging.js";
import { buildCheckMetricLabels, classifyFailureStatus } from "../metrics-prometheus/check-metric-labels.js";
import { DataSetLifecycleCheckMetrics } from "../metrics-prometheus/check-metrics.service.js";
import { WalletSdkService } from "../wallet-sdk/wallet-sdk.service.js";

@Injectable()
export class DataSetLifecycleService {
private readonly logger = new Logger(DataSetLifecycleService.name);

constructor(
private readonly walletSdkService: WalletSdkService,
private readonly lifecycleCheckMetrics: DataSetLifecycleCheckMetrics,
) {}

/**
* Run one data-set lifecycle check: create an empty throwaway data set on the SP,
* wait for on-chain confirmation, then immediately terminate it. Used by the
* `data_set_lifecycle_check` canary job to validate that an SP honours the full
* create → terminate lifecycle.
*
* Never touches managed check data sets and creates no Deal rows. The throwaway set
* is identified by the fixed `dealbotLifecycleCheck` marker key in `metadata`; a
* per-run nonce value prevents accidentally reusing a prior leaked set. If creation
* succeeds but termination fails the set leaks (accepted trade-off); operators can
* sweep leaks by that key.
*
* Emits only `dataSetLifecycleCheckStatus` / `dataSetLifecycleCheckMs` metrics.
*/
async runLifecycleCheck(spAddress: string, metadata: Record<string, string>, signal?: AbortSignal): Promise<void> {
const providerInfo = this.walletSdkService.getProviderInfo(spAddress);
if (!providerInfo) {
throw new Error(`Provider ${spAddress} not found in registry`);
}

const client = this.walletSdkService.getSynapseClient();
if (!client) {
throw new Error("Synapse client not initialized");
}

const labels = buildCheckMetricLabels({
checkType: "dataSetLifecycleCheck",
providerId: providerInfo.id,
providerName: providerInfo.name,
providerIsApproved: providerInfo.isApproved,
});

const logContext = {
providerAddress: spAddress,
providerName: providerInfo.name,
providerId: providerInfo.id,
};

const startedAt = Date.now();
this.logger.log({
event: "dataset_lifecycle_check_started",
message: "Starting data-set lifecycle check",
...logContext,
});

let dataSetId: bigint | undefined;
try {
signal?.throwIfAborted();

// 1. Request creation of an empty data set on the SP.
const createResult = await awaitWithAbort(
createDataSet(client, {
cdn: false,
payee: providerInfo.payee,
serviceURL: providerInfo.pdp.serviceURL,
metadata,
}),
signal,
);
signal?.throwIfAborted();

this.logger.log({
event: "dataset_lifecycle_check_creating",
message: "Empty data set creation submitted; waiting for SP confirmation",
...logContext,
txHash: createResult.txHash,
});

// 2. Wait for the SP to confirm the data set is created and extract the dataSetId.
const confirmed = await awaitWithAbort(waitForCreateDataSet({ statusUrl: createResult.statusUrl }), signal);
dataSetId = confirmed.dataSetId;
signal?.throwIfAborted();

this.logger.log({
event: "dataset_lifecycle_check_created",
message: "Empty data set created and confirmed on-chain",
...logContext,
dataSetId: dataSetId.toString(),
});

// 3. Immediately terminate the throwaway data set.
await awaitWithAbort(
terminateServiceSync(client, {
dataSetId,
onHash: (hash) => {
this.logger.log({
event: "dataset_lifecycle_check_terminating",
message: "Terminate transaction submitted",
...logContext,
dataSetId: (dataSetId as bigint).toString(),
txHash: hash,
});
},
}),
signal,
);

const durationMs = Date.now() - startedAt;
this.lifecycleCheckMetrics.observeCheckDuration(labels, durationMs);
this.lifecycleCheckMetrics.recordStatus(labels, "success");

this.logger.log({
event: "dataset_lifecycle_check_succeeded",
message: "Data-set lifecycle check completed: created and terminated throwaway data set",
...logContext,
dataSetId: dataSetId.toString(),
durationMs,
});
} catch (error) {
const durationMs = Date.now() - startedAt;
const status = signal?.aborted ? "failure.timedout" : classifyFailureStatus(error);
if (status === "failure.timedout") {
this.lifecycleCheckMetrics.observeCheckDuration(labels, durationMs);
}
this.lifecycleCheckMetrics.recordStatus(labels, status);
this.logger.error({
event: "dataset_lifecycle_check_failed",
message:
dataSetId === undefined
? "Data-set lifecycle check failed during creation"
: "Data-set lifecycle check failed during termination; throwaway data set may have leaked",
...logContext,
dataSetId: dataSetId?.toString(),
durationMs,
status,
error: toStructuredError(error),
});
throw error;
}
}
}
Loading