diff --git a/apps/backend/src/common/constants.ts b/apps/backend/src/common/constants.ts index 57416ae0..0b73afb9 100644 --- a/apps/backend/src/common/constants.ts +++ b/apps/backend/src/common/constants.ts @@ -7,3 +7,6 @@ export const ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"; export const MAX_BLOCK_SIZE = 5 * 1024 * 1024; export const DEV_TAG = stringToHex("dev"); + +// First network will be used as default in absence of NETWORKS config +export const SUPPORTED_NETWORKS = ["calibration", "mainnet"] as const; diff --git a/apps/backend/src/config/app.config.ts b/apps/backend/src/config/app.config.ts index e2d69088..1748a681 100644 --- a/apps/backend/src/config/app.config.ts +++ b/apps/backend/src/config/app.config.ts @@ -45,7 +45,7 @@ export const configValidationSchema = Joi.object({ DATABASE_NAME: Joi.string().required(), // Blockchain - NETWORK: Joi.string().valid("mainnet", "calibration").default("calibration"), + NETWORK: Joi.string().valid("mainnet", "calibration").required(), WALLET_ADDRESS: Joi.string().required(), WALLET_PRIVATE_KEY: Joi.string().optional().empty(""), RPC_URL: Joi.string() @@ -449,7 +449,7 @@ export function loadConfig(): IConfig { database: process.env.DATABASE_NAME || "filecoin_dealbot", }, blockchain: { - network: (process.env.NETWORK || "calibration") as Network, + network: process.env.NETWORK as Network, rpcUrl: process.env.RPC_URL || undefined, sessionKeyPrivateKey: (process.env.SESSION_KEY_PRIVATE_KEY || undefined) as `0x${string}` | undefined, walletAddress: process.env.WALLET_ADDRESS || "0x0000000000000000000000000000000000000000", diff --git a/apps/backend/src/data-retention/data-retention.service.spec.ts b/apps/backend/src/data-retention/data-retention.service.spec.ts index 87ced66a..f2fd991e 100644 --- a/apps/backend/src/data-retention/data-retention.service.spec.ts +++ b/apps/backend/src/data-retention/data-retention.service.spec.ts @@ -69,7 +69,7 @@ describe("DataRetentionService", () => { configServiceMock = { get: vi.fn((key: keyof IConfig) => { if (key === "blockchain") { - return { pdpSubgraphEndpoint: "https://example.com/subgraph" }; + return { pdpSubgraphEndpoint: "https://example.com/subgraph", network: "calibration" }; } if (key === "spBlocklists") { return { ids: new Set(), addresses: new Set() }; @@ -176,7 +176,7 @@ describe("DataRetentionService", () => { it("returns early when all providers are blocked for data-retention", async () => { (configServiceMock.get as ReturnType).mockImplementation((key: string) => { - if (key === "blockchain") return { pdpSubgraphEndpoint: "https://example.com/subgraph" }; + if (key === "blockchain") return { pdpSubgraphEndpoint: "https://example.com/subgraph", network: "calibration" }; if (key === "spBlocklists") return { ids: new Set(), addresses: new Set([PROVIDER_A, PROVIDER_B]) }; }); @@ -187,7 +187,7 @@ describe("DataRetentionService", () => { it("excludes blocked providers from data-retention polling while retaining unblocked ones", async () => { (configServiceMock.get as ReturnType).mockImplementation((key: string) => { - if (key === "blockchain") return { pdpSubgraphEndpoint: "https://example.com/subgraph" }; + if (key === "blockchain") return { pdpSubgraphEndpoint: "https://example.com/subgraph", network: "calibration" }; if (key === "spBlocklists") return { ids: new Set(), addresses: new Set([PROVIDER_A]) }; }); pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([makeProvider({ address: PROVIDER_B })]); @@ -195,8 +195,8 @@ describe("DataRetentionService", () => { await service.pollDataRetention(); const allAddressesPolled: string[] = ( - pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls as [{ addresses: string[] }][] - ).flatMap(([{ addresses }]) => addresses); + pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls as [string, { addresses: string[] }][] + ).flatMap(([, { addresses }]) => addresses); expect(allAddressesPolled).toContain(PROVIDER_B.toLowerCase()); expect(allAddressesPolled).not.toContain(PROVIDER_A.toLowerCase()); }); @@ -215,7 +215,7 @@ describe("DataRetentionService", () => { await service.pollDataRetention(); expect(pdpSubgraphServiceMock.fetchSubgraphMeta).toHaveBeenCalled(); - expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenCalledWith({ + expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenCalledWith("https://example.com/subgraph", { blockNumber: 1200, addresses: [PROVIDER_A, PROVIDER_B], }); @@ -229,11 +229,12 @@ describe("DataRetentionService", () => { expect(mockBaselineRepository.upsert).toHaveBeenCalledWith( { providerAddress: PROVIDER_A, + network: "calibration", faultedPeriods: "10", successPeriods: "90", lastBlockNumber: "1200", }, - ["providerAddress"], + ["providerAddress", "network"], ); }); @@ -282,8 +283,20 @@ describe("DataRetentionService", () => { it("handles multiple providers independently", async () => { // Seed DB baselines so first poll emits deltas mockBaselineRepository.find.mockResolvedValueOnce([ - { providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" }, - { providerAddress: PROVIDER_B, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" }, + { + providerAddress: PROVIDER_A, + network: "calibration", + faultedPeriods: "0", + successPeriods: "0", + lastBlockNumber: "1000", + }, + { + providerAddress: PROVIDER_B, + network: "calibration", + faultedPeriods: "0", + successPeriods: "0", + lastBlockNumber: "1000", + }, ]); const providerA = makeProvider({ address: PROVIDER_A, totalFaultedPeriods: 5n }); @@ -306,7 +319,13 @@ describe("DataRetentionService", () => { it("uses subgraph-confirmed totals directly without overdue estimation", async () => { // Seed baseline so we can verify the computed values via deltas mockBaselineRepository.find.mockResolvedValueOnce([ - { providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" }, + { + providerAddress: PROVIDER_A, + network: "calibration", + faultedPeriods: "0", + successPeriods: "0", + lastBlockNumber: "1000", + }, ]); const provider = makeProvider(); @@ -343,7 +362,13 @@ describe("DataRetentionService", () => { it("emits both faulted and success counters from subgraph totals", async () => { // Seed baseline so we can verify the computed values via deltas mockBaselineRepository.find.mockResolvedValueOnce([ - { providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" }, + { + providerAddress: PROVIDER_A, + network: "calibration", + faultedPeriods: "0", + successPeriods: "0", + lastBlockNumber: "1000", + }, ]); const provider = makeProvider(); @@ -409,7 +434,13 @@ describe("DataRetentionService", () => { // Seed baseline at zero so the full largeValue becomes the delta mockBaselineRepository.find.mockResolvedValueOnce([ - { providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" }, + { + providerAddress: PROVIDER_A, + network: "calibration", + faultedPeriods: "0", + successPeriods: "0", + lastBlockNumber: "1000", + }, ]); pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([ @@ -433,7 +464,13 @@ describe("DataRetentionService", () => { // Seed baseline at zero so the full value becomes the delta mockBaselineRepository.find.mockResolvedValueOnce([ - { providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" }, + { + providerAddress: PROVIDER_A, + network: "calibration", + faultedPeriods: "0", + successPeriods: "0", + lastBlockNumber: "1000", + }, ]); pdpSubgraphServiceMock.fetchProvidersWithDatasets.mockResolvedValueOnce([ @@ -449,7 +486,13 @@ describe("DataRetentionService", () => { it("uses only subgraph-confirmed provider-level totals", async () => { // Seed baseline at zero so subgraph totals are visible as delta mockBaselineRepository.find.mockResolvedValueOnce([ - { providerAddress: PROVIDER_A, faultedPeriods: "0", successPeriods: "0", lastBlockNumber: "1000" }, + { + providerAddress: PROVIDER_A, + network: "calibration", + faultedPeriods: "0", + successPeriods: "0", + lastBlockNumber: "1000", + }, ]); const provider = makeProvider({ @@ -481,12 +524,16 @@ describe("DataRetentionService", () => { // Should be called twice: once for first 50, once for remaining 25 expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenCalledTimes(2); - expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenNthCalledWith(1, { - addresses: expect.arrayContaining([expect.any(String)]), - blockNumber: 1200, - }); - expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[0][0].addresses).toHaveLength(50); - expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[1][0].addresses).toHaveLength(25); + expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets).toHaveBeenNthCalledWith( + 1, + "https://example.com/subgraph", + { + addresses: expect.arrayContaining([expect.any(String)]), + blockNumber: 1200, + }, + ); + expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[0][1].addresses).toHaveLength(50); + expect(pdpSubgraphServiceMock.fetchProvidersWithDatasets.mock.calls[1][1].addresses).toHaveLength(25); }); it("continues processing next batch if one batch fails", async () => { @@ -562,9 +609,9 @@ describe("DataRetentionService", () => { await service.pollDataRetention(); - // Should fetch stale provider info from database + // Should fetch stale provider info from database (network-scoped) expect(mockSPRepository.find).toHaveBeenCalledWith({ - where: { address: expect.anything() }, + where: { address: expect.anything(), network: "calibration" }, select: ["address", "providerId", "name", "isApproved"], }); @@ -803,9 +850,9 @@ describe("DataRetentionService", () => { await service.pollDataRetention(); - // Should fetch both stale providers in one query + // Should fetch both stale providers in one query (network-scoped) expect(mockSPRepository.find).toHaveBeenCalledWith({ - where: { address: expect.anything() }, + where: { address: expect.anything(), network: "calibration" }, select: ["address", "providerId", "name", "isApproved"], }); @@ -876,6 +923,7 @@ describe("DataRetentionService", () => { mockBaselineRepository.find.mockResolvedValueOnce([ { providerAddress: PROVIDER_A, + network: "calibration", faultedPeriods: "10", successPeriods: "90", lastBlockNumber: "1100", @@ -898,6 +946,7 @@ describe("DataRetentionService", () => { mockBaselineRepository.find.mockResolvedValueOnce([ { providerAddress: PROVIDER_A, + network: "calibration", faultedPeriods: "8", successPeriods: "85", lastBlockNumber: "1000", @@ -997,6 +1046,7 @@ describe("DataRetentionService", () => { mockBaselineRepository.find.mockRejectedValueOnce(new Error("DB connection failed")).mockResolvedValueOnce([ { providerAddress: PROVIDER_A, + network: "calibration", faultedPeriods: "10", successPeriods: "90", lastBlockNumber: "1100", @@ -1060,8 +1110,11 @@ describe("DataRetentionService", () => { await service.pollDataRetention(); - // Should delete the baseline from DB - expect(mockBaselineRepository.delete).toHaveBeenCalledWith({ providerAddress: PROVIDER_A }); + // Should delete the baseline from DB (network-scoped) + expect(mockBaselineRepository.delete).toHaveBeenCalledWith({ + providerAddress: PROVIDER_A, + network: "calibration", + }); }); }); diff --git a/apps/backend/src/data-retention/data-retention.service.ts b/apps/backend/src/data-retention/data-retention.service.ts index c6ece7b5..3b0094c9 100644 --- a/apps/backend/src/data-retention/data-retention.service.ts +++ b/apps/backend/src/data-retention/data-retention.service.ts @@ -7,7 +7,8 @@ import { Raw, Repository } from "typeorm"; import { ClickhouseService } from "../clickhouse/clickhouse.service.js"; import { toStructuredError } from "../common/logging.js"; import { isSpBlocked } from "../common/sp-blocklist.js"; -import { IConfig } from "../config/app.config.js"; +import type { Network } from "../common/types.js"; +import { IBlockchainConfig, IConfig } from "../config/app.config.js"; import { DataRetentionBaseline } from "../database/entities/data-retention-baseline.entity.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import { buildCheckMetricLabels, CheckMetricLabels } from "../metrics-prometheus/check-metric-labels.js"; @@ -59,23 +60,25 @@ export class DataRetentionService { * challenge delta since the last poll. */ async pollDataRetention(): Promise { - const pdpSubgraphEndpoint = this.configService.get("blockchain").pdpSubgraphEndpoint; + const blockchainCfg = this.configService.get("blockchain"); + const { network, pdpSubgraphEndpoint } = blockchainCfg; if (!pdpSubgraphEndpoint) { this.logger.warn({ event: "pdp_subgraph_endpoint_not_configured", message: "No PDP subgraph endpoint configured", + network, }); return; } - const baselines = await this.loadBaselinesFromDb(); + const baselines = await this.loadBaselinesFromDb(network); if (baselines === null) { // Cannot safely compute deltas without persisted baselines. return; } try { - const subgraphMeta = await this.pdpSubgraphService.fetchSubgraphMeta(); + const subgraphMeta = await this.pdpSubgraphService.fetchSubgraphMeta(pdpSubgraphEndpoint); const allProviderInfos = this.walletSdkService.getTestingProviders(); const spBlocklists = this.configService.get("spBlocklists"); const providerInfos = allProviderInfos?.filter((p) => !isSpBlocked(spBlocklists, p.serviceProvider, p.id)); @@ -104,7 +107,7 @@ export class DataRetentionService { ); try { - const providersFromSubgraph = await this.pdpSubgraphService.fetchProvidersWithDatasets({ + const providersFromSubgraph = await this.pdpSubgraphService.fetchProvidersWithDatasets(pdpSubgraphEndpoint, { blockNumber, addresses: batchAddresses, }); @@ -142,7 +145,12 @@ export class DataRetentionService { } try { - await this.persistBaseline(result.value.providerAddress, result.value.baseline, blockNumberBigInt); + await this.persistBaseline( + result.value.providerAddress, + result.value.baseline, + blockNumberBigInt, + network, + ); } catch (error) { hasProcessingErrors = true; // Leave stale cleanup for a later poll so DB-backed baselines and local state do not diverge further. @@ -180,19 +188,20 @@ export class DataRetentionService { } } - // Only cleanup stale providers after successful poll to preserve baselines during transient failures if (!hasProcessingErrors) { - await this.cleanupStaleProviders(providerAddresses, baselines); + await this.cleanupStaleProviders(providerAddresses, baselines, network); } else { this.logger.warn({ event: "stale_provider_cleanup_skipped", message: "Skipping stale provider cleanup due to processing errors", + network, }); } } catch (error) { this.logger.error({ event: "data_retention_poll_failed", message: "Failed to poll data retention", + network, error: toStructuredError(error), }); } @@ -209,6 +218,7 @@ export class DataRetentionService { private async cleanupStaleProviders( activeProviderAddresses: string[], baselines: Map, + network: Network, ): Promise { const activeAddressSet = new Set(activeProviderAddresses); const staleAddresses: string[] = []; @@ -232,7 +242,10 @@ export class DataRetentionService { let staleProviders: StorageProvider[] = []; try { staleProviders = await this.storageProviderRepository.find({ - where: { address: Raw((alias) => `LOWER(${alias}) IN (:...addresses)`, { addresses: staleAddresses }) }, + where: { + network, + address: Raw((alias) => `LOWER(${alias}) IN (:...addresses)`, { addresses: staleAddresses }), + }, select: ["address", "providerId", "name", "isApproved"], }); } catch (error) { @@ -277,11 +290,12 @@ export class DataRetentionService { baselines.delete(address); // Also remove persisted baseline from DB - this.baselineRepository.delete({ providerAddress: address }).catch((err) => { + this.baselineRepository.delete({ providerAddress: address, network }).catch((err) => { this.logger.warn({ event: "baseline_db_delete_failed", message: "Failed to delete persisted baseline for stale provider", providerAddress: address, + network, error: toStructuredError(err), }); }); @@ -474,9 +488,9 @@ export class DataRetentionService { * deltas from the latest persisted cross-pod baseline. Returns null on DB failure * so the caller can abort the poll. */ - private async loadBaselinesFromDb(): Promise | null> { + private async loadBaselinesFromDb(network: Network): Promise | null> { try { - const rows = await this.baselineRepository.find(); + const rows = await this.baselineRepository.find({ where: { network } }); const baselines = new Map(); for (const row of rows) { baselines.set(row.providerAddress.toLowerCase(), { @@ -487,6 +501,7 @@ export class DataRetentionService { this.logger.log({ event: "baselines_loaded_from_db", message: "Loaded baseline(s) from database", + network, baselineCount: rows.length, }); return baselines; @@ -507,15 +522,17 @@ export class DataRetentionService { providerAddress: string, baseline: ProviderBaseline, blockNumber: bigint, + network: Network, ): Promise { await this.baselineRepository.upsert( { providerAddress, + network, faultedPeriods: baseline.faultedPeriods.toString(), successPeriods: baseline.successPeriods.toString(), lastBlockNumber: blockNumber.toString(), }, - ["providerAddress"], + ["providerAddress", "network"], ); } diff --git a/apps/backend/src/database/entities/data-retention-baseline.entity.ts b/apps/backend/src/database/entities/data-retention-baseline.entity.ts index e3b8169a..365a2a26 100644 --- a/apps/backend/src/database/entities/data-retention-baseline.entity.ts +++ b/apps/backend/src/database/entities/data-retention-baseline.entity.ts @@ -1,10 +1,20 @@ import { Column, Entity, PrimaryColumn, UpdateDateColumn } from "typeorm"; +import { SUPPORTED_NETWORKS } from "../../common/constants.js"; +import type { Network } from "../../common/types.js"; @Entity("data_retention_baselines") export class DataRetentionBaseline { @PrimaryColumn({ name: "provider_address", type: "text" }) providerAddress!: string; + @PrimaryColumn({ + name: "network", + type: "enum", + enum: [...SUPPORTED_NETWORKS], + enumName: "network_enum", + }) + network!: Network; + @Column({ name: "faulted_periods", type: "bigint" }) faultedPeriods!: string; // bigint stored as string diff --git a/apps/backend/src/database/entities/deal.entity.ts b/apps/backend/src/database/entities/deal.entity.ts index f03a9ea8..1bbf357f 100644 --- a/apps/backend/src/database/entities/deal.entity.ts +++ b/apps/backend/src/database/entities/deal.entity.ts @@ -2,18 +2,22 @@ import { Column, CreateDateColumn, Entity, + Index, JoinColumn, ManyToOne, OneToMany, PrimaryGeneratedColumn, UpdateDateColumn, } from "typeorm"; +import { SUPPORTED_NETWORKS } from "../../common/constants.js"; +import type { Network } from "../../common/types.js"; import { BigIntColumn } from "../helpers/bigint-column.js"; import { type DealMetadata, DealStatus, IpniStatus, type ServiceType } from "../types.js"; import type { Retrieval } from "./retrieval.entity.js"; import { StorageProvider } from "./storage-provider.entity.js"; @Entity("deals") +@Index(["network", "spAddress"]) export class Deal { @PrimaryGeneratedColumn("uuid") id: string; @@ -21,6 +25,14 @@ export class Deal { @Column({ name: "sp_address" }) spAddress: string; + @Column({ + name: "network", + type: "enum", + enum: [...SUPPORTED_NETWORKS], + enumName: "network_enum", + }) + network: Network; + @Column({ name: "wallet_address" }) walletAddress: string; @@ -151,7 +163,10 @@ export class Deal { (sp) => sp.deals, { onDelete: "CASCADE" }, ) - @JoinColumn({ name: "sp_address" }) + @JoinColumn([ + { name: "sp_address", referencedColumnName: "address" }, + { name: "network", referencedColumnName: "network" }, + ]) storageProvider: StorageProvider | null; @OneToMany("Retrieval", "deal") diff --git a/apps/backend/src/database/entities/job-schedule-state.entity.ts b/apps/backend/src/database/entities/job-schedule-state.entity.ts index 4d801d2a..c713a44c 100644 --- a/apps/backend/src/database/entities/job-schedule-state.entity.ts +++ b/apps/backend/src/database/entities/job-schedule-state.entity.ts @@ -1,4 +1,6 @@ import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from "typeorm"; +import { SUPPORTED_NETWORKS } from "../../common/constants.js"; +import type { Network } from "../../common/types.js"; export type JobType = | "deal" @@ -11,7 +13,7 @@ export type JobType = | "pull_piece_cleanup"; @Entity("job_schedule_state") -@Index("job_schedule_state_job_type_sp_unique", ["jobType", "spAddress"], { unique: true }) +@Index("job_schedule_state_job_type_sp_network_unique", ["jobType", "spAddress", "network"], { unique: true }) @Index("idx_job_schedule_state_next_run", ["nextRunAt"]) export class JobScheduleState { @PrimaryGeneratedColumn("increment", { type: "bigint" }) @@ -23,6 +25,14 @@ export class JobScheduleState { @Column({ name: "sp_address", type: "text", default: "" }) spAddress!: string; + @Column({ + name: "network", + type: "enum", + enum: [...SUPPORTED_NETWORKS], + enumName: "network_enum", + }) + network!: Network; + @Column({ name: "interval_seconds" }) intervalSeconds!: number; diff --git a/apps/backend/src/database/entities/storage-provider.entity.ts b/apps/backend/src/database/entities/storage-provider.entity.ts index 3c130094..71f7d256 100644 --- a/apps/backend/src/database/entities/storage-provider.entity.ts +++ b/apps/backend/src/database/entities/storage-provider.entity.ts @@ -1,13 +1,24 @@ import { Column, CreateDateColumn, Entity, Index, OneToMany, PrimaryColumn, UpdateDateColumn } from "typeorm"; +import { SUPPORTED_NETWORKS } from "../../common/constants.js"; +import type { Network } from "../../common/types.js"; import { BigIntColumn } from "../helpers/bigint-column.js"; import { Deal } from "./deal.entity.js"; @Entity("storage_providers") @Index(["location", "isActive"]) +@Index(["network", "isActive"]) export class StorageProvider { @PrimaryColumn() address!: string; + @PrimaryColumn({ + name: "network", + type: "enum", + enum: [...SUPPORTED_NETWORKS], + enumName: "network_enum", + }) + network!: Network; + @BigIntColumn({ nullable: true }) providerId: bigint | null; diff --git a/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts b/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts new file mode 100644 index 00000000..6bfa4fe9 --- /dev/null +++ b/apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts @@ -0,0 +1,257 @@ +import type { MigrationInterface, QueryRunner } from "typeorm"; +import { SUPPORTED_NETWORKS } from "../../common/constants.js"; +import { Network } from "../../common/types.js"; + +/** + * Add a `network` column to runtime tables so records from mainnet and calibration + * are isolated correctly when a single dealbot instance operates on both networks. + * + * Backfill strategy: existing rows are assigned the network declared by the + * operator via `DEALBOT_LEGACY_NETWORK_BACKFILL` (preferred) or the legacy + * `NETWORK` env var. The migration fails fast if neither is set to a supported + * network, so backfill is never silently wrong. Operators later expanding a + * single-network deployment to an additional network must update their + * `NETWORKS` config and re-run a providers_refresh to populate the + * network-scoped rows for the newly added network. + * + * Operator runbook: `docs/runbooks/multi-network-migration.md` covers the + * pre-migration checklist (backup, env vars, stopping writers), running the + * migration, post-migration verification, expanding to a second network, and + * rollback semantics. + */ +export class AddNetworkColumn1776790420000 implements MigrationInterface { + name = "AddNetworkColumn1776790420000"; + + public async up(queryRunner: QueryRunner): Promise { + const backfillNetwork = (process.env.DEALBOT_LEGACY_NETWORK_BACKFILL ?? process.env.NETWORK ?? "").trim(); + if (!SUPPORTED_NETWORKS.includes(backfillNetwork as Network)) { + throw new Error( + `AddNetworkColumn migration requires DEALBOT_LEGACY_NETWORK_BACKFILL (or legacy NETWORK) ` + + `to be set to one of: ${SUPPORTED_NETWORKS.join(", ")}. Got: "${backfillNetwork}"`, + ); + } + + // ------------------------------------------------------------------------- + // Create the shared Postgres enum type for network values. All four + // network-bearing tables reference this single type (mirrors TypeORM's + // `enumName: "network_enum"` on the entities), so adding/removing + // supported networks happens in one place. + // ------------------------------------------------------------------------- + await queryRunner.query(` + DO $$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'network_enum') THEN + CREATE TYPE network_enum AS ENUM (${SUPPORTED_NETWORKS.map((n) => `'${n}'`).join(", ")}); + END IF; + END$$; + `); + + // ------------------------------------------------------------------------- + // Add `network` columns first so composite PK/FK can reference them. + // ------------------------------------------------------------------------- + await queryRunner.query(` + ALTER TABLE storage_providers + ADD COLUMN IF NOT EXISTS network network_enum NOT NULL DEFAULT '${backfillNetwork}' + `); + + await queryRunner.query(` + ALTER TABLE deals + ADD COLUMN IF NOT EXISTS network network_enum NOT NULL DEFAULT '${backfillNetwork}' + `); + + // ------------------------------------------------------------------------- + // Indexes on storage_providers + // ------------------------------------------------------------------------- + await queryRunner.query(` + DROP INDEX IF EXISTS "IDX_storage_providers_region_is_active" + `); + + await queryRunner.query(` + CREATE INDEX IF NOT EXISTS "IDX_storage_providers_location_is_active" + ON storage_providers (location, is_active) + `); + + await queryRunner.query(` + CREATE INDEX IF NOT EXISTS "IDX_storage_providers_network_is_active" + ON storage_providers (network, is_active) + `); + + await queryRunner.query(` + CREATE INDEX IF NOT EXISTS "IDX_deals_network_sp_address" + ON deals (network, sp_address) + `); + + // ------------------------------------------------------------------------- + // Drop FK before dropping the PK it depends on. + // ------------------------------------------------------------------------- + await queryRunner.query(` + ALTER TABLE deals DROP CONSTRAINT IF EXISTS "FK_deals_storage_providers" + `); + + // ------------------------------------------------------------------------- + // storage_providers: change single-column PK to composite (address, network) + // ------------------------------------------------------------------------- + await queryRunner.query(` + ALTER TABLE storage_providers DROP CONSTRAINT IF EXISTS "PK_4edd0e54ccdb29b54a3ef1e2547" + `); + await queryRunner.query(` + ALTER TABLE storage_providers DROP CONSTRAINT IF EXISTS storage_providers_pkey + `); + + await queryRunner.query(` + ALTER TABLE storage_providers ADD PRIMARY KEY (address, network) + `); + + // ------------------------------------------------------------------------- + // Recreate deals FK against the new composite PK. + // ------------------------------------------------------------------------- + await queryRunner.query(` + ALTER TABLE deals + ADD CONSTRAINT "FK_deals_storage_providers" + FOREIGN KEY (sp_address, network) + REFERENCES storage_providers(address, network) + ON DELETE CASCADE + `); + + // ------------------------------------------------------------------------- + // job_schedule_state: replace unique constraint to include network + // ------------------------------------------------------------------------- + await queryRunner.query(` + ALTER TABLE job_schedule_state + ADD COLUMN IF NOT EXISTS network network_enum NOT NULL DEFAULT '${backfillNetwork}' + `); + + await queryRunner.query(` + ALTER TABLE job_schedule_state + DROP CONSTRAINT IF EXISTS job_schedule_state_job_type_sp_unique + `); + + await queryRunner.query(` + ALTER TABLE job_schedule_state + ADD CONSTRAINT job_schedule_state_job_type_sp_network_unique + UNIQUE (job_type, sp_address, network) + `); + + // ------------------------------------------------------------------------- + // data_retention_baselines: change single-column PK to composite (provider_address, network) + // ------------------------------------------------------------------------- + await queryRunner.query(` + ALTER TABLE data_retention_baselines + ADD COLUMN IF NOT EXISTS network network_enum NOT NULL DEFAULT '${backfillNetwork}' + `); + + await queryRunner.query(` + ALTER TABLE data_retention_baselines + DROP CONSTRAINT IF EXISTS data_retention_baselines_pkey + `); + + await queryRunner.query(` + ALTER TABLE data_retention_baselines + ADD PRIMARY KEY (provider_address, network) + `); + + // ------------------------------------------------------------------------- + // Drop DEFAULT clauses now that backfill is complete. Future writes must + // set `network` explicitly so a missing app code path fails loudly with a + // NOT NULL violation instead of silently inheriting the migration-time + // default (which is frozen at the legacy backfill network and would + // mislabel rows after the deployment expands to a second network). + // ------------------------------------------------------------------------- + await queryRunner.query(`ALTER TABLE storage_providers ALTER COLUMN network DROP DEFAULT`); + await queryRunner.query(`ALTER TABLE deals ALTER COLUMN network DROP DEFAULT`); + await queryRunner.query(`ALTER TABLE job_schedule_state ALTER COLUMN network DROP DEFAULT`); + await queryRunner.query(`ALTER TABLE data_retention_baselines ALTER COLUMN network DROP DEFAULT`); + } + + public async down(queryRunner: QueryRunner): Promise { + // Reverting to a single-network schema is destructive when rows for multiple + // networks exist: collapsing storage_providers' PK back to (address) would + // fail on duplicate addresses that live under different networks. The + // operator must declare which network's data to keep; rows belonging to any + // other network are deleted before the schema is collapsed. + const keepNetwork = (process.env.DEALBOT_LEGACY_NETWORK_BACKFILL ?? process.env.NETWORK ?? "").trim(); + if (!SUPPORTED_NETWORKS.includes(keepNetwork as Network)) { + throw new Error( + `AddNetworkColumn.down migration requires DEALBOT_LEGACY_NETWORK_BACKFILL (or legacy NETWORK) ` + + `to declare which network's rows to preserve. Got: "${keepNetwork}". Allowed: ${SUPPORTED_NETWORKS.join(", ")}`, + ); + } + + // Delete non-kept-network rows. The composite FK on deals has ON DELETE + // CASCADE, so removing storage_providers rows also removes their deals. + await queryRunner.query(`DELETE FROM data_retention_baselines WHERE network <> $1`, [keepNetwork]); + await queryRunner.query(`DELETE FROM job_schedule_state WHERE network <> $1`, [keepNetwork]); + await queryRunner.query(`DELETE FROM deals WHERE network <> $1`, [keepNetwork]); + await queryRunner.query(`DELETE FROM storage_providers WHERE network <> $1`, [keepNetwork]); + + // data_retention_baselines + await queryRunner.query(` + ALTER TABLE data_retention_baselines DROP CONSTRAINT IF EXISTS data_retention_baselines_pkey + `); + await queryRunner.query(` + ALTER TABLE data_retention_baselines ADD PRIMARY KEY (provider_address) + `); + await queryRunner.query(` + ALTER TABLE data_retention_baselines DROP COLUMN IF EXISTS network + `); + + // job_schedule_state + await queryRunner.query(` + ALTER TABLE job_schedule_state + DROP CONSTRAINT IF EXISTS job_schedule_state_job_type_sp_network_unique + `); + await queryRunner.query(` + ALTER TABLE job_schedule_state + ADD CONSTRAINT job_schedule_state_job_type_sp_unique + UNIQUE (job_type, sp_address) + `); + await queryRunner.query(` + ALTER TABLE job_schedule_state DROP COLUMN IF EXISTS network + `); + + // Drop composite FK on deals before altering the PK it depends on. + await queryRunner.query(` + ALTER TABLE deals DROP CONSTRAINT IF EXISTS "FK_deals_storage_providers" + `); + + // Restore single-column PK on storage_providers so (address) is unique again + // before any FK targeting it is recreated. + await queryRunner.query(` + ALTER TABLE storage_providers DROP CONSTRAINT IF EXISTS storage_providers_pkey + `); + await queryRunner.query(` + ALTER TABLE storage_providers ADD PRIMARY KEY (address) + `); + + // Recreate the original single-column FK on deals. + await queryRunner.query(` + ALTER TABLE deals + ADD CONSTRAINT "FK_deals_storage_providers" + FOREIGN KEY (sp_address) + REFERENCES storage_providers(address) + ON DELETE CASCADE + `); + + // Drop indexes that referenced the network column. + await queryRunner.query(`DROP INDEX IF EXISTS "IDX_deals_network_sp_address"`); + await queryRunner.query(`DROP INDEX IF EXISTS "IDX_storage_providers_network_is_active"`); + + // Restore the pre-migration index name on storage_providers. The column is + // still named `location` here; migration 1761500000004 (rename region -> location) + // runs its own down() later in the revert chain and will rename this index's + // underlying column back to `region` transparently. + await queryRunner.query(`DROP INDEX IF EXISTS "IDX_storage_providers_location_is_active"`); + await queryRunner.query(` + CREATE INDEX IF NOT EXISTS "IDX_storage_providers_region_is_active" + ON storage_providers (location, is_active) + `); + + // Finally, drop the network columns now that no constraint or index depends on them. + await queryRunner.query(`ALTER TABLE deals DROP COLUMN IF EXISTS network`); + await queryRunner.query(`ALTER TABLE storage_providers DROP COLUMN IF EXISTS network`); + + // Drop the shared enum type once no column references it. We guard with + // IF EXISTS so a partial revert is idempotent. + await queryRunner.query(`DROP TYPE IF EXISTS network_enum`); + } +} diff --git a/apps/backend/src/deal/deal.service.ts b/apps/backend/src/deal/deal.service.ts index df06ed9c..ac137e98 100644 --- a/apps/backend/src/deal/deal.service.ts +++ b/apps/backend/src/deal/deal.service.ts @@ -312,6 +312,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy { deal.fileName = dealInput.processedData.name; deal.fileSize = dealInput.processedData.size; deal.spAddress = providerAddress; + deal.network = this.blockchainConfig.network; deal.status = DealStatus.PENDING; deal.walletAddress = this.blockchainConfig.walletAddress; deal.metadata = dealInput.metadata; @@ -337,7 +338,7 @@ export class DealService implements OnModuleInit, OnModuleDestroy { try { // Load storageProvider relation deal.storageProvider = await this.storageProviderRepository.findOne({ - where: { address: deal.spAddress }, + where: { address: deal.spAddress, network: deal.network }, }); dealLogContext.providerId = deal.storageProvider?.providerId ?? dealLogContext.providerId; providerLabels = buildCheckMetricLabels({ diff --git a/apps/backend/src/dev-tools/dev-tools.service.ts b/apps/backend/src/dev-tools/dev-tools.service.ts index 7a837faf..87522b0f 100644 --- a/apps/backend/src/dev-tools/dev-tools.service.ts +++ b/apps/backend/src/dev-tools/dev-tools.service.ts @@ -1,8 +1,10 @@ import { BadRequestException, Injectable, Logger, NotFoundException } from "@nestjs/common"; +import { ConfigService } from "@nestjs/config"; import { InjectRepository } from "@nestjs/typeorm"; import type { Repository } from "typeorm"; import { DealJobTerminatedDataSetError } from "../common/errors.js"; import { type DealLogContext, toStructuredError } from "../common/logging.js"; +import type { IBlockchainConfig, IConfig } from "../config/app.config.js"; import { Deal } from "../database/entities/deal.entity.js"; import { DealStatus, RetrievalStatus } from "../database/types.js"; import { DealService } from "../deal/deal.service.js"; @@ -19,6 +21,7 @@ export class DevToolsService { private readonly walletSdkService: WalletSdkService, private readonly dealService: DealService, private readonly retrievalService: RetrievalService, + private readonly configService: ConfigService, @InjectRepository(Deal) private readonly dealRepository: Repository, ) {} @@ -93,6 +96,7 @@ export class DevToolsService { // Create a pending deal record first so we can return the ID immediately const pendingDeal = this.dealRepository.create({ spAddress, + network: this.configService.get("blockchain").network, walletAddress: this.dealService.getWalletAddress(), fileName: "pending", fileSize: 0, diff --git a/apps/backend/src/jobs/jobs.service.spec.ts b/apps/backend/src/jobs/jobs.service.spec.ts index b25cd552..b3fc8741 100644 --- a/apps/backend/src/jobs/jobs.service.spec.ts +++ b/apps/backend/src/jobs/jobs.service.spec.ts @@ -123,7 +123,11 @@ describe("JobsService schedule rows", () => { baseConfigValues = { app: { runMode: "both" } as IConfig["app"], - blockchain: { useOnlyApprovedProviders: false, minNumDataSetsForChecks: 1 } as IConfig["blockchain"], + blockchain: { + useOnlyApprovedProviders: false, + minNumDataSetsForChecks: 1, + network: "calibration", + } as IConfig["blockchain"], scheduling: { providersRefreshIntervalSeconds: 4 * 3600, dataRetentionPollIntervalSeconds: 3600, @@ -316,6 +320,7 @@ describe("JobsService schedule rows", () => { data: { jobType: "deal", spAddress: "0xaaa", + network: "calibration", intervalSeconds: 60, }, }); @@ -374,6 +379,7 @@ describe("JobsService schedule rows", () => { data: { jobType: "retrieval", spAddress: "0xaaa", + network: "calibration", intervalSeconds: 60, }, }); @@ -409,6 +415,7 @@ describe("JobsService schedule rows", () => { data: { jobType: "retrieval", spAddress: "0xaaa", + network: "calibration", intervalSeconds: 60, }, }); @@ -450,6 +457,7 @@ describe("JobsService schedule rows", () => { data: { jobType: "retrieval", spAddress: "0xaaa", + network: "calibration", intervalSeconds: 60, }, }), @@ -656,7 +664,10 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "ensureScheduleRows"); - expect(jobScheduleRepositoryMock.deleteSchedulesForInactiveProviders).toHaveBeenCalledWith([providerA.address]); + expect(jobScheduleRepositoryMock.deleteSchedulesForInactiveProviders).toHaveBeenCalledWith( + [providerA.address], + "calibration", + ); }); it("does not delete schedule rows when no active providers exist", async () => { @@ -670,7 +681,7 @@ describe("JobsService schedule rows", () => { it("uses approved-only filter when configured", async () => { baseConfigValues = { ...baseConfigValues, - blockchain: { useOnlyApprovedProviders: true } as IConfig["blockchain"], + blockchain: { useOnlyApprovedProviders: true, network: "calibration" } as IConfig["blockchain"], }; configService = { get: vi.fn((key: keyof IConfig) => baseConfigValues[key]), @@ -683,7 +694,7 @@ describe("JobsService schedule rows", () => { expect(storageProviderRepositoryMock.find).toHaveBeenCalledWith({ select: { address: true, providerId: true }, - where: { isActive: true, isApproved: true }, + where: { isActive: true, isApproved: true, network: "calibration" }, }); }); @@ -695,12 +706,14 @@ describe("JobsService schedule rows", () => { expect(jobScheduleRepositoryMock.upsertSchedule).toHaveBeenCalledWith( "providers_refresh", "", + "calibration", expect.any(Number), expect.any(Date), ); expect(jobScheduleRepositoryMock.upsertSchedule).toHaveBeenCalledWith( "data_retention_poll", "", + "calibration", expect.any(Number), expect.any(Date), ); @@ -732,6 +745,7 @@ describe("JobsService schedule rows", () => { id: 1, job_type: "deal", sp_address: "0xaaa", + network: "calibration", interval_seconds: 1, next_run_at: "2024-01-01T00:00:00Z", }, @@ -742,8 +756,8 @@ describe("JobsService schedule rows", () => { expect(send).toHaveBeenCalledTimes(3); for (const call of send.mock.calls) { expect(call[0]).toBe("sp.work"); - expect(call[1]).toMatchObject({ jobType: "deal", spAddress: "0xaaa" }); - expect(call[2]).toMatchObject({ singletonKey: "0xaaa", retryLimit: 0 }); + expect(call[1]).toMatchObject({ jobType: "deal", spAddress: "0xaaa", network: "calibration" }); + expect(call[2]).toMatchObject({ singletonKey: "calibration:0xaaa", retryLimit: 0 }); expect(call[2]?.startAfter).toBeUndefined(); } @@ -799,6 +813,7 @@ describe("JobsService schedule rows", () => { id: 11, job_type: "providers_refresh", sp_address: "", + network: "calibration", interval_seconds: 14400, next_run_at: "2024-01-01T00:00:00Z", }, @@ -808,7 +823,7 @@ describe("JobsService schedule rows", () => { expect(send).toHaveBeenCalledTimes(1); expect(send.mock.calls[0][2]).toMatchObject({ - singletonKey: "providers_refresh", + singletonKey: "calibration:providers_refresh", retryLimit: 0, }); }); @@ -881,7 +896,7 @@ describe("JobsService schedule rows", () => { service, "deferJobForMaintenance", "deal", - { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 }, + { jobType: "deal", spAddress: "0xaaa", network: "calibration", intervalSeconds: 60 }, maintenance, now, ); @@ -890,7 +905,7 @@ describe("JobsService schedule rows", () => { expect(safeSend).toHaveBeenCalledWith( "deal", "sp.work", - { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 }, + { jobType: "deal", spAddress: "0xaaa", network: "calibration", intervalSeconds: 60 }, { startAfter: expectedResumeAt }, ); }); @@ -920,7 +935,7 @@ describe("JobsService schedule rows", () => { service, "deferJobForMaintenance", "retrieval", - { jobType: "retrieval", spAddress: "0xbbb", intervalSeconds: 60 }, + { jobType: "retrieval", spAddress: "0xbbb", network: "calibration", intervalSeconds: 60 }, maintenance, now, ); @@ -929,7 +944,7 @@ describe("JobsService schedule rows", () => { expect(safeSend).toHaveBeenCalledWith( "retrieval", "sp.work", - { jobType: "retrieval", spAddress: "0xbbb", intervalSeconds: 60 }, + { jobType: "retrieval", spAddress: "0xbbb", network: "calibration", intervalSeconds: 60 }, { startAfter: expectedResumeAt }, ); }); @@ -955,7 +970,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDealJob", { id: "job-deal-1", - data: { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 }, + data: { jobType: "deal", spAddress: "0xaaa", network: "calibration", intervalSeconds: 60 }, }); expect(dealService.createDealForProvider).toHaveBeenCalledTimes(1); @@ -995,7 +1010,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDealJob", { id: "job-deal-no-quota-gate", - data: { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 }, + data: { jobType: "deal", spAddress: "0xaaa", network: "calibration", intervalSeconds: 60 }, }); expect(pieceCleanupService.cleanupPiecesForProvider).not.toHaveBeenCalled(); @@ -1027,7 +1042,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDealJob", { id: "job-deal-terminated", - data: { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 }, + data: { jobType: "deal", spAddress: "0xaaa", network: "calibration", intervalSeconds: 60 }, }); expect(dealService.createDealForProvider).toHaveBeenCalledTimes(1); @@ -1056,7 +1071,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDataSetCreationJob", { id: "job-ds-1", - data: { jobType: "data_set_creation", spAddress: "0xaaa", intervalSeconds: 3600 }, + data: { jobType: "data_set_creation", spAddress: "0xaaa", network: "calibration", intervalSeconds: 3600 }, }); expect(dealService.createDataSetWithPiece).toHaveBeenCalledTimes(1); @@ -1098,7 +1113,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDataSetCreationJob", { id: "job-ds-2", - data: { jobType: "data_set_creation", spAddress: "0xaaa", intervalSeconds: 3600 }, + data: { jobType: "data_set_creation", spAddress: "0xaaa", network: "calibration", intervalSeconds: 3600 }, }); expect(dealService.createDataSetWithPiece).not.toHaveBeenCalled(); @@ -1139,7 +1154,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDataSetCreationJob", { id: "job-ds-3", - data: { jobType: "data_set_creation", spAddress: "0xaaa", intervalSeconds: 3600 }, + data: { jobType: "data_set_creation", spAddress: "0xaaa", network: "calibration", intervalSeconds: 3600 }, }); // Only the first missing data set (index 0) should be created @@ -1184,7 +1199,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDataSetCreationJob", { id: "job-ds-3b", - data: { jobType: "data_set_creation", spAddress: "0xaaa", intervalSeconds: 3600 }, + data: { jobType: "data_set_creation", spAddress: "0xaaa", network: "calibration", intervalSeconds: 3600 }, }); // Should skip index 0 (exists) and create only index 1 @@ -1331,7 +1346,7 @@ describe("JobsService schedule rows", () => { expect(jobTypes).not.toContain("retrieval"); // Blocked provider is excluded from the active-address list passed to cleanup, // so its existing schedule rows will be deleted. - expect(jobScheduleRepositoryMock.deleteSchedulesForInactiveProviders).toHaveBeenCalledWith([]); + expect(jobScheduleRepositoryMock.deleteSchedulesForInactiveProviders).toHaveBeenCalledWith([], "calibration"); }); it("deal job is skipped at runtime when provider is blocked", async () => { @@ -1357,7 +1372,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDealJob", { id: "job-blocked-deal", - data: { jobType: "deal", spAddress: "0xaaa", intervalSeconds: 60 }, + data: { jobType: "deal", spAddress: "0xaaa", network: "calibration", intervalSeconds: 60 }, }); expect(dealService.createDealForProvider).not.toHaveBeenCalled(); @@ -1381,7 +1396,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleRetrievalJob", { id: "job-blocked-retrieval", - data: { jobType: "retrieval", spAddress: "0xaaa", intervalSeconds: 60 }, + data: { jobType: "retrieval", spAddress: "0xaaa", network: "calibration", intervalSeconds: 60 }, }); expect(retrievalService.performRandomRetrievalForProvider).not.toHaveBeenCalled(); @@ -1410,7 +1425,7 @@ describe("JobsService schedule rows", () => { await callPrivate(service, "handleDataSetCreationJob", { id: "job-blocked-ds", - data: { jobType: "data_set_creation", spAddress: "0xaaa", intervalSeconds: 3600 }, + data: { jobType: "data_set_creation", spAddress: "0xaaa", network: "calibration", intervalSeconds: 3600 }, }); expect(dealService.createDataSetWithPiece).not.toHaveBeenCalled(); @@ -1476,7 +1491,12 @@ describe("JobsService schedule rows", () => { for (const testCase of cases) { await callPrivate(testCase.service, testCase.handler, { id: `job-address-blocked-${testCase.jobType}`, - data: { jobType: testCase.jobType, spAddress: "0xaaa", intervalSeconds: testCase.intervalSeconds }, + data: { + jobType: testCase.jobType, + spAddress: "0xaaa", + network: "calibration", + intervalSeconds: testCase.intervalSeconds, + }, }); testCase.expectCheckNotRun(); diff --git a/apps/backend/src/jobs/jobs.service.ts b/apps/backend/src/jobs/jobs.service.ts index 957ce65a..16c04d5a 100644 --- a/apps/backend/src/jobs/jobs.service.ts +++ b/apps/backend/src/jobs/jobs.service.ts @@ -9,6 +9,7 @@ import { DealJobTerminatedDataSetError } from "../common/errors.js"; import { type JobLogContext, type ProviderJobContext, toStructuredError } from "../common/logging.js"; import { getMaintenanceWindowStatus } from "../common/maintenance-window.js"; import { isSpBlocked } from "../common/sp-blocklist.js"; +import type { Network } from "../common/types.js"; import type { IConfig, ISpBlocklistConfig } from "../config/app.config.js"; import { DataRetentionService } from "../data-retention/data-retention.service.js"; import type { JobType } from "../database/entities/job-schedule-state.entity.js"; @@ -39,16 +40,17 @@ function isSpJobType(jobType: string): jobType is SpJobType { return SP_JOB_TYPES.has(jobType); } -type SpJobData = { jobType: SpJobType; spAddress: string; intervalSeconds: number }; -type ProvidersRefreshJobData = { intervalSeconds: number }; +type SpJobData = { jobType: SpJobType; spAddress: string; network: Network; intervalSeconds: number }; +type ProvidersRefreshJobData = { network: Network; intervalSeconds: number }; type SpJob = Job; -type DataRetentionJobData = { intervalSeconds: number }; -type PullPieceCleanupJobData = { intervalSeconds: number }; +type DataRetentionJobData = { network: Network; intervalSeconds: number }; +type PullPieceCleanupJobData = { network: Network; intervalSeconds: number }; type ScheduleRow = { id: number; job_type: JobType; sp_address: string; + network: Network; interval_seconds: number; next_run_at: string; }; @@ -1066,12 +1068,15 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { pullPieceCleanupIntervalSeconds, } = this.getIntervalSecondsForRates(); - const useOnlyApprovedProviders = this.configService.get("blockchain").useOnlyApprovedProviders; + const blockchainCfg = this.configService.get("blockchain", { infer: true }); + const network = blockchainCfg.network; + + const useOnlyApprovedProviders = blockchainCfg.useOnlyApprovedProviders; // Active providers are guaranteed to support ipniIpfs // as validated by WalletSdkService.loadProvidersInternal() const providers = await this.storageProviderRepository.find({ select: { address: true, providerId: true }, - where: useOnlyApprovedProviders ? { isActive: true, isApproved: true } : { isActive: true }, + where: { network, isActive: true, ...(useOnlyApprovedProviders && { isApproved: true }) }, }); const phaseMs = this.schedulePhaseSeconds() * 1000; @@ -1081,7 +1086,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { const dataRetentionPollStartAt = new Date(now.getTime() + phaseMs); const providersRefreshStartAt = new Date(now.getTime() + phaseMs); - const minDataSets = this.configService.get("blockchain").minNumDataSetsForChecks; + const minDataSets = blockchainCfg.minNumDataSetsForChecks; const cleanupStartAt = new Date(now.getTime() + phaseMs); const pullCheckStartAt = new Date(now.getTime() + phaseMs); @@ -1099,12 +1104,19 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { } for (const address of unblockedAddresses) { - await this.jobScheduleRepository.upsertSchedule("deal", address, dealIntervalSeconds, dealStartAt); - await this.jobScheduleRepository.upsertSchedule("retrieval", address, retrievalIntervalSeconds, retrievalStartAt); + await this.jobScheduleRepository.upsertSchedule("deal", address, network, dealIntervalSeconds, dealStartAt); + await this.jobScheduleRepository.upsertSchedule( + "retrieval", + address, + network, + retrievalIntervalSeconds, + retrievalStartAt, + ); if (minDataSets >= 1) { await this.jobScheduleRepository.upsertSchedule( "data_set_creation", address, + network, dataSetCreationIntervalSeconds, dataSetCreationStartAt, ); @@ -1112,19 +1124,24 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { await this.jobScheduleRepository.upsertSchedule( "piece_cleanup", address, + network, pieceCleanupIntervalSeconds, cleanupStartAt, ); await this.jobScheduleRepository.upsertSchedule( "pull_check", address, + network, pullCheckIntervalSeconds, pullCheckStartAt, ); } if (providers.length > 0) { - const deletedAddresses = await this.jobScheduleRepository.deleteSchedulesForInactiveProviders(unblockedAddresses); + const deletedAddresses = await this.jobScheduleRepository.deleteSchedulesForInactiveProviders( + unblockedAddresses, + network, + ); if (deletedAddresses.length > 0) { this.logger.warn({ event: "job_schedules_deleted", @@ -1144,18 +1161,21 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { await this.jobScheduleRepository.upsertSchedule( "data_retention_poll", "", + network, dataRetentionPollIntervalSeconds, dataRetentionPollStartAt, ); await this.jobScheduleRepository.upsertSchedule( "providers_refresh", "", + network, providersRefreshIntervalSeconds, providersRefreshStartAt, ); await this.jobScheduleRepository.upsertSchedule( "pull_piece_cleanup", "", + network, pullPieceCleanupIntervalSeconds, new Date(now.getTime() + phaseMs), ); @@ -1189,6 +1209,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { private async enqueueDueJobs(): Promise { if (!this.boss) return; + const network = this.configService.get("blockchain", { infer: true }).network; const now = new Date(); const maintenance = this.getMaintenanceWindowStatus(now); @@ -1199,7 +1220,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { } await this.jobScheduleRepository.runTransaction(async (manager) => { - const rows = await this.jobScheduleRepository.findDueSchedulesWithManager(manager, now); + const rows = await this.jobScheduleRepository.findDueSchedulesWithManager(manager, now, network); for (const row of rows) { const timing = this.getScheduleTiming(row, now); @@ -1276,9 +1297,14 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { row.job_type === "piece_cleanup" || row.job_type === "pull_check" ) { - return { jobType: row.job_type, spAddress: row.sp_address, intervalSeconds: row.interval_seconds }; + return { + jobType: row.job_type, + spAddress: row.sp_address, + network: row.network, + intervalSeconds: row.interval_seconds, + }; } - return { intervalSeconds: row.interval_seconds }; + return { network: row.network, intervalSeconds: row.interval_seconds }; } /** @@ -1297,11 +1323,12 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { if (isSpJobType(jobType)) { const spData = data as SpJobData; if (!finalOptions.singletonKey) { - finalOptions.singletonKey = spData.spAddress; + // Include network in singleton key to prevent cross-network deduplication collisions + finalOptions.singletonKey = `${spData.network}:${spData.spAddress}`; } } else { - // Global jobs: use job type as singleton key. - finalOptions.singletonKey = jobType; + // Global jobs: include network in singleton key for per-network isolation + finalOptions.singletonKey = `${data.network}:${jobType}`; } await this.boss.send(name, data, finalOptions); this.jobsEnqueueAttemptsCounter.inc({ job_type: jobType, outcome: "success" }); @@ -1342,6 +1369,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { * Refreshes queue depth and age gauges from pg-boss tables. */ private async updateQueueMetrics(): Promise { + const network = this.configService.get("blockchain", { infer: true }).network; const jobTypes: JobType[] = [ "deal", "retrieval", @@ -1361,7 +1389,7 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { this.oldestInFlightAgeGauge.set({ job_type: jobType }, 0); } - const rows = await this.jobScheduleRepository.countBossJobStates(["created", "retry", "active"]); + const rows = await this.jobScheduleRepository.countBossJobStates(["created", "retry", "active"], network); if (rows.length > 0) { for (const row of rows) { const jobType = row.job_type as JobType; @@ -1383,20 +1411,20 @@ export class JobsService implements OnModuleInit, OnApplicationShutdown { }); } - const pausedSchedules = await this.jobScheduleRepository.countPausedSchedules(); + const pausedSchedules = await this.jobScheduleRepository.countPausedSchedules(network); for (const row of pausedSchedules) { this.jobsPausedGauge.set({ job_type: row.job_type }, row.count); } const now = new Date(); - const queuedAges = await this.jobScheduleRepository.minBossJobAgeSecondsByState("created", now); + const queuedAges = await this.jobScheduleRepository.minBossJobAgeSecondsByState("created", now, network); for (const row of queuedAges) { const jobType = row.job_type as JobType; if (!jobTypes.includes(jobType)) continue; this.oldestQueuedAgeGauge.set({ job_type: jobType }, Math.max(0, row.min_age_seconds ?? 0)); } - const activeAges = await this.jobScheduleRepository.minBossJobAgeSecondsByState("active", now); + const activeAges = await this.jobScheduleRepository.minBossJobAgeSecondsByState("active", now, network); for (const row of activeAges) { const jobType = row.job_type as JobType; if (!jobTypes.includes(jobType)) continue; diff --git a/apps/backend/src/jobs/repositories/job-schedule.repository.ts b/apps/backend/src/jobs/repositories/job-schedule.repository.ts index 6411a3b1..7d2f2426 100644 --- a/apps/backend/src/jobs/repositories/job-schedule.repository.ts +++ b/apps/backend/src/jobs/repositories/job-schedule.repository.ts @@ -2,6 +2,7 @@ import { Injectable, Logger } from "@nestjs/common"; import { InjectDataSource } from "@nestjs/typeorm"; import type { DataSource } from "typeorm"; import { toStructuredError } from "../../common/logging.js"; +import type { Network } from "../../common/types.js"; import type { JobType } from "../../database/entities/job-schedule-state.entity.js"; import { DATA_RETENTION_POLL_QUEUE, @@ -14,6 +15,7 @@ export type ScheduleRow = { id: number; job_type: JobType; sp_address: string; + network: Network; interval_seconds: number; next_run_at: string; }; @@ -28,25 +30,36 @@ export class JobScheduleRepository { ) {} /** - * Inserts or updates a schedule row for a specific job type and provider. - * If the row exists, it updates the interval and ensures the job is not paused. + * Inserts or updates a schedule row for a specific job type, provider, and + * network. If the row exists, it updates the interval and ensures the job + * is not paused. + * + * Schedules are scoped per network so the same provider running on multiple + * networks (e.g. mainnet + calibration) gets independent cadence rows. * * @param jobType - The type of job (deal, retrieval, metrics, etc.) * @param spAddress - The storage provider address (or empty string for global jobs) + * @param network - The blockchain network this schedule belongs to * @param intervalSeconds - The frequency of the job in seconds * @param nextRunAt - The scheduled time for the next run */ - async upsertSchedule(jobType: JobType, spAddress: string, intervalSeconds: number, nextRunAt: Date): Promise { + async upsertSchedule( + jobType: JobType, + spAddress: string, + network: Network, + intervalSeconds: number, + nextRunAt: Date, + ): Promise { await this.dataSource.query( ` - INSERT INTO job_schedule_state (job_type, sp_address, interval_seconds, next_run_at) - VALUES ($1, $2, $3, $4) - ON CONFLICT (job_type, sp_address) DO UPDATE + INSERT INTO job_schedule_state (job_type, sp_address, network, interval_seconds, next_run_at) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (job_type, sp_address, network) DO UPDATE SET interval_seconds = EXCLUDED.interval_seconds, paused = job_schedule_state.paused, updated_at = NOW() `, - [jobType, spAddress, intervalSeconds, nextRunAt], + [jobType, spAddress, network, intervalSeconds, nextRunAt], ); } @@ -60,21 +73,24 @@ export class JobScheduleRepository { * @param activeAddresses - List of currently active provider addresses to keep. * @returns Array of storage provider addresses whose schedules were deleted. */ - async deleteSchedulesForInactiveProviders(activeAddresses: string[]): Promise { + async deleteSchedulesForInactiveProviders(activeAddresses: string[], network: Network): Promise { try { if (activeAddresses.length === 0) { this.logger.warn({ event: "delete_all_provider_schedules_warning", message: "Deleting all provider schedules because activeAddresses is empty. Ensure this is intended to avoid mass deletion.", + network, }); const [rows] = (await this.dataSource.query( ` DELETE FROM job_schedule_state WHERE job_type IN ('deal', 'retrieval', 'data_set_creation', 'piece_cleanup', 'pull_check') AND sp_address <> '' + AND network::text = $1 RETURNING sp_address `, + [network], )) || [[]]; return rows.map((row: { sp_address: string }) => row.sp_address); } @@ -84,10 +100,11 @@ export class JobScheduleRepository { DELETE FROM job_schedule_state WHERE job_type IN ('deal', 'retrieval', 'data_set_creation', 'piece_cleanup', 'pull_check') AND sp_address <> '' - AND sp_address <> ALL($1::text[]) + AND network::text = $1 + AND sp_address <> ALL($2::text[]) RETURNING sp_address `, - [activeAddresses], + [network, activeAddresses], )) || [[]]; return rows.map((row: { sp_address: string }) => row.sp_address); } catch (error) { @@ -95,6 +112,7 @@ export class JobScheduleRepository { event: "delete_inactive_provider_schedules_failed", message: "Failed to delete schedules for inactive providers", error: toStructuredError(error), + network, }); throw error; } @@ -103,14 +121,16 @@ export class JobScheduleRepository { /** * Counts manually paused jobs by type. */ - async countPausedSchedules(): Promise<{ job_type: string; count: number }[]> { + async countPausedSchedules(network?: Network): Promise<{ job_type: string; count: number }[]> { return this.dataSource.query( ` SELECT job_type, COUNT(*)::int AS count FROM job_schedule_state WHERE paused = true + AND ($1::text IS NULL OR network::text = $1) GROUP BY job_type `, + [network ?? null], ); } @@ -124,17 +144,19 @@ export class JobScheduleRepository { async findDueSchedulesWithManager( manager: { query: (sql: string, params?: any[]) => Promise }, now: Date, + network?: Network, ): Promise { return manager.query( ` - SELECT id, job_type, sp_address, interval_seconds, next_run_at + SELECT id, job_type, sp_address, network, interval_seconds, next_run_at FROM job_schedule_state WHERE paused = false AND next_run_at <= $1 + AND ($2::text IS NULL OR network::text = $2) ORDER BY next_run_at ASC FOR UPDATE SKIP LOCKED `, - [now], + [now, network ?? null], ); } @@ -202,7 +224,10 @@ export class JobScheduleRepository { * Uses `data->>'jobType'` for the shared sp.work queue. * Casts state to text so drivers always return a string (pg-boss uses job_state enum). */ - async countBossJobStates(states: string[]): Promise<{ job_type: string; state: string; count: number }[]> { + async countBossJobStates( + states: string[], + network?: Network, + ): Promise<{ job_type: string; state: string; count: number }[]> { return this.dataSource.query( ` SELECT @@ -217,9 +242,17 @@ export class JobScheduleRepository { COUNT(*)::int AS count FROM pgboss.job WHERE state::text = ANY($1::text[]) + AND ($6::text IS NULL OR data->>'network' = $6) GROUP BY 1, 2 `, - [states, SP_WORK_QUEUE, DATA_RETENTION_POLL_QUEUE, PROVIDERS_REFRESH_QUEUE, PULL_PIECE_CLEANUP_QUEUE], + [ + states, + SP_WORK_QUEUE, + DATA_RETENTION_POLL_QUEUE, + PROVIDERS_REFRESH_QUEUE, + PULL_PIECE_CLEANUP_QUEUE, + network ?? null, + ], ); } @@ -230,6 +263,7 @@ export class JobScheduleRepository { async minBossJobAgeSecondsByState( state: "created" | "active", now: Date, + network?: Network, ): Promise<{ job_type: string; min_age_seconds: number | null }[]> { return this.dataSource.query( ` @@ -253,9 +287,18 @@ export class JobScheduleRepository { ) AS min_age_seconds FROM pgboss.job WHERE state::text = $2 + AND ($7::text IS NULL OR data->>'network' = $7) GROUP BY 1 `, - [now, state, SP_WORK_QUEUE, DATA_RETENTION_POLL_QUEUE, PROVIDERS_REFRESH_QUEUE, PULL_PIECE_CLEANUP_QUEUE], + [ + now, + state, + SP_WORK_QUEUE, + DATA_RETENTION_POLL_QUEUE, + PROVIDERS_REFRESH_QUEUE, + PULL_PIECE_CLEANUP_QUEUE, + network ?? null, + ], ); } } diff --git a/apps/backend/src/pdp-subgraph/pdp-subgraph.service.spec.ts b/apps/backend/src/pdp-subgraph/pdp-subgraph.service.spec.ts index cd3a1ea8..7337b3d7 100644 --- a/apps/backend/src/pdp-subgraph/pdp-subgraph.service.spec.ts +++ b/apps/backend/src/pdp-subgraph/pdp-subgraph.service.spec.ts @@ -1,6 +1,4 @@ -import type { ConfigService } from "@nestjs/config"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import type { IConfig } from "../config/app.config.js"; import { PDPSubgraphService } from "./pdp-subgraph.service.js"; const VALID_ADDRESS = "0xd8da6bf26964af9d7eed9e03e53415d37aa96045" as const; @@ -40,16 +38,7 @@ describe("PDPSubgraphService", () => { let fetchMock: ReturnType; beforeEach(() => { - const configService = { - get: vi.fn((key: keyof IConfig) => { - if (key === "blockchain") { - return { pdpSubgraphEndpoint: SUBGRAPH_ENDPOINT }; - } - return undefined; - }), - } as unknown as ConfigService; - - service = new PDPSubgraphService(configService); + service = new PDPSubgraphService(); fetchMock = vi.fn(); vi.stubGlobal("fetch", fetchMock); @@ -69,7 +58,7 @@ describe("PDPSubgraphService", () => { json: async () => makeSubgraphResponse([makeValidProvider()]), }); - const providers = await service.fetchProvidersWithDatasets({ + const providers = await service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -93,7 +82,7 @@ describe("PDPSubgraphService", () => { json: async () => makeSubgraphResponse([]), }); - const providers = await service.fetchProvidersWithDatasets({ + const providers = await service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -101,7 +90,7 @@ describe("PDPSubgraphService", () => { }); it("returns empty array when addresses array is empty", async () => { - const providers = await service.fetchProvidersWithDatasets({ + const providers = await service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [], }); @@ -116,7 +105,7 @@ describe("PDPSubgraphService", () => { status: 500, }); - const promise = service.fetchProvidersWithDatasets({ + const promise = service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -139,7 +128,7 @@ describe("PDPSubgraphService", () => { }), }); - const promise = service.fetchProvidersWithDatasets({ + const promise = service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -155,7 +144,7 @@ describe("PDPSubgraphService", () => { it("throws on network failure", async () => { fetchMock.mockRejectedValueOnce(new Error("Network error")); - const promise = service.fetchProvidersWithDatasets({ + const promise = service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -177,10 +166,7 @@ describe("PDPSubgraphService", () => { }); await expect( - service.fetchProvidersWithDatasets({ - blockNumber: 5000, - addresses: [VALID_ADDRESS], - }), + service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS] }), ).rejects.toThrow("Data validation failed"); // Should only be called once - no retries for validation errors @@ -196,10 +182,7 @@ describe("PDPSubgraphService", () => { }); await expect( - service.fetchProvidersWithDatasets({ - blockNumber: 5000, - addresses: [VALID_ADDRESS], - }), + service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS] }), ).rejects.toThrow("Data validation failed"); // Should only be called once - no retries for validation errors @@ -212,10 +195,7 @@ describe("PDPSubgraphService", () => { json: async () => makeSubgraphResponse([makeValidProvider()]), }); - await service.fetchProvidersWithDatasets({ - blockNumber: 12345, - addresses: [VALID_ADDRESS], - }); + await service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 12345, addresses: [VALID_ADDRESS] }); const body = JSON.parse(fetchMock.mock.calls[0][1].body); expect(body.variables.blockNumber).toBe("12345"); @@ -233,7 +213,7 @@ describe("PDPSubgraphService", () => { }), }); - const promise = service.fetchProvidersWithDatasets({ + const promise = service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -255,10 +235,7 @@ describe("PDPSubgraphService", () => { }); const addresses = [VALID_ADDRESS, "0xAb5801a7D398351b8bE11C439e05C5B3259aeC9B"]; - await service.fetchProvidersWithDatasets({ - blockNumber: 5000, - addresses, - }); + await service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses }); const body = JSON.parse(fetchMock.mock.calls[0][1].body); expect(body.variables.addresses).toEqual(addresses); @@ -273,10 +250,7 @@ describe("PDPSubgraphService", () => { json: async () => makeSubgraphResponse([]), }); - await service.fetchProvidersWithDatasets({ - blockNumber: 5000, - addresses, - }); + await service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses }); // Should make 2 requests expect(fetchMock).toHaveBeenCalledTimes(2); @@ -289,7 +263,7 @@ describe("PDPSubgraphService", () => { json: async () => makeSubgraphResponse([makeValidProvider()]), }); - const promise = service.fetchProvidersWithDatasets({ + const promise = service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses: [VALID_ADDRESS], }); @@ -321,10 +295,7 @@ describe("PDPSubgraphService", () => { }; }); - const fetchPromise = service.fetchProvidersWithDatasets({ - blockNumber: 5000, - addresses, - }); + const fetchPromise = service.fetchProvidersWithDatasets(SUBGRAPH_ENDPOINT, { blockNumber: 5000, addresses }); await vi.runAllTimersAsync(); @@ -343,7 +314,7 @@ describe("PDPSubgraphService", () => { json: async () => makeSubgraphMetaResponse(12345), }); - const meta = await service.fetchSubgraphMeta(); + const meta = await service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT); expect(fetchMock).toHaveBeenCalledWith(SUBGRAPH_ENDPOINT, { method: "POST", @@ -361,13 +332,9 @@ describe("PDPSubgraphService", () => { }); it("throws when PDP subgraph endpoint is not configured", async () => { - const configService = { - get: vi.fn(() => ({ pdpSubgraphEndpoint: "" })), - } as unknown as ConfigService; - - const serviceWithoutEndpoint = new PDPSubgraphService(configService); + const serviceWithoutEndpoint = new PDPSubgraphService(); - await expect(serviceWithoutEndpoint.fetchSubgraphMeta()).rejects.toThrow("No PDP subgraph endpoint configured"); + await expect(serviceWithoutEndpoint.fetchSubgraphMeta("")).rejects.toThrow("No PDP subgraph endpoint configured"); }); it("throws on HTTP error response", async () => { @@ -377,7 +344,7 @@ describe("PDPSubgraphService", () => { statusText: "Internal Server Error", }); - const promise = service.fetchSubgraphMeta(); + const promise = service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT); promise.catch(() => {}); await vi.runAllTimersAsync(); @@ -394,7 +361,7 @@ describe("PDPSubgraphService", () => { }), }); - const promise = service.fetchSubgraphMeta(); + const promise = service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT); promise.catch(() => {}); await vi.runAllTimersAsync(); @@ -417,7 +384,7 @@ describe("PDPSubgraphService", () => { }), }); - await expect(service.fetchSubgraphMeta()).rejects.toThrow("Data validation failed"); + await expect(service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)).rejects.toThrow("Data validation failed"); expect(fetchMock).toHaveBeenCalledTimes(1); // Should not retry validation errors }); @@ -435,7 +402,7 @@ describe("PDPSubgraphService", () => { }), }); - await expect(service.fetchSubgraphMeta()).rejects.toThrow("Data validation failed"); + await expect(service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)).rejects.toThrow("Data validation failed"); expect(fetchMock).toHaveBeenCalledTimes(1); }); @@ -445,7 +412,7 @@ describe("PDPSubgraphService", () => { json: async () => makeSubgraphMetaResponse(12345), }); - const promise = service.fetchSubgraphMeta(); + const promise = service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT); await vi.runAllTimersAsync(); @@ -459,7 +426,7 @@ describe("PDPSubgraphService", () => { it("throws after MAX_RETRIES attempts on persistent network errors", async () => { fetchMock.mockRejectedValue(new Error("Network timeout")); - const promise = service.fetchSubgraphMeta(); + const promise = service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT); promise.catch(() => {}); await vi.runAllTimersAsync(); @@ -480,7 +447,7 @@ describe("PDPSubgraphService", () => { const startTime = Date.now(); // Make 5 requests - should all go through immediately - const promises = Array.from({ length: 5 }, () => service.fetchSubgraphMeta()); + const promises = Array.from({ length: 5 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await Promise.all(promises); @@ -499,13 +466,13 @@ describe("PDPSubgraphService", () => { }); // Fill up the rate limit window with 50 requests - const initialPromises = Array.from({ length: 50 }, () => service.fetchSubgraphMeta()); + const initialPromises = Array.from({ length: 50 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await Promise.all(initialPromises); fetchMock.mockClear(); // Try to make one more request - should wait for oldest to expire - const promise = service.fetchSubgraphMeta(); + const promise = service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT); // Advance past the 10 second window + buffer await vi.advanceTimersByTimeAsync(10010); @@ -528,7 +495,7 @@ describe("PDPSubgraphService", () => { }); // Fill 48 slots - const initialPromises = Array.from({ length: 48 }, () => service.fetchSubgraphMeta()); + const initialPromises = Array.from({ length: 48 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await vi.runAllTimersAsync(); await Promise.all(initialPromises); @@ -556,7 +523,7 @@ describe("PDPSubgraphService", () => { }); // Make 30 requests at t=0 - const batch1 = Array.from({ length: 30 }, () => service.fetchSubgraphMeta()); + const batch1 = Array.from({ length: 30 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await vi.runAllTimersAsync(); await Promise.all(batch1); @@ -564,7 +531,7 @@ describe("PDPSubgraphService", () => { await vi.advanceTimersByTimeAsync(5000); // Make 20 more requests at t=5000 - const batch2 = Array.from({ length: 20 }, () => service.fetchSubgraphMeta()); + const batch2 = Array.from({ length: 20 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await vi.runAllTimersAsync(); await Promise.all(batch2); @@ -575,7 +542,7 @@ describe("PDPSubgraphService", () => { fetchMock.mockClear(); // Should be able to make 30 more requests immediately - const batch3 = Array.from({ length: 30 }, () => service.fetchSubgraphMeta()); + const batch3 = Array.from({ length: 30 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await vi.runAllTimersAsync(); await Promise.all(batch3); @@ -589,13 +556,13 @@ describe("PDPSubgraphService", () => { }); // Fill the window - const initialPromises = Array.from({ length: 50 }, () => service.fetchSubgraphMeta()); + const initialPromises = Array.from({ length: 50 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await vi.runAllTimersAsync(); await Promise.all(initialPromises); fetchMock.mockClear(); - const promise = service.fetchSubgraphMeta(); + const promise = service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT); // Advance past the window + buffer await vi.advanceTimersByTimeAsync(10010); @@ -611,7 +578,7 @@ describe("PDPSubgraphService", () => { }); // Fill window with 50 requests - const batch1 = Array.from({ length: 50 }, () => service.fetchSubgraphMeta()); + const batch1 = Array.from({ length: 50 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await vi.runAllTimersAsync(); await Promise.all(batch1); @@ -642,7 +609,7 @@ describe("PDPSubgraphService", () => { }); // Fill 47 slots - const initial = Array.from({ length: 47 }, () => service.fetchSubgraphMeta()); + const initial = Array.from({ length: 47 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await vi.runAllTimersAsync(); await Promise.all(initial); @@ -674,7 +641,7 @@ describe("PDPSubgraphService", () => { }); // Make 20 requests - const batch1 = Array.from({ length: 20 }, () => service.fetchSubgraphMeta()); + const batch1 = Array.from({ length: 20 }, () => service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT)); await vi.runAllTimersAsync(); await Promise.all(batch1); @@ -684,7 +651,7 @@ describe("PDPSubgraphService", () => { fetchMock.mockClear(); // Make another request - should have full window available - await service.fetchSubgraphMeta(); + await service.fetchSubgraphMeta(SUBGRAPH_ENDPOINT); const timestamps = (service as any).requestTimestamps; // Should only have 1 timestamp (the new one), old ones filtered out diff --git a/apps/backend/src/pdp-subgraph/pdp-subgraph.service.ts b/apps/backend/src/pdp-subgraph/pdp-subgraph.service.ts index aedd8bce..88407e24 100644 --- a/apps/backend/src/pdp-subgraph/pdp-subgraph.service.ts +++ b/apps/backend/src/pdp-subgraph/pdp-subgraph.service.ts @@ -1,7 +1,5 @@ import { Injectable, Logger } from "@nestjs/common"; -import { ConfigService } from "@nestjs/config"; import { toStructuredError } from "../common/logging.js"; -import type { IBlockchainConfig, IConfig } from "../config/app.config.js"; import { Queries } from "./queries.js"; import type { GraphQLResponse, ProviderDataSetResponse, ProvidersWithDataSetsOptions, SubgraphMeta } from "./types.js"; import { validateProviderDataSetResponse, validateSubgraphMetaResponse } from "./types.js"; @@ -23,7 +21,6 @@ class ValidationError extends Error { @Injectable() export class PDPSubgraphService { private readonly logger: Logger = new Logger(PDPSubgraphService.name); - private readonly blockchainConfig: IBlockchainConfig; private static readonly MAX_PROVIDERS_PER_QUERY = 100; private static readonly MAX_CONCURRENT_REQUESTS = 50; @@ -33,10 +30,6 @@ export class PDPSubgraphService { private requestTimestamps: number[] = []; - constructor(private readonly configService: ConfigService) { - this.blockchainConfig = this.configService.get("blockchain"); - } - /** * Fetch subgraph metadata including the latest indexed block number * @@ -44,15 +37,15 @@ export class PDPSubgraphService { * @returns Subgraph metadata with block number * @throws Error if endpoint is not configured or after MAX_RETRIES attempts */ - async fetchSubgraphMeta(attempt: number = 1): Promise { - if (!this.blockchainConfig.pdpSubgraphEndpoint) { + async fetchSubgraphMeta(endpoint: string, attempt: number = 1): Promise { + if (!endpoint) { throw new Error("No PDP subgraph endpoint configured"); } try { await this.enforceRateLimit(); - const response = await fetch(this.blockchainConfig.pdpSubgraphEndpoint, { + const response = await fetch(endpoint, { method: "POST", headers: { "Content-Type": "application/json", @@ -106,7 +99,7 @@ export class PDPSubgraphService { error: toStructuredError(error), }); await new Promise((resolve) => setTimeout(resolve, delay)); - return this.fetchSubgraphMeta(attempt + 1); + return this.fetchSubgraphMeta(endpoint, attempt + 1); } this.logger.error({ @@ -128,6 +121,7 @@ export class PDPSubgraphService { * @returns Array of providers with their data sets currently proving */ async fetchProvidersWithDatasets( + endpoint: string, options: ProvidersWithDataSetsOptions, ): Promise { const { blockNumber, addresses } = options; @@ -137,16 +131,17 @@ export class PDPSubgraphService { } if (addresses.length <= PDPSubgraphService.MAX_PROVIDERS_PER_QUERY) { - return this.fetchWithRetry(blockNumber, addresses); + return this.fetchWithRetry(endpoint, blockNumber, addresses); } - return this.fetchMultipleBatchesWithRateLimit(blockNumber, addresses); + return this.fetchMultipleBatchesWithRateLimit(endpoint, blockNumber, addresses); } /** * Fetch multiple batches with rate limiting and concurrency control */ private async fetchMultipleBatchesWithRateLimit( + endpoint: string, blockNumber: number, addresses: string[], ): Promise { @@ -161,7 +156,7 @@ export class PDPSubgraphService { for (let i = 0; i < batches.length; i += PDPSubgraphService.MAX_CONCURRENT_REQUESTS) { const batchGroup = batches.slice(i, i + PDPSubgraphService.MAX_CONCURRENT_REQUESTS); - const results = await Promise.all(batchGroup.map((batch) => this.fetchWithRetry(blockNumber, batch))); + const results = await Promise.all(batchGroup.map((batch) => this.fetchWithRetry(endpoint, blockNumber, batch))); allProviders.push(...results.flat()); } @@ -174,11 +169,12 @@ export class PDPSubgraphService { * Assuming initial request to be first attempt */ private async fetchWithRetry( + endpoint: string, blockNumber: number, addresses: string[], attempt: number = 1, ): Promise { - if (!this.blockchainConfig.pdpSubgraphEndpoint) { + if (!endpoint) { throw new Error("No PDP subgraph endpoint configured"); } @@ -190,7 +186,7 @@ export class PDPSubgraphService { try { await this.enforceRateLimit(); - const response = await fetch(this.blockchainConfig.pdpSubgraphEndpoint, { + const response = await fetch(endpoint, { method: "POST", headers: { "Content-Type": "application/json", @@ -247,7 +243,7 @@ export class PDPSubgraphService { error: toStructuredError(error), }); await new Promise((resolve) => setTimeout(resolve, delay)); - return this.fetchWithRetry(blockNumber, addresses, attempt + 1); + return this.fetchWithRetry(endpoint, blockNumber, addresses, attempt + 1); } this.logger.error({ diff --git a/apps/backend/src/piece-cleanup/piece-cleanup.service.ts b/apps/backend/src/piece-cleanup/piece-cleanup.service.ts index 45454347..990793a3 100644 --- a/apps/backend/src/piece-cleanup/piece-cleanup.service.ts +++ b/apps/backend/src/piece-cleanup/piece-cleanup.service.ts @@ -321,10 +321,13 @@ export class PieceCleanupService implements OnModuleInit, OnModuleDestroy { */ async getStoredBytesForProvider(spAddress: string): Promise { const walletAddress = this.blockchainConfig.walletAddress; + const network = this.blockchainConfig.network; + const result = await this.dealRepository .createQueryBuilder("deal") .select("COALESCE(SUM(deal.piece_size), 0)", "totalBytes") .where("deal.sp_address = :spAddress", { spAddress }) + .andWhere("deal.network = :network", { network }) .andWhere("deal.wallet_address = :walletAddress", { walletAddress }) .andWhere("deal.status = :status", { status: DealStatus.DEAL_CREATED }) .andWhere("deal.piece_id IS NOT NULL") @@ -340,10 +343,12 @@ export class PieceCleanupService implements OnModuleInit, OnModuleDestroy { */ async getCleanupCandidates(spAddress: string, limit: number): Promise { const walletAddress = this.blockchainConfig.walletAddress; + const network = this.blockchainConfig.network; return this.dealRepository.find({ where: { spAddress, walletAddress, + network, status: DealStatus.DEAL_CREATED, pieceId: Not(IsNull()), dataSetId: Not(IsNull()), diff --git a/apps/backend/src/providers/providers.controller.spec.ts b/apps/backend/src/providers/providers.controller.spec.ts index 28ec15c4..967cfae0 100644 --- a/apps/backend/src/providers/providers.controller.spec.ts +++ b/apps/backend/src/providers/providers.controller.spec.ts @@ -6,6 +6,7 @@ import { ProvidersService } from "./providers.service.js"; function makeProvider(overrides: Partial = {}): StorageProvider { return { + network: "calibration", address: "f01234", providerId: 99n, name: "Test SP", diff --git a/apps/backend/src/retrieval/retrieval.service.ts b/apps/backend/src/retrieval/retrieval.service.ts index 4de117ae..fbb5879c 100644 --- a/apps/backend/src/retrieval/retrieval.service.ts +++ b/apps/backend/src/retrieval/retrieval.service.ts @@ -503,12 +503,14 @@ export class RetrievalService { * Uses Postgres ORDER BY RANDOM() since Dealbot is Postgres-only. */ private async selectRandomSuccessfulDealForProvider(spAddress: string): Promise { - const walletAddress = this.configService.get("blockchain", { infer: true }).walletAddress; + const { network, walletAddress } = this.configService.get("blockchain", { infer: true }); + const randomDatasetSizes = this.getRandomDatasetSizes(); const query = this.dealRepository .createQueryBuilder("deal") .innerJoin("deal.storageProvider", "sp", "sp.isActive = :isActive", { isActive: true }) .where("deal.sp_address = :spAddress", { spAddress }) + .andWhere("deal.network = :network", { network }) .andWhere("deal.wallet_address = :walletAddress", { walletAddress }) .andWhere("deal.status IN (:...statuses)", { statuses: [DealStatus.DEAL_CREATED], diff --git a/apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts b/apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts index d6613a31..ed114f53 100644 --- a/apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts +++ b/apps/backend/src/wallet-sdk/wallet-sdk.service.spec.ts @@ -44,6 +44,7 @@ describe("config validation", () => { DATABASE_PASSWORD: "test", DATABASE_NAME: "test", WALLET_ADDRESS: "0x1234567890123456789012345678901234567890", + NETWORK: "calibration", }; it("requires WALLET_PRIVATE_KEY when SESSION_KEY_PRIVATE_KEY is absent", () => { @@ -126,7 +127,7 @@ describe("WalletSdkService", () => { }); const other = makeProvider({ id: 22n, serviceProvider: "0xother" }); - await service.syncProvidersToDatabase([inactive, active, other]); + await service.syncProvidersToDatabase([inactive, active, other], "calibration"); expect(loggerMock.warn).toHaveBeenCalledWith( expect.objectContaining({ @@ -147,11 +148,11 @@ describe("WalletSdkService", () => { expect(loggerMock.error).not.toHaveBeenCalled(); const [entities, options] = repoMock.upsert.mock.calls[0]; - expect(options).toEqual(expect.objectContaining({ conflictPaths: ["address"] })); + expect(options).toEqual(expect.objectContaining({ conflictPaths: ["address", "network"] })); expect(entities).toEqual( expect.arrayContaining([ - expect.objectContaining({ address: "0xdup", providerId: 21n, name: "new" }), - expect.objectContaining({ address: "0xother", providerId: 22n }), + expect.objectContaining({ network: "calibration", address: "0xdup", providerId: 21n, name: "new" }), + expect.objectContaining({ network: "calibration", address: "0xother", providerId: 22n }), ]), ); }); @@ -170,7 +171,7 @@ describe("WalletSdkService", () => { name: "inactive", }); - await service.syncProvidersToDatabase([active, inactive]); + await service.syncProvidersToDatabase([active, inactive], "calibration"); expect(loggerMock.warn).toHaveBeenCalledWith( expect.objectContaining({ @@ -203,7 +204,7 @@ describe("WalletSdkService", () => { name: "second", }); - await service.syncProvidersToDatabase([first, second]); + await service.syncProvidersToDatabase([first, second], "calibration"); expect(loggerMock.error).toHaveBeenCalledWith( expect.objectContaining({ diff --git a/apps/backend/src/wallet-sdk/wallet-sdk.service.ts b/apps/backend/src/wallet-sdk/wallet-sdk.service.ts index 2ec9401e..e5d8af45 100644 --- a/apps/backend/src/wallet-sdk/wallet-sdk.service.ts +++ b/apps/backend/src/wallet-sdk/wallet-sdk.service.ts @@ -12,6 +12,7 @@ import { type Hex } from "viem"; import { DEV_TAG } from "../common/constants.js"; import { toStructuredError } from "../common/logging.js"; import { createSynapseFromConfig } from "../common/synapse-factory.js"; +import { Network } from "../common/types.js"; import type { IBlockchainConfig, IConfig } from "../config/app.config.js"; import { StorageProvider } from "../database/entities/storage-provider.entity.js"; import type { PDPProviderEx, WalletServices } from "./wallet-sdk.types.js"; @@ -193,7 +194,7 @@ export class WalletSdkService implements OnModuleInit { }; }); - this.syncProvidersToDatabase(extendedProviders).catch((err) => + this.syncProvidersToDatabase(extendedProviders, this.blockchainConfig.network).catch((err) => this.logger.error({ event: "providers_sync_to_db_failed", message: "Failed to sync providers to DB", @@ -431,7 +432,7 @@ export class WalletSdkService implements OnModuleInit { /** * Create or update provider in database */ - async syncProvidersToDatabase(providerInfos: PDPProviderEx[]): Promise { + async syncProvidersToDatabase(providerInfos: PDPProviderEx[], network: Network): Promise { try { const dedupedProviders = new Map(); const duplicatesByAddress = new Map>(); @@ -507,6 +508,7 @@ export class WalletSdkService implements OnModuleInit { const entities = Array.from(dedupedProviders.values()).map((info) => this.spRepository.create({ + network, address: info.serviceProvider as Hex, providerId: info.id, name: info.name, @@ -521,7 +523,7 @@ export class WalletSdkService implements OnModuleInit { ); await this.spRepository.upsert(entities, { - conflictPaths: ["address"], + conflictPaths: ["address", "network"], skipUpdateIfNoValuesChanged: true, }); } catch (error) { diff --git a/docs/runbooks/multi-network-migration.md b/docs/runbooks/multi-network-migration.md new file mode 100644 index 00000000..f4cde7e0 --- /dev/null +++ b/docs/runbooks/multi-network-migration.md @@ -0,0 +1,131 @@ +# Multi-Network Migration Runbook + +This runbook walks operators through enabling the `network` column added by +`AddNetworkColumn1776790420000` (`apps/backend/src/database/migrations/1776790420000-AddNetworkColumn.ts`). +The migration partitions runtime state (storage providers, deals, job schedules, +data-retention baselines) by blockchain network so a single dealbot instance — +or two cooperating instances — can safely operate on multiple networks +(e.g. `mainnet` and `calibration`) without rows colliding under shared keys. + +> Audience: operators upgrading an existing single-network deployment. Fresh +> deployments only need to set `NETWORK` (or `DEALBOT_LEGACY_NETWORK_BACKFILL`) +> to a supported value before first start; the migration runs automatically. + +## What the migration changes + +- Creates a shared Postgres `network_enum` type (`'calibration'`, `'mainnet'`). +- Adds a `network network_enum NOT NULL` column to: + - `storage_providers` (now part of the composite primary key with `address`) + - `deals` + - `job_schedule_state` (now part of the composite uniqueness with `job_type, sp_address`) + - `data_retention_baselines` (now part of the composite primary key with `provider_address`) +- Recreates the `deals → storage_providers` foreign key as a composite + `(sp_address, network) → (address, network)` reference. +- Replaces the unique `job_schedule_state_job_type_sp_unique` constraint with + `job_schedule_state_job_type_sp_network_unique`. + +The migration **fails fast** if the backfill network is not supplied or is not +in `SUPPORTED_NETWORKS` (see `apps/backend/src/common/constants.ts`). + +## Pre-migration checklist + +1. **Take a database backup.** This is a structural migration affecting four + tables and a foreign key. See `docs/runbooks/supabase-backup-restore.md`. +2. **Identify the network of all existing rows.** Pre-migration, the deployment + has been single-network. Confirm with operations which network's data + currently lives in the database. Allowed values: `calibration`, `mainnet`. +3. **Set `DEALBOT_LEGACY_NETWORK_BACKFILL`** (preferred) or rely on the legacy + `NETWORK` env var so the migration can backfill the new column. + + ```bash + export DEALBOT_LEGACY_NETWORK_BACKFILL=mainnet # or: calibration + ``` + +4. **Stop writers** (or scale to zero) for the duration of the migration so no + rows are inserted with the old default-only `network` column shape. + +## Running the migration + +The migration runs as part of the normal startup sequence +(`migrationsRun: true`). To run it explicitly: + +```bash +pnpm --filter @dealbot/backend run typeorm:migration:run +``` + +If the env var is missing or invalid, startup aborts with: + +``` +AddNetworkColumn migration requires DEALBOT_LEGACY_NETWORK_BACKFILL (or legacy NETWORK) +to be set to one of: calibration, mainnet. Got: "" +``` + +Set the env var and rerun. + +## Post-migration verification + +1. **Confirm the enum type exists**: + + ```sql + SELECT typname FROM pg_type WHERE typname = 'network_enum'; + ``` + +2. **Confirm every row has a network assigned**: + + ```sql + SELECT 'storage_providers' AS tbl, network, COUNT(*) + FROM storage_providers GROUP BY network + UNION ALL + SELECT 'deals', network, COUNT(*) FROM deals GROUP BY network + UNION ALL + SELECT 'job_schedule_state', network, COUNT(*) FROM job_schedule_state GROUP BY network + UNION ALL + SELECT 'data_retention_baselines', network, COUNT(*) FROM data_retention_baselines GROUP BY network; + ``` + + All groups should match the backfill network. + +3. **Restart the backend** and confirm the providers refresh job runs without + errors. The Prometheus `network` label on app metrics should reflect the + configured network. + +## Expanding to a second network + +Once the schema is migrated, adding a second network to a deployment is +purely an application-level change: + +1. Update the deployment configuration to point at the new network's RPC and + contracts (or run a second backend instance dedicated to it). +2. Trigger a `providers_refresh` job. The wallet SDK service writes new + `storage_providers` rows with the active `network` value, so the new + network's providers will not collide with existing rows even if SP + addresses overlap. +3. Job schedules, deals, and data-retention baselines created from this point + onward are automatically scoped to the new network. + +No database changes are required to onboard a new network — only the existing +`network_enum` values are accepted, so adding networks beyond +`SUPPORTED_NETWORKS` requires extending that constant and adding a follow-up +migration that calls `ALTER TYPE network_enum ADD VALUE 'newnet'`. + +## Rolling back + +The down migration is destructive when rows for multiple networks exist. The +operator must declare which network's data to preserve via +`DEALBOT_LEGACY_NETWORK_BACKFILL` (or legacy `NETWORK`); rows from any other +network are deleted before the schema collapses back to single-network shape. + +```bash +export DEALBOT_LEGACY_NETWORK_BACKFILL=mainnet +pnpm --filter @dealbot/backend run typeorm:migration:revert +``` + +After revert: + +- The `network_enum` type is dropped. +- `storage_providers` reverts to a single-column `(address)` primary key. +- The `deals → storage_providers` FK reverts to `(sp_address) → (address)`. +- The `job_schedule_state_job_type_sp_unique` constraint is restored. + +> Always take a fresh backup before reverting — deleted-other-network rows +> are not recoverable from the running database afterwards.